Source code for pybel_tools.filters.edge_filters

# -*- coding: utf-8 -*-

"""
Edge Filters
------------

A edge filter is a function that takes five arguments: a :class:`pybel.BELGraph`, a source node tuple, a target node
tuple, a key, and a data dictionary. It returns a boolean representing whether the edge passed the given test.

This module contains a set of default functions for filtering lists of edges and building edge filtering functions.

A general use for an edge filter function is to use the built-in :func:`filter` in code like
:code:`filter(your_edge_filter, graph.edges_iter(keys=True, data=True))`
"""

from collections import Iterable, Mapping
from typing import Set

from pybel import BELGraph
from pybel.constants import *
from pybel.dsl import BaseEntity
from pybel.struct.filters import (
    build_annotation_dict_all_filter, build_annotation_dict_any_filter, count_passed_edge_filter,
)
from pybel.struct.filters.edge_predicates import edge_predicate, has_authors, has_pathology_causal, has_pubmed
from pybel.struct.filters.typing import EdgePredicate, EdgePredicates
from pybel.typing import Strings
from pybel.utils import subdict_matches

__all__ = [
    'summarize_edge_filter',
    'build_edge_data_filter',
    'build_annotation_dict_all_filter',
    'build_annotation_dict_any_filter',
    'build_pmid_inclusion_filter',
    'build_pmid_exclusion_filter',
    'build_author_inclusion_filter',
    'has_pathology_causal',
    'build_source_namespace_filter',
    'build_target_namespace_filter',
]


[docs]def summarize_edge_filter(graph: BELGraph, edge_predicates: EdgePredicates) -> None: """Print a summary of the number of edges passing a given set of filters.""" passed = count_passed_edge_filter(graph, edge_predicates) print('{}/{} edges passed {}'.format( passed, graph.number_of_edges(), ( ', '.join(edge_filter.__name__ for edge_filter in edge_predicates) if isinstance(edge_predicates, Iterable) else edge_predicates.__name__ ) ))
[docs]def build_edge_data_filter(annotations, partial_match: bool = True) -> EdgePredicate: """Build a filter that keeps edges whose data dictionaries are super-dictionaries to the given dictionary. :param dict annotations: The annotation query dict to match :param partial_match: Should the query values be used as partial or exact matches? Defaults to :code:`True`. """ @edge_predicate def annotation_dict_filter(data: Mapping) -> bool: """A filter that matches edges with the given dictionary as a sub-dictionary.""" return subdict_matches(data, annotations, partial_match=partial_match) return annotation_dict_filter
[docs]def build_pmid_inclusion_filter(pmid: Strings) -> EdgePredicate: """Pass for edges with citations whose references are one of the given PubMed identifiers. :param pmid: A PubMed identifier or list of PubMed identifiers to filter for """ if isinstance(pmid, str): @edge_predicate def pmid_inclusion_filter(data: Mapping) -> bool: """Only passes for edges with PubMed citations matching the contained PubMed identifier :return: If the edge has a PubMed citation with the contained PubMed identifier """ return has_pubmed(data) and data[CITATION][CITATION_REFERENCE] == pmid return pmid_inclusion_filter else: pmids = set(pmid) @edge_predicate def pmid_inclusion_filter(data: Mapping) -> bool: """Only passes for edges with PubMed citations matching one of the contained PubMed identifiers :param data: The edge data dictionary :return: If the edge has a PubMed citation with one of the contained PubMed identifiers """ return has_pubmed(data) and data[CITATION][CITATION_REFERENCE] in pmids return pmid_inclusion_filter
[docs]def build_pmid_exclusion_filter(pmid: Strings) -> EdgePredicate: """Fail for edges with citations whose references are one of the given PubMed identifiers. :param pmid: A PubMed identifier or list of PubMed identifiers to filter against """ if isinstance(pmid, str): @edge_predicate def pmid_exclusion_filter(data: Mapping) -> bool: """Fails for edges with PubMed citations matching the contained PubMed identifier :return: If the edge has a PubMed citation with the contained PubMed identifier """ return has_pubmed(data) and data[CITATION][CITATION_REFERENCE] != pmid return pmid_exclusion_filter else: pmids = set(pmid) @edge_predicate def pmid_exclusion_filter(data: Mapping) -> bool: """Only passes for edges with PubMed citations matching one of the contained PubMed identifiers :return: If the edge has a PubMed citation with one of the contained PubMed identifiers """ return has_pubmed(data) and data[CITATION][CITATION_REFERENCE] not in pmids return pmid_exclusion_filter
[docs]def build_author_inclusion_filter(author: Strings) -> EdgePredicate: """Only passes for edges with author information that matches one of the given authors :param author: The author or list of authors to filter by """ if isinstance(author, str): @edge_predicate def author_filter(data: Mapping) -> bool: """Only passes for edges with citations with an author that matches the contained author :return: If the edge has a citation with an author that matches the the contained author """ return has_authors(data) and author in data[CITATION][CITATION_AUTHORS] return author_filter else: authors = set(author) @edge_predicate def author_filter(data: Mapping) -> bool: """Only passes for edges with citations with an author that matches one or more of the contained authors :return: If the edge has a citation with an author that matches the the contained author """ return has_authors(data) and any( author in data[CITATION][CITATION_AUTHORS] for author in authors ) return author_filter
def node_has_namespace(node: BaseEntity, namespace: str): ns = node.get(NAMESPACE) return ns is not None and ns == namespace def node_has_namespaces(node: BaseEntity, namespaces: Set[str]): ns = node.get(NAMESPACE) return ns is not None and ns in namespaces
[docs]def build_source_namespace_filter(namespaces: Strings) -> EdgePredicate: """Only passes for edges whose source nodes have the given namespace or one of the given namespaces :param namespaces: The namespace or namespaces to filter by """ if isinstance(namespaces, str): def source_namespace_filter(_, u: BaseEntity, __, ___) -> bool: return node_has_namespace(u, namespaces) elif isinstance(namespaces, Iterable): namespaces = set(namespaces) def source_namespace_filter(_, u: BaseEntity, __, ___) -> bool: return node_has_namespaces(u, namespaces) else: raise TypeError return source_namespace_filter
[docs]def build_target_namespace_filter(namespaces: Strings) -> EdgePredicate: """Only passes for edges whose target nodes have the given namespace or one of the given namespaces :param namespaces: The namespace or namespaces to filter by """ if isinstance(namespaces, str): def target_namespace_filter(_, __, v: BaseEntity, ___) -> bool: return node_has_namespace(v, namespaces) elif isinstance(namespaces, Iterable): namespaces = set(namespaces) def target_namespace_filter(_, __, v: BaseEntity, ___) -> bool: return node_has_namespaces(v, namespaces) else: raise TypeError return target_namespace_filter