Source code for pybel.struct.mutation.induction.random_subgraph

# -*- coding: utf-8 -*-

"""Functions for inducing random sub-graphs."""

import bisect
import logging
import random
from operator import itemgetter
from typing import Any, Iterable, Mapping, Optional, Set, Tuple

from ..utils import remove_isolated_nodes
from ...graph import BELGraph
from ...pipeline import transformation
from ...utils import update_metadata
from ....dsl import BaseEntity

__all__ = [

logger = logging.getLogger(__name__)

def _random_edge_iterator(graph: BELGraph, n_edges: int) -> Iterable[Tuple[BaseEntity, BaseEntity, int, Mapping]]:
    """Get a random set of edges from the graph and randomly samples a key from each.

    :param graph: A BEL graph
    :param n_edges: Number of edges to randomly select from the given graph
    edges = list(graph.edges())
    edge_sample = random.sample(edges, n_edges)
    for u, v in edge_sample:
        keys = list(graph[u][v])
        k = random.choice(keys)
        yield u, v, k, graph[u][v][k]

[docs]@transformation def get_graph_with_random_edges(graph: BELGraph, n_edges: int) -> BELGraph: """Build a new graph from a seeding of edges. :param graph: A BEL graph :param n_edges: Number of edges to randomly select from the given graph """ rv = graph.child() rv.add_edges_from(_random_edge_iterator(graph, n_edges)) return rv
#: How many edges should be sampled from a graph that's still reasonable to display SAMPLE_RANDOM_EDGE_COUNT = 250 #: How many edges should be sampled as "seed" edges SAMPLE_RANDOM_EDGE_SEED_COUNT = 5 class WeightedRandomGenerator: """A weighted random number generator. Adapted from: """ def __init__(self, values, weights): """Build a weighted random generator. :param Any values: A sequence corresponding to the weights :param weights: Weights for each. Should all be positive, but not necessarily normalized. """ self.values = values self.totals = [] weight_total = 0 for weight in weights: weight_total += weight self.totals.append(weight_total) @property def total(self): """Get the total weight stored.""" return self.totals[-1] def next_index(self) -> int: """Get a random index.""" return bisect.bisect_right(self.totals, random.random() * def next(self) -> Any: """Get a random value.""" return self.values[self.next_index()]
[docs]def get_random_node( graph, node_blacklist: Set[BaseEntity], invert_degrees: Optional[bool] = None, ) -> Optional[BaseEntity]: """Choose a node from the graph with probabilities based on their degrees. :type graph: networkx.Graph :param node_blacklist: Nodes to filter out :param invert_degrees: Should the degrees be inverted? Defaults to true. """ try: nodes, degrees = zip( *( (node, degree) for node, degree in sorted(, key=itemgetter(1)) if node not in node_blacklist ), ) except ValueError: # something wrong with graph, probably no elements in graph.degree_iter return if invert_degrees is None or invert_degrees: # More likely to choose low degree nodes to explore, so don't make hubs degrees = [1 / degree for degree in degrees] wrg = WeightedRandomGenerator(nodes, degrees) return # noqa: B305
def _helper( result, graph, number_edges_remaining: int, node_blacklist: Set[BaseEntity], invert_degrees: Optional[bool] = None, ) -> None: """Help build a random graph. :type result: networkx.Graph :type graph: networkx.Graph """ original_node_count = graph.number_of_nodes() logger.debug("adding remaining %d edges", number_edges_remaining) for _ in range(number_edges_remaining): source, possible_step_nodes, c = None, set(), 0 while not source or not possible_step_nodes: source = get_random_node(result, node_blacklist, invert_degrees=invert_degrees) c += 1 if c >= original_node_count: logger.warning("infinite loop happening") logger.warning("source: %s", source) logger.warning("no grow: %s", node_blacklist) return # Happens when after exhausting the connected components. Try increasing the number seed edges if source is None: continue # maybe do something else? # Only keep targets in the original graph that aren't in the result graph possible_step_nodes = set(graph[source]) - set(result[source]) if not possible_step_nodes: node_blacklist.add( source, ) # there aren't any possible nodes to step to, so try growing from somewhere else step_node = random.choice(list(possible_step_nodes)) # it's not really a big deal which, but it might be possible to weight this by the utility of edges later key, attr_dict = random.choice(list(graph[source][step_node].items())) result.add_edge(source, step_node, key=key, **attr_dict)
[docs]@transformation def get_random_subgraph( graph: BELGraph, number_edges: Optional[int] = None, number_seed_edges: Optional[int] = None, seed: Optional[int] = None, invert_degrees: Optional[bool] = None, ) -> BELGraph: """Generate a random subgraph based on weighted random walks from random seed edges. :type graph: pybel.BELGraph graph :param number_edges: Maximum number of edges. Defaults to :data:`pybel_tools.constants.SAMPLE_RANDOM_EDGE_COUNT` (250). :param number_seed_edges: Number of nodes to start with (which likely results in different components in large graphs). Defaults to :data:`SAMPLE_RANDOM_EDGE_SEED_COUNT` (5). :param seed: A seed for the random state :param invert_degrees: Should the degrees be inverted? Defaults to true. """ if number_edges is None: number_edges = SAMPLE_RANDOM_EDGE_COUNT if number_seed_edges is None: number_seed_edges = SAMPLE_RANDOM_EDGE_SEED_COUNT if seed is not None: random.seed(seed) # Check if graph will sample full graph, and just return it if it would if graph.number_of_edges() <= number_edges:"sampled full graph") return graph.copy() logger.debug( "getting random sub-graph with %d seed edges, %d final edges, and seed=%s", number_seed_edges, number_edges, seed, ) # Get initial graph with `number_seed_edges` edges result = get_graph_with_random_edges(graph, number_seed_edges) number_edges_remaining = number_edges - result.number_of_edges() _helper( result, graph, number_edges_remaining, node_blacklist=set(), # This is the set of nodes that should no longer be chosen to grow from invert_degrees=invert_degrees, ) logger.debug("removing isolated nodes") remove_isolated_nodes(result) return result