# Source code for pybel.canonicalize

# -*- coding: utf-8 -*-

"""This module contains output functions to BEL scripts."""

import itertools as itt
import logging
import time
from typing import Iterable, List, Mapping, Optional, TextIO, Tuple, Union

from networkx.utils import open_file

import bel_resources.constants
from bel_resources import make_knowledge_header
from .constants import (
    ACTIVITY, ANNOTATIONS, BEL_DEFAULT_NAMESPACE, CELL_SURFACE, CITATION, CITATION_DB, CITATION_IDENTIFIER, DEGRADATION,
    EFFECT, EVIDENCE, EXTRACELLULAR, FROM_LOC, INTRACELLULAR, LOCATION, MODIFIER, NAME, NAMESPACE, OBJECT,
    PYBEL_AUTOEVIDENCE, RELATION, SUBJECT, TO_LOC, TRANSLOCATION, UNQUALIFIED_EDGES, VARIANTS,
)
from .dsl import BaseAbundance, BaseEntity, FusionBase, ListAbundance, Reaction
from .typing import EdgeData
from .utils import ensure_quotes
from .version import VERSION

__all__ = [
    'to_bel_script',
    'to_bel_script_lines',
    'edge_to_bel',
    'edge_to_tuple',
    'calculate_canonical_name',
]

logger = logging.getLogger(__name__)

EdgeTuple = Tuple[BaseEntity, BaseEntity, str, EdgeData]


@open_file(1, mode='w')
def to_bel_script(graph, path: Union[str, TextIO]) -> None:
    """Write the BELGraph as a canonical BEL script.

    The ``open_file`` decorator from :mod:`networkx.utils` is applied to the
    argument at index 1 (``path``) with mode ``'w'``; inside the body ``path``
    is used as a writable file object.

    :param BELGraph graph: the BEL Graph to output as a BEL Script
    :param path: A path or file-like.
    """
    # One script line per ``print`` call, so each line is newline-terminated.
    for line in to_bel_script_lines(graph):
        print(line, file=path)


def to_bel_script_lines(graph) -> Iterable[str]:
    """Iterate over the lines of the BEL graph as a canonical BEL script.

    The result chains the header lines, the body lines (qualified edges), and
    the footer lines (unqualified edges and isolated nodes), in that order.

    :param pybel.BELGraph graph: A BEL Graph
    """
    return itt.chain(
        _to_bel_lines_header(graph),
        _to_bel_lines_body(graph),
        _to_bel_lines_footer(graph),
    )


def postpend_location(bel_string: str, location_model) -> str:
    """Rip off the closing parenthesis and append a canonicalized ``loc()`` argument.

    I did this because writing a whole new parsing model for the data would be sad and difficult

    :param bel_string: BEL string representing node
    :param dict location_model: A dictionary containing the keys :code:`pybel.constants.NAMESPACE` and
     :code:`pybel.constants.NAME`
    :return: The BEL string with ``, loc(namespace:name)`` inserted before its final character
    :raises ValueError: if the location model lacks the namespace or the name key
    """
    if not all(k in location_model for k in (NAMESPACE, NAME)):
        raise ValueError('Location model missing namespace and/or name keys: {}'.format(location_model))

    # ``bel_string[:-1]`` drops the last character (expected to be the closing
    # parenthesis), which the format string then re-adds after the loc() term.
    return "{}, loc({}:{}))".format(
        bel_string[:-1],
        location_model[NAMESPACE],
        ensure_quotes(location_model[NAME]),
    )


def _decanonicalize_edge_node(node: BaseEntity, edge_data: EdgeData, node_position: str) -> str:
    """Canonicalize a node with its modifiers stored in the given edge to a BEL string.

    :param node: A PyBEL node data dictionary
    :param edge_data: A PyBEL edge data dictionary
    :param node_position: Either :data:`pybel.constants.SUBJECT` or :data:`pybel.constants.OBJECT`
    :raises ValueError: if the modifier is not degradation, activity, or translocation
    """
    node_str = node.as_bel()

    # No per-side information on the edge: the plain node string is the answer.
    if node_position not in edge_data:
        return node_str

    node_edge_data = edge_data[node_position]

    # A location is applied before any modifier wraps the node string.
    if LOCATION in node_edge_data:
        node_str = postpend_location(node_str, node_edge_data[LOCATION])

    modifier = node_edge_data.get(MODIFIER)

    if modifier is None:
        return node_str

    if DEGRADATION == modifier:
        return "deg({})".format(node_str)

    effect = node_edge_data.get(EFFECT)

    if ACTIVITY == modifier:
        if effect is None:
            return "act({})".format(node_str)

        # Effects in the default BEL namespace are written without a prefix or quoting.
        if effect[NAMESPACE] == BEL_DEFAULT_NAMESPACE:
            return "act({}, ma({}))".format(node_str, effect[NAME])

        return "act({}, ma({}:{}))".format(node_str, effect[NAMESPACE], ensure_quotes(effect[NAME]))

    if TRANSLOCATION == modifier:
        if effect is None:
            return 'tloc({})'.format(node_str)

        to_loc_data = effect[TO_LOC]
        from_loc_data = effect[FROM_LOC]

        # Two intracellular-origin translocations have dedicated short forms.
        if from_loc_data[NAMESPACE] == BEL_DEFAULT_NAMESPACE and from_loc_data[NAME] == INTRACELLULAR:
            if to_loc_data[NAMESPACE] == BEL_DEFAULT_NAMESPACE and to_loc_data[NAME] == EXTRACELLULAR:
                return 'sec({})'.format(node_str)
            if to_loc_data[NAMESPACE] == BEL_DEFAULT_NAMESPACE and to_loc_data[NAME] == CELL_SURFACE:
                return 'surf({})'.format(node_str)

        from_loc = _get_tloc_terminal('fromLoc', from_loc_data)
        to_loc = _get_tloc_terminal('toLoc', to_loc_data)

        return "tloc({}, {}, {})".format(node_str, from_loc, to_loc)

    raise ValueError('invalid modifier: {}'.format(modifier))


