Source code for pybel.struct.mutation.induction.paths

# -*- coding: utf-8 -*-

"""Induction methods for graphs over shortest paths."""

import itertools as itt
import logging
import random
from typing import Iterable, List, Optional, Set, Tuple

import networkx as nx

from .utils import get_subgraph_by_induction
from ...graph import BELGraph
from ...pipeline import transformation
from ....constants import FUNCTION, PATHOLOGY
from ....dsl import BaseEntity

__all__ = [
    "get_nodes_in_all_shortest_paths",
    "get_subgraph_by_all_shortest_paths",
    "get_random_path",
]

logger = logging.getLogger(__name__)


def _remove_pathologies_oop(graph: BELGraph):
    """Remove pathology nodes from the graph."""
    rv = graph.copy()
    victims = [node for node in rv if node[FUNCTION] == PATHOLOGY]
    rv.remove_nodes_from(victims)
    return rv


def _iterate_nodes_in_shortest_paths(
    graph: BELGraph,
    nodes: Iterable[BaseEntity],
    weight: Optional[str] = None,
) -> Iterable[BaseEntity]:
    """Iterate over nodes in the shortest paths between all pairs of nodes in the given list."""
    for source, target in itt.product(nodes, repeat=2):
        try:
            paths = nx.all_shortest_paths(graph, source, target, weight=weight)
            for path in paths:
                for node in path:
                    yield node
        except nx.exception.NetworkXNoPath:
            continue


[docs]def get_nodes_in_all_shortest_paths( graph: BELGraph, nodes: Iterable[BaseEntity], weight: Optional[str] = None, remove_pathologies: bool = False, ) -> Set[BaseEntity]: """Get a set of nodes in all shortest paths between the given nodes. Thinly wraps :func:`networkx.all_shortest_paths`. :param graph: A BEL graph :param nodes: The list of nodes to use to use to find all shortest paths :param weight: Edge data key corresponding to the edge weight. If none, uses unweighted search. :param remove_pathologies: Should pathology nodes be removed first? :return: A set of nodes appearing in the shortest paths between nodes in the BEL graph .. note:: This can be trivially parallelized using :func:`networkx.single_source_shortest_path` """ if remove_pathologies: graph = _remove_pathologies_oop(graph) return set(_iterate_nodes_in_shortest_paths(graph, nodes, weight=weight))
[docs]@transformation def get_subgraph_by_all_shortest_paths( graph, nodes: Iterable[BaseEntity], weight: Optional[str] = None, remove_pathologies: bool = False, ) -> Optional[BELGraph]: """Induce a subgraph over the nodes in the pairwise shortest paths between all of the nodes in the given list. :param pybel.BELGraph graph: A BEL graph :param nodes: A set of nodes over which to calculate shortest paths :param weight: Edge data key corresponding to the edge weight. If None, performs unweighted search :param remove_pathologies: Should the pathology nodes be deleted before getting shortest paths? :return: A BEL graph induced over the nodes appearing in the shortest paths between the given nodes :rtype: Optional[pybel.BELGraph] """ query_nodes = [] for node in nodes: if node not in graph: logger.debug("%s not in %s", node, graph) continue query_nodes.append(node) if not query_nodes: return induced_nodes = get_nodes_in_all_shortest_paths( graph, query_nodes, weight=weight, remove_pathologies=remove_pathologies, ) if not induced_nodes: return return get_subgraph_by_induction(graph, induced_nodes)
[docs]def get_random_path(graph: BELGraph) -> List[BaseEntity]: """Get a random path from the graph as a list of nodes. :param graph: A BEL graph """ wg = graph.to_undirected() nodes = wg.nodes() def pick_random_pair() -> Tuple[BaseEntity, BaseEntity]: """Get a pair of random nodes.""" return random.sample(nodes, k=2) source, target = pick_random_pair() tries = 0 sentinel_tries = 5 while not nx.has_path(wg, source, target) and tries < sentinel_tries: tries += 1 source, target = pick_random_pair() if tries == sentinel_tries: return [source] return nx.shortest_path(wg, source=source, target=target)