# Source code for pybel.struct.filters.edge_predicate_builders

# -*- coding: utf-8 -*-

"""Functions for predicates for edge data from BEL graphs."""

from typing import Iterable, Mapping

from .edge_filters import invert_edge_predicate
from .edge_predicates import (
    edge_predicate,
    has_authors,
    has_pubmed,
    true_edge_predicate,
)
from .typing import EdgePredicate
from ..graph import BELGraph
from ...constants import (
    ANNOTATIONS,
    CAUSAL_RELATIONS,
    CITATION,
    CITATION_AUTHORS,
    IDENTIFIER,
    RELATION,
)
from ...dsl import BaseEntity
from ...typing import EdgeData, Strings

# Public API of this module: one entry per builder, in definition order.
__all__ = [
    "build_annotation_dict_all_filter",
    "build_annotation_dict_any_filter",
    "build_upstream_edge_predicate",
    "build_downstream_edge_predicate",
    "build_relation_predicate",
    "build_pmid_inclusion_filter",
    "build_pmid_exclusion_filter",
    "build_author_inclusion_filter",
]


def _annotation_dict_all_filter(edge_data: EdgeData, query: Mapping[str, Iterable[str]]) -> bool:
    """Check that every queried annotation value is present on the edge.

    :param edge_data: The edge data dictionary to inspect
    :param query: A mapping from annotation keys to the values that must all be present
    :return: True only if the edge has annotations and, for each key in the query, the edge's
     entry for that key exists and contains every queried value
    """
    edge_annotations = edge_data.get(ANNOTATIONS)
    if edge_annotations is None:
        return False

    for annotation, required_values in query.items():
        present_values = edge_annotations.get(annotation)
        # A key that is absent from the edge fails the match, even if no values were requested for it.
        if present_values is None:
            return False
        if not all(required in present_values for required in required_values):
            return False

    return True


def build_annotation_dict_all_filter(annotations: Mapping[str, Iterable[str]]) -> EdgePredicate:
    """Build an edge predicate that requires every queried annotation value to be present on an edge.

    If the query is empty, no constraint is applied and ``true_edge_predicate`` is returned.

    :param annotations: The annotation query dict to match
    :return: An edge predicate enclosing the given query
    """
    if annotations:

        @edge_predicate
        def annotation_dict_all_filter(edge_data: EdgeData) -> bool:
            """Check that all of the annotations in the enclosed query match."""
            return _annotation_dict_all_filter(edge_data, query=annotations)

        return annotation_dict_all_filter

    return true_edge_predicate
def _annotation_dict_any_filter(edge_data: EdgeData, query: Mapping[str, Iterable[str]]) -> bool:
    """Check whether at least one queried annotation value is present on the edge.

    :param edge_data: The edge data dictionary to inspect
    :param query: A mapping from annotation keys to candidate values
    :return: True if the edge has annotations and any (key, value) pair from the query is found
     among them; False otherwise, including when the edge has no annotations
    """
    edge_annotations = edge_data.get(ANNOTATIONS)
    if edge_annotations is None:
        return False

    for annotation, candidate_values in query.items():
        for candidate in candidate_values:
            # The first matching pair is enough to accept the edge.
            if annotation in edge_annotations and candidate in edge_annotations[annotation]:
                return True

    return False
def build_annotation_dict_any_filter(annotations: Mapping[str, Iterable[str]]) -> EdgePredicate:
    """Build an edge predicate that passes when at least one queried annotation value is on an edge.

    If the query is empty, no constraint is applied and ``true_edge_predicate`` is returned.

    :param annotations: The annotation query dict to match
    :return: An edge predicate enclosing the given query
    """
    if annotations:

        @edge_predicate
        def annotation_dict_any_filter(edge_data: EdgeData) -> bool:
            """Check whether any of the annotations in the enclosed query match."""
            return _annotation_dict_any_filter(edge_data, query=annotations)

        return annotation_dict_any_filter

    return true_edge_predicate
def build_upstream_edge_predicate(nodes: Iterable[BaseEntity]) -> EdgePredicate:
    """Build an edge predicate that passes for causal edges whose object is one of the given nodes.

    :param nodes: The nodes to look for in the object position of an edge
    :return: An edge predicate taking a graph and an edge's ``(u, v, k)`` triple
    """
    objects = set(nodes)

    def upstream_filter(graph: BELGraph, u: BaseEntity, v: BaseEntity, k: str) -> bool:
        """Pass for causal relations whose object is one of the enclosed nodes."""
        # Reject on the object first so the edge data is only read for candidate edges.
        if v not in objects:
            return False
        return graph[u][v][k][RELATION] in CAUSAL_RELATIONS

    return upstream_filter
def build_downstream_edge_predicate(nodes: Iterable[BaseEntity]) -> EdgePredicate:
    """Build an edge predicate that passes for causal edges whose subject is one of the given nodes.

    :param nodes: The nodes to look for in the subject position of an edge
    :return: An edge predicate taking a graph and an edge's ``(u, v, k)`` triple
    """
    subjects = set(nodes)

    def downstream_filter(graph: BELGraph, u: BaseEntity, v: BaseEntity, k: str) -> bool:
        """Pass for causal relations whose subject is one of the enclosed nodes."""
        # Reject on the subject first so the edge data is only read for candidate edges.
        if u not in subjects:
            return False
        return graph[u][v][k][RELATION] in CAUSAL_RELATIONS

    return downstream_filter
def build_relation_predicate(relations: Strings) -> EdgePredicate:
    """Build an edge predicate that passes for edges with the given relation(s).

    :param relations: A single relation or an iterable of relations to match
    :return: An edge predicate comparing an edge's relation against the enclosed value(s)
    :raises TypeError: If ``relations`` is neither a string nor an iterable
    """
    if isinstance(relations, str):

        @edge_predicate
        def relation_predicate(edge_data: EdgeData) -> bool:
            """Pass for relations matching the enclosed value."""
            return edge_data[RELATION] == relations

    elif isinstance(relations, Iterable):
        # Materialize once: the predicate is called per edge, and the input may be a one-shot iterable.
        relation_set = set(relations)

        @edge_predicate
        def relation_predicate(edge_data: EdgeData) -> bool:
            """Pass for relations matching one of the enclosed values."""
            return edge_data[RELATION] in relation_set

    else:
        raise TypeError(
            "relations must be a str or an iterable of str, not {}".format(type(relations).__name__)
        )

    return relation_predicate
def build_pmid_inclusion_filter(pmids: Strings) -> EdgePredicate:
    """Build an edge predicate that passes for edges with citations from the given PubMed identifier(s).

    :param pmids: A PubMed identifier or list of PubMed identifiers to filter for
    :return: An edge predicate that passes when ``has_pubmed`` accepts the edge and the citation's
     identifier matches the enclosed identifier(s)
    :raises TypeError: If ``pmids`` is neither a string nor an iterable
    """
    if isinstance(pmids, str):

        @edge_predicate
        def pmid_inclusion_filter(edge_data: EdgeData) -> bool:
            """Pass for edges with PubMed citations matching the contained PubMed identifier."""
            return has_pubmed(edge_data) and edge_data[CITATION].identifier == pmids

    elif isinstance(pmids, Iterable):
        # Materialize once: the predicate is called per edge, and the input may be a one-shot iterable.
        pmid_set = set(pmids)

        @edge_predicate
        def pmid_inclusion_filter(edge_data: EdgeData) -> bool:
            """Pass for edges with PubMed citations matching one of the contained PubMed identifiers."""
            return has_pubmed(edge_data) and edge_data[CITATION].identifier in pmid_set

    else:
        raise TypeError(
            "pmids must be a str or an iterable of str, not {}".format(type(pmids).__name__)
        )

    return pmid_inclusion_filter
def build_pmid_exclusion_filter(pmids: Strings) -> EdgePredicate:
    """Build an edge predicate that fails for edges cited by one of the given PubMed identifiers.

    This is the inversion of the predicate built by :func:`build_pmid_inclusion_filter`.

    :param pmids: A PubMed identifier or list of PubMed identifiers to filter against
    :return: The inverted inclusion predicate
    """
    inclusion_filter = build_pmid_inclusion_filter(pmids)
    return invert_edge_predicate(inclusion_filter)
def build_author_inclusion_filter(authors: Strings) -> EdgePredicate:
    """Build an edge predicate that passes for edges with citations written by the given author(s).

    :param authors: An author or an iterable of authors to filter for
    :return: An edge predicate that passes when ``has_authors`` accepts the edge and at least one
     enclosed author is found in the citation's authors
    :raises TypeError: If ``authors`` is neither a string nor an iterable
    """
    if isinstance(authors, str):

        @edge_predicate
        def author_filter(edge_data: EdgeData) -> bool:
            """Pass for edges with citations with an author that matches the contained author."""
            return has_authors(edge_data) and authors in edge_data[CITATION][CITATION_AUTHORS]

    elif isinstance(authors, Iterable):
        # Materialize once: the predicate is called per edge, and the input may be a one-shot iterable.
        author_set = set(authors)

        @edge_predicate
        def author_filter(edge_data: EdgeData) -> bool:
            """Pass for edges with citations with an author that matches one or more of the contained authors."""
            return has_authors(edge_data) and any(
                author in edge_data[CITATION][CITATION_AUTHORS]
                for author in author_set
            )

    else:
        raise TypeError(
            "authors must be a str or an iterable of str, not {}".format(type(authors).__name__)
        )

    return author_filter