def _get_tloc_terminal(side, data):
    """Format one end of a translocation as ``side(namespace:name)``.

    :param side: The function name to emit (the callers in this module pass ``'fromLoc'`` or ``'toLoc'``)
    :param data: A dictionary with the :data:`pybel.constants.NAMESPACE` and :data:`pybel.constants.NAME` keys
    """
    return "{}({}:{})".format(
        side,
        data[NAMESPACE],
        ensure_quotes(data[NAME]),
    )


def edge_to_tuple(u: BaseEntity, v: BaseEntity, data: EdgeData) -> Tuple[str, str, str]:
    """Take two nodes and give back the three parts of the BEL statement.

    :param u: The edge's source's PyBEL node data dictionary
    :param v: The edge's target's PyBEL node data dictionary
    :param data: The edge's data dictionary
    :return: A triple of the subject BEL string, the relation, and the object BEL string
    """
    u_str = _decanonicalize_edge_node(u, data, node_position=SUBJECT)
    v_str = _decanonicalize_edge_node(v, data, node_position=OBJECT)
    return u_str, data[RELATION], v_str


def edge_to_bel(u: BaseEntity, v: BaseEntity, data: EdgeData, sep: Optional[str] = None) -> str:
    """Take two nodes and give back a BEL string representing the statement.

    :param u: The edge's source's PyBEL node data dictionary
    :param v: The edge's target's PyBEL node data dictionary
    :param data: The edge's data dictionary
    :param sep: The separator between the source, relation, and target. Defaults to ' '
    """
    # Only substitute the default when no separator was given; an explicitly
    # passed empty string is honoured rather than being replaced by a space.
    if sep is None:
        sep = ' '
    return sep.join(edge_to_tuple(u=u, v=v, data=data))


def _sort_qualified_edges_helper(t: EdgeTuple) -> Tuple[str, str, str]:
    """Build the sort key of an edge 4-tuple: citation database, citation identifier, evidence."""
    return (
        t[3][CITATION][CITATION_DB],
        t[3][CITATION][CITATION_IDENTIFIER],
        t[3][EVIDENCE],
    )


def sort_qualified_edges(graph) -> Iterable[EdgeTuple]:
    """Return the qualified edges, sorted first by citation, then by evidence.

    An edge is kept only if the graph reports both a citation and an evidence for it.

    :param BELGraph graph: A BEL graph
    """
    qualified_edges = (
        (u, v, k, d)
        for u, v, k, d in graph.edges(keys=True, data=True)
        if graph.has_edge_citation(u, v, k) and graph.has_edge_evidence(u, v, k)
    )
    return sorted(qualified_edges, key=_sort_qualified_edges_helper)


def _citation_sort_key(t: EdgeTuple) -> str:
    """Make a confusing 4 tuple groupable by citation.

    The returned text is also what gets written between the braces of the ``SET Citation`` line.
    """
    return '"{}", "{}"'.format(t[3][CITATION][CITATION_DB], t[3][CITATION][CITATION_IDENTIFIER])


def _evidence_sort_key(t: EdgeTuple) -> str:
    """Make a confusing 4 tuple groupable by evidence."""
    return t[3][EVIDENCE]


def _set_annotation_to_str(annotation_data: Mapping[str, Mapping[str, bool]], key: str) -> str:
    """Return a set annotation string.

    A single value is written as a quoted string; any other number of values is
    written as a sorted, comma-separated list in braces.
    """
    value = annotation_data[key]

    if len(value) == 1:
        return 'SET {} = "{}"'.format(key, list(value)[0])

    x = ('"{}"'.format(v) for v in sorted(value))

    return 'SET {} = {{{}}}'.format(key, ', '.join(x))


def _unset_annotation_to_str(keys: List[str]) -> str:
    """Return an unset annotation string.

    A single key is written bare; several keys are written as a comma-separated list in braces.
    """
    if len(keys) == 1:
        return 'UNSET {}'.format(list(keys)[0])

    return 'UNSET {{{}}}'.format(', '.join('{}'.format(key) for key in keys))


def _to_bel_lines_header(graph) -> Iterable[str]:
    """Iterate the lines of a BEL graph's corresponding BEL script's header.

    The first line is a comment recording the PyBEL version, the bel-resources
    version and the current time; the remaining lines come from
    :func:`bel_resources.make_knowledge_header`.

    :param pybel.BELGraph graph: A BEL graph
    """
    yield '# This document was created by PyBEL v{} and bel-resources v{} on {}\n'.format(
        VERSION, bel_resources.constants.VERSION, time.asctime(),
    )
    yield from make_knowledge_header(
        namespace_url=graph.namespace_url,
        namespace_patterns=graph.namespace_pattern,
        annotation_url=graph.annotation_url,
        annotation_patterns=graph.annotation_pattern,
        annotation_list=graph.annotation_list,
        **graph.document,
    )


def group_citation_edges(edges: Iterable[EdgeTuple]) -> Iterable[Tuple[str, Iterable[EdgeTuple]]]:
    """Return an iterator over pairs of citation values and their corresponding edge iterators.

    This wraps :func:`itertools.groupby`, so only consecutive edges with equal keys are grouped together.
    """
    return itt.groupby(edges, key=_citation_sort_key)


def group_evidence_edges(edges: Iterable[EdgeTuple]) -> Iterable[Tuple[str, Iterable[EdgeTuple]]]:
    """Return an iterator over pairs of evidence values and their corresponding edge iterators.

    This wraps :func:`itertools.groupby`, so only consecutive edges with equal keys are grouped together.
    """
    return itt.groupby(edges, key=_evidence_sort_key)


def _to_bel_lines_body(graph) -> Iterable[str]:
    """Iterate the lines of a BEL graph's corresponding BEL script's body.

    Qualified edges are sorted, then emitted nested by citation and by evidence,
    with each edge's annotations set before its statement and unset after it.

    :param pybel.BELGraph graph: A BEL graph
    """
    qualified_edges = sort_qualified_edges(graph)

    for citation, citation_edges in group_citation_edges(qualified_edges):
        yield 'SET Citation = {{{}}}\n'.format(citation)

        for evidence, evidence_edges in group_evidence_edges(citation_edges):
            yield 'SET SupportingText = "{}"'.format(evidence)

            for u, v, _, data in evidence_edges:
                annotations_data = data.get(ANNOTATIONS)

                # Sorted keys keep the SET/UNSET lines in a stable order.
                keys = sorted(annotations_data) if annotations_data is not None else tuple()
                for key in keys:
                    yield _set_annotation_to_str(annotations_data, key)

                yield graph.edge_to_bel(u, v, data)

                if keys:
                    yield _unset_annotation_to_str(keys)

            yield 'UNSET SupportingText'

        yield 'UNSET Citation\n'
        yield '#' * 80


def _to_bel_lines_footer(graph) -> Iterable[str]:
    """Iterate the lines of a BEL graph's corresponding BEL script's footer.

    The footer holds edges whose relation is in ``UNQUALIFIED_EDGES`` and that
    carry no evidence, plus nodes that have neither predecessors nor successors.
    Nothing is yielded when both collections are empty.

    :param pybel.BELGraph graph: A BEL graph
    """
    unqualified_edges_to_serialize = [
        (u, v, d)
        for u, v, d in graph.edges(data=True)
        if d[RELATION] in UNQUALIFIED_EDGES and EVIDENCE not in d
    ]

    isolated_nodes_to_serialize = [
        node
        for node in graph
        if not graph.pred[node] and not graph.succ[node]
    ]

    if unqualified_edges_to_serialize or isolated_nodes_to_serialize:
        yield '###############################################\n'
        # These statements are written under a fixed placeholder citation and evidence.
        yield 'SET Citation = {"PubMed","Added by PyBEL","29048466"}'
        yield 'SET SupportingText = "{}"'.format(PYBEL_AUTOEVIDENCE)

        for u, v, data in unqualified_edges_to_serialize:
            yield '{} {} {}'.format(u.as_bel(), data[RELATION], v.as_bel())

        for node in isolated_nodes_to_serialize:
            yield node.as_bel()

        yield 'UNSET SupportingText'
        yield 'UNSET Citation'


def calculate_canonical_name(node: BaseEntity, use_curie: bool = False) -> str:
    """Calculate the canonical name for a given node.

    Reactions, list abundances, fusions, and abundances with variants use their
    BEL string (with identifiers). Any other abundance uses its ``curie``
    attribute when ``use_curie`` is true and its ``obo`` attribute otherwise.

    :param node: The node to name
    :param use_curie: Whether a simple abundance is named by ``node.curie`` instead of ``node.obo``
    :raises TypeError: if the node is none of the handled types
    """
    if isinstance(node, (Reaction, ListAbundance, FusionBase)):
        return node.as_bel(use_identifiers=True)

    elif isinstance(node, BaseAbundance):
        if VARIANTS in node:
            return node.as_bel(use_identifiers=True)
        elif use_curie:
            return node.curie
        else:
            return node.obo

    else:
        raise TypeError('Unhandled node: {}'.format(node))