Source code for pybel.canonicalize

# -*- coding: utf-8 -*-

"""This module contains output functions to BEL scripts."""

import gzip
import itertools as itt
import logging
import time
import warnings
from typing import Iterable, List, Mapping, Optional, TextIO, Tuple, Union

import bel_resources.constants
from bel_resources import make_knowledge_header
from networkx.utils import open_file

from .constants import (
    ACTIVITY,
    ANNOTATIONS,
    CITATION,
    CITATION_TYPE_PUBMED,
    DEGRADATION,
    EFFECT,
    EVIDENCE,
    FROM_LOC,
    LOCATION,
    MODIFIER,
    NAME,
    NAMESPACE,
    PYBEL_AUTOEVIDENCE,
    PYBEL_PUBMED,
    RELATION,
    SET_CITATION_FMT,
    SOURCE_MODIFIER,
    TARGET_MODIFIER,
    TO_LOC,
    TRANSLOCATION,
    UNQUALIFIED_EDGES,
    VARIANTS,
)
from .dsl import BaseAbundance, BaseEntity, FusionBase, ListAbundance, Reaction
from .language import Entity
from .typing import EdgeData
from .utils import ensure_quotes
from .version import VERSION

__all__ = [
    "to_bel_script",
    "to_bel_script_gz",
    "to_bel_script_lines",
    "edge_to_bel",
    "edge_to_tuple",
    "calculate_canonical_name",
]

logger = logging.getLogger(__name__)

EdgeTuple = Tuple[BaseEntity, BaseEntity, str, EdgeData]


[docs]@open_file(1, mode="w") def to_bel_script(graph, path: Union[str, TextIO], use_identifiers: bool = True) -> None: """Write the BELGraph as a canonical BEL script. :param BELGraph graph: the BEL Graph to output as a BEL Script :param path: A path or file-like. :param use_identifiers: Enables extended `BEP-0008 <http://bep.bel.bio/published/BEP-0008.html>`_ syntax """ for line in to_bel_script_lines(graph, use_identifiers=use_identifiers): print(line, file=path)
def to_bel_script_gz(graph, path, **kwargs) -> None: """Write the graph as a BEL Script a gzip file.""" with gzip.open(path, "wt") as file: to_bel_script(graph, file, **kwargs) def to_bel_script_lines(graph, use_identifiers: bool = True) -> Iterable[str]: """Iterate over the lines of the BEL graph as a canonical BEL script. :param pybel.BELGraph graph: A BEL Graph :param use_identifiers: Enables extended `BEP-0008 <http://bep.bel.bio/published/BEP-0008.html>`_ syntax """ return itt.chain( _to_bel_lines_header(graph), _to_bel_lines_body(graph, use_identifiers=use_identifiers), _to_bel_lines_footer(graph, use_identifiers=use_identifiers), ) def postpend_location(bel_string: str, location_model) -> str: """Rip off the closing parentheses and adds canonicalized modification. I did this because writing a whole new parsing model for the data would be sad and difficult :param bel_string: BEL string representing node :param dict location_model: A dictionary containing keys :code:`pybel.constants.TO_LOC` and :code:`pybel.constants.FROM_LOC` :return: A part of a BEL string representing the location """ if not all(k in location_model for k in {NAMESPACE, NAME}): raise ValueError("Location model missing namespace and/or name keys: {}".format(location_model)) return "{}, loc({}:{}))".format( bel_string[:-1], location_model[NAMESPACE], ensure_quotes(location_model[NAME]), ) def _decanonicalize_edge_node( node: BaseEntity, edge_data: EdgeData, node_position: str, *, use_identifiers: bool = True, ) -> str: """Canonicalize a node with its modifiers stored in the given edge to a BEL string. :param node: A PyBEL node data dictionary :param edge_data: A PyBEL edge data dictionary :param node_position: Either :data:`pybel.constants.SUBJECT` or :data:`pybel.constants.OBJECT` :param use_identifiers: Enables extended `BEP-0008 <http://bep.bel.bio/published/BEP-0008.html>`_ syntax """ node_str = node.as_bel(use_identifiers=use_identifiers) if node_position not in edge_data: return node_str node_edge_data = edge_data[node_position] if LOCATION in node_edge_data: node_str = postpend_location(node_str, node_edge_data[LOCATION]) modifier = node_edge_data.get(MODIFIER) if modifier is None: return node_str if DEGRADATION == modifier: warnings.warn("degradation is deprecated", DeprecationWarning) return f"deg({node_str})" effect = node_edge_data.get(EFFECT) if ACTIVITY == modifier: if effect is None: return f"act({node_str})" return f"act({node_str}, ma({effect}))" if TRANSLOCATION == modifier: if effect is None: return f"tloc({node_str})" to_loc_data: Entity = effect[TO_LOC] from_loc_data: Entity = effect[FROM_LOC] return f"tloc({node_str}, fromLoc({from_loc_data}), toLoc({to_loc_data}))" raise ValueError("invalid modifier: {}".format(modifier)) def edge_to_tuple( source: BaseEntity, target: BaseEntity, data: EdgeData, use_identifiers: bool = True, ) -> Tuple[str, str, str]: """Take two nodes and gives back a BEL string representing the statement. :param source: The edge's source's PyBEL node data dictionary :param target: The edge's target's PyBEL node data dictionary :param data: The edge's data dictionary :param use_identifiers: Enables extended `BEP-0008 <http://bep.bel.bio/published/BEP-0008.html>`_ syntax """ u_str = _decanonicalize_edge_node(source, data, node_position=SOURCE_MODIFIER, use_identifiers=use_identifiers) v_str = _decanonicalize_edge_node(target, data, node_position=TARGET_MODIFIER, use_identifiers=use_identifiers) return u_str, data[RELATION], v_str def edge_to_bel( source: BaseEntity, target: BaseEntity, data: EdgeData, sep: Optional[str] = None, use_identifiers: bool = True, ) -> str: """Take two nodes and gives back a BEL string representing the statement. :param source: The edge's source's PyBEL node data dictionary :param target: The edge's target's PyBEL node data dictionary :param data: The edge's data dictionary :param sep: The separator between the source, relation, and target. Defaults to ' ' :param use_identifiers: Enables extended `BEP-0008 <http://bep.bel.bio/published/BEP-0008.html>`_ syntax """ sep = sep or " " return sep.join(edge_to_tuple(source=source, target=target, data=data, use_identifiers=use_identifiers)) def _sort_qualified_edges_helper(t: EdgeTuple) -> Tuple[str, str, str]: return ( t[3][CITATION].namespace, t[3][CITATION].identifier, t[3][EVIDENCE], ) def sort_qualified_edges(graph) -> Iterable[EdgeTuple]: """Return the qualified edges, sorted first by citation, then by evidence, then by annotations. :param BELGraph graph: A BEL graph """ qualified_edges = ( (u, v, k, d) for u, v, k, d in graph.edges(keys=True, data=True) if graph.has_edge_citation(u, v, k) and graph.has_edge_evidence(u, v, k) ) return sorted(qualified_edges, key=_sort_qualified_edges_helper) def _citation_sort_key(t: EdgeTuple) -> Tuple[str, str]: """Make a confusing 4 tuple sortable by citation.""" return t[3][CITATION].namespace, t[3][CITATION].identifier def _evidence_sort_key(t: EdgeTuple) -> str: """Make a confusing 4 tuple sortable by citation.""" return t[3][EVIDENCE] def _set_annotation_to_str(annotation_data: Mapping[str, List[Entity]], key: str, use_curie: bool = False) -> str: """Return a set annotation string.""" value = annotation_data[key] if len(value) == 1: value = list(value)[0] return f'SET {key} = "{value if use_curie else value.identifier}"' value_strings = ", ".join( f'"{v if use_curie else v.identifier}"' for v in sorted(value, key=lambda e: (e.namespace, e.identifier, e.name)) ) return f"SET {key} = {{{value_strings}}}" def _unset_annotation_to_str(keys: List[str]) -> str: """Return an unset annotation string.""" if len(keys) == 1: return "UNSET {}".format(list(keys)[0]) return "UNSET {{{}}}".format(", ".join("{}".format(key) for key in keys)) def _to_bel_lines_header(graph) -> Iterable[str]: """Iterate the lines of a BEL graph's corresponding BEL script's header. :param pybel.BELGraph graph: A BEL graph """ yield "# This document was created by PyBEL v{} and bel-resources v{} on {}\n".format( VERSION, bel_resources.constants.VERSION, time.asctime(), ) yield from make_knowledge_header( namespace_url=graph.namespace_url, namespace_patterns=graph.namespace_pattern, annotation_url=graph.annotation_url, annotation_patterns=graph.annotation_pattern, annotation_list=graph.annotation_list, **graph.document, ) def group_citation_edges( edges: Iterable[EdgeTuple], ) -> Iterable[Tuple[Tuple[str, str], Iterable[EdgeTuple]]]: """Return an iterator over pairs of citation values and their corresponding edge iterators.""" return itt.groupby(edges, key=_citation_sort_key) def group_evidence_edges( edges: Iterable[EdgeTuple], ) -> Iterable[Tuple[str, Iterable[EdgeTuple]]]: """Return an iterator over pairs of evidence values and their corresponding edge iterators.""" return itt.groupby(edges, key=_evidence_sort_key) def _to_bel_lines_body(graph, use_identifiers: bool = False) -> Iterable[str]: """Iterate the lines of a BEL graph's corresponding BEL script's body. :param pybel.BELGraph graph: A BEL graph :param use_identifiers: Enables extended `BEP-0008 <http://bep.bel.bio/published/BEP-0008.html>`_ syntax """ qualified_edges = sort_qualified_edges(graph) for (citation_db, citation_id), citation_edges in group_citation_edges(qualified_edges): yield SET_CITATION_FMT.format(citation_db, citation_id) + "\n" for evidence, evidence_edges in group_evidence_edges(citation_edges): yield 'SET SupportingText = "{}"'.format(evidence) for u, v, _, data in evidence_edges: annotations_data = data.get(ANNOTATIONS) keys = sorted(annotations_data) if annotations_data is not None else tuple() for key in keys: yield _set_annotation_to_str(annotations_data, key, use_curie=key in graph.annotation_curie) yield graph.edge_to_bel(u, v, data, use_identifiers=use_identifiers) if keys: yield _unset_annotation_to_str(keys) yield "UNSET SupportingText" yield "UNSET Citation\n" yield "#" * 80 def _to_bel_lines_footer(graph, use_identifiers: bool = False) -> Iterable[str]: """Iterate the lines of a BEL graph's corresponding BEL script's footer. :param pybel.BELGraph graph: A BEL graph :param use_identifiers: Enables extended `BEP-0008 <http://bep.bel.bio/published/BEP-0008.html>`_ syntax """ unqualified_edges_to_serialize = [ (u, v, d) for u, v, d in graph.edges(data=True) if d[RELATION] in UNQUALIFIED_EDGES and EVIDENCE not in d ] isolated_nodes_to_serialize = [node for node in graph if not graph.pred[node] and not graph.succ[node]] if unqualified_edges_to_serialize or isolated_nodes_to_serialize: yield "###############################################\n" yield SET_CITATION_FMT.format(CITATION_TYPE_PUBMED, PYBEL_PUBMED) yield 'SET SupportingText = "{}"'.format(PYBEL_AUTOEVIDENCE) for u, v, data in unqualified_edges_to_serialize: yield "{} {} {}".format( u.as_bel(use_identifiers=use_identifiers), data[RELATION], v.as_bel(use_identifiers=use_identifiers), ) for node in isolated_nodes_to_serialize: yield node.as_bel(use_identifiers=use_identifiers) yield "UNSET SupportingText" yield "UNSET Citation" def calculate_canonical_name(node: BaseEntity, use_identifiers: bool = True) -> str: """Calculate the canonical name for a given node. If it is a simple node, uses the already given name. Otherwise, it uses the BEL string. """ if isinstance(node, (Reaction, ListAbundance, FusionBase)): return node.as_bel(use_identifiers=True) elif isinstance(node, BaseAbundance): if VARIANTS in node: return node.as_bel(use_identifiers=True) elif use_identifiers and node.entity.identifier and node.entity.name: return node.obo else: return node.curie else: raise TypeError("Unhandled node: {}".format(node))