Source code for pybel.struct.filters.edge_predicate_builders

# -*- coding: utf-8 -*-

"""Functions for predicates for edge data from BEL graphs."""

from typing import Iterable, Mapping

from .edge_predicates import edge_predicate, has_authors, has_pubmed, keep_edge_permissive
from .typing import EdgePredicate
from ..graph import BELGraph
from ...constants import ANNOTATIONS, CAUSAL_RELATIONS, CITATION, CITATION_AUTHORS, CITATION_IDENTIFIER, RELATION
from ...dsl import BaseEntity
from ...typing import EdgeData, Strings

__all__ = [
    'build_annotation_dict_all_filter',
    'build_annotation_dict_any_filter',
    'build_upstream_edge_predicate',
    'build_downstream_edge_predicate',
    'build_relation_predicate',
    'build_pmid_inclusion_filter',
    'build_author_inclusion_filter',
]


def _annotation_dict_all_filter(edge_data: EdgeData, query: Mapping[str, Iterable[str]]) -> bool:
    """Match edges with the given dictionary as a sub-dictionary.

    :param query: The annotation query dict to match
    """
    annotations = edge_data.get(ANNOTATIONS)

    if annotations is None:
        return False

    for key, values in query.items():
        ak = annotations.get(key)

        if ak is None:
            return False

        for value in values:
            if value not in ak:
                return False

    return True


[docs]def build_annotation_dict_all_filter(annotations: Mapping[str, Iterable[str]]) -> EdgePredicate: """Build an edge predicate for edges whose annotations are super-dictionaries of the given dictionary. If no annotations are given, will always evaluate to true. :param annotations: The annotation query dict to match """ if not annotations: return keep_edge_permissive @edge_predicate def annotation_dict_all_filter(edge_data: EdgeData) -> bool: """Check if the all of the annotations in the enclosed query match.""" return _annotation_dict_all_filter(edge_data, query=annotations) return annotation_dict_all_filter
def _annotation_dict_any_filter(edge_data: EdgeData, query: Mapping[str, Iterable[str]]) -> bool: """Match edges with the given dictionary as a sub-dictionary. :param query: The annotation query dict to match """ annotations = edge_data.get(ANNOTATIONS) if annotations is None: return False return any( key in annotations and value in annotations[key] for key, values in query.items() for value in values )
[docs]def build_annotation_dict_any_filter(annotations: Mapping[str, Iterable[str]]) -> EdgePredicate: """Build an edge predicate that passes for edges whose data dictionaries match the given dictionary. If the given dictionary is empty, will always evaluate to true. :param annotations: The annotation query dict to match """ if not annotations: return keep_edge_permissive @edge_predicate def annotation_dict_any_filter(edge_data: EdgeData) -> bool: """Check if the any of the annotations in the enclosed query match.""" return _annotation_dict_any_filter(edge_data, query=annotations) return annotation_dict_any_filter
[docs]def build_upstream_edge_predicate(nodes: Iterable[BaseEntity]) -> EdgePredicate: """Build an edge predicate that pass for relations for which one of the given nodes is the object.""" nodes = set(nodes) def upstream_filter(graph: BELGraph, u: BaseEntity, v: BaseEntity, k: str) -> bool: """Pass for relations for which one of the given nodes is the object.""" return v in nodes and graph[u][v][k][RELATION] in CAUSAL_RELATIONS return upstream_filter
[docs]def build_downstream_edge_predicate(nodes: Iterable[BaseEntity]) -> EdgePredicate: """Build an edge predicate that passes for edges for which one of the given nodes is the subject.""" nodes = set(nodes) def downstream_filter(graph: BELGraph, u: BaseEntity, v: BaseEntity, k: str) -> bool: """Pass for relations for which one of the given nodes is the subject.""" return u in nodes and graph[u][v][k][RELATION] in CAUSAL_RELATIONS return downstream_filter
[docs]def build_relation_predicate(relations: Strings) -> EdgePredicate: """Build an edge predicate that passes for edges with the given relation.""" if isinstance(relations, str): @edge_predicate def relation_predicate(edge_data: EdgeData) -> bool: """Pass for relations matching the enclosed value.""" return edge_data[RELATION] == relations elif isinstance(relations, Iterable): relation_set = set(relations) @edge_predicate def relation_predicate(edge_data: EdgeData) -> bool: """Pass for relations matching the enclosed values.""" return edge_data[RELATION] in relation_set else: raise TypeError return relation_predicate
[docs]def build_pmid_inclusion_filter(pmids: Strings) -> EdgePredicate: """Build an edge predicate that passes for edges with citations from the given PubMed identifier(s). :param pmids: A PubMed identifier or list of PubMed identifiers to filter for """ if isinstance(pmids, str): @edge_predicate def pmid_inclusion_filter(edge_data: EdgeData) -> bool: """Pass for edges with PubMed citations matching the contained PubMed identifier.""" return has_pubmed(edge_data) and edge_data[CITATION][CITATION_IDENTIFIER] == pmids elif isinstance(pmids, Iterable): pmids = set(pmids) @edge_predicate def pmid_inclusion_filter(edge_data: EdgeData) -> bool: """Pass for edges with PubMed citations matching one of the contained PubMed identifiers.""" return has_pubmed(edge_data) and edge_data[CITATION][CITATION_IDENTIFIER] in pmids else: raise TypeError return pmid_inclusion_filter
[docs]def build_author_inclusion_filter(authors: Strings) -> EdgePredicate: """Build an edge predicate that passes for edges with citations written by the given author(s).""" if isinstance(authors, str): @edge_predicate def author_filter(edge_data: EdgeData) -> bool: """Pass for edges with citations with an author that matches the contained author.""" return has_authors(edge_data) and authors in edge_data[CITATION][CITATION_AUTHORS] elif isinstance(authors, Iterable): authors = set(authors) @edge_predicate def author_filter(edge_data: EdgeData) -> bool: """Pass for edges with citations with an author that matches one or more of the contained authors.""" return has_authors(edge_data) and any( author in edge_data[CITATION][CITATION_AUTHORS] for author in authors ) else: raise TypeError return author_filter