Source code for pybel.io.cx

# -*- coding: utf-8 -*-

"""This module wraps conversion between :class:`pybel.BELGraph` and the Cyberinfrastructure Exchange (CX) JSON.

CX is an aspect-oriented network interchange format encoded in JSON with a format inspired by the JSON-LD encoding of
Resource Description Framework (RDF). It is primarily used by the Network Data Exchange (NDEx) and more recent versions
of Cytoscape.

.. seealso::

    - The NDEx Data Model `Specification <http://www.home.ndexbio.org/data-model/>`_
    - `Cytoscape.js <http://js.cytoscape.org/>`_
    - CX Support for Cytoscape.js on the Cytoscape `App Store <http://apps.cytoscape.org/apps/cxsupport>`_
"""

import gzip
import json
import logging
import time
from collections import defaultdict
from operator import methodcaller
from typing import Dict, List, Mapping, Optional, TextIO, Union

from networkx.utils import open_file

from ..canonicalize import calculate_canonical_name
from ..constants import (
    ANNOTATIONS,
    CITATION,
    EVIDENCE,
    FUSION,
    GRAPH_ANNOTATION_LIST,
    GRAPH_ANNOTATION_PATTERN,
    GRAPH_ANNOTATION_URL,
    GRAPH_METADATA,
    GRAPH_NAMESPACE_PATTERN,
    GRAPH_NAMESPACE_URL,
    MEMBERS,
    NAME,
    PARTNER_3P,
    PARTNER_5P,
    PRODUCTS,
    RANGE_3P,
    RANGE_5P,
    REACTANTS,
    RELATION,
    SOURCE_MODIFIER,
    TARGET_MODIFIER,
    UNQUALIFIED_EDGES,
    VARIANTS,
)
from ..dsl import BaseAbundance, BaseEntity
from ..language import Entity
from ..struct import BELGraph
from ..tokens import parse_result_to_dsl
from ..utils import expand_dict, flatten_dict

__all__ = [
    "to_cx",
    "to_cx_file",
    "to_cx_gz",
    "to_cx_jsons",
    "from_cx",
    "from_cx_file",
    "from_cx_gz",
    "from_cx_jsons",
    "NDEX_SOURCE_FORMAT",
]

log = logging.getLogger(__name__)

CX_NODE_NAME = "label"
NDEX_SOURCE_FORMAT = "ndex:sourceFormat"

NDEX_SOURCE_MODIFIER = "sourceModifier"
NDEX_TARGET_MODIFIER = "targetModifier"


def _cx_to_dict(list_of_dicts: List[Dict], key_tag: str = "k", value_tag: str = "v") -> Dict:
    """Convert a CX list of dictionaries to a flat dictionary."""
    return {d[key_tag]: d[value_tag] for d in list_of_dicts}


def _cleanse_fusion_dict(d: Dict) -> Dict:
    """Fix the fusion partner names."""
    return {k.replace("_", ""): v for k, v in d.items()}


_p_dict = {
    "partner5p": PARTNER_5P,
    "partner3p": PARTNER_3P,
    "range5p": RANGE_5P,
    "range3p": RANGE_3P,
}


def _restore_fusion_dict(d: Dict) -> Dict:
    return {_p_dict[k]: v for k, v in d.items()}


def build_node_mapping(graph: BELGraph) -> Mapping[BaseEntity, int]:
    """Build a mapping from a graph's nodes to their canonical sort order."""
    return {node: node_index for node_index, node in enumerate(sorted(graph, key=methodcaller("as_bel")))}


[docs]def to_cx(graph: BELGraph) -> List[Dict]: # noqa: C901 """Convert a BEL Graph to a CX JSON object for use with `NDEx <http://www.ndexbio.org/>`_. .. seealso:: - `NDEx Python Client <https://github.com/ndexbio/ndex-python>`_ """ node_mapping = build_node_mapping(graph) node_index_data = {} nodes_entry = [] node_attributes_entry = [] for node, node_index in node_mapping.items(): node_index_data[node_index] = node node_entry_dict = { "@id": node_index, "n": calculate_canonical_name(node), } if isinstance(node, BaseAbundance): node_entry_dict["r"] = node.curie nodes_entry.append(node_entry_dict) aliases = [] if isinstance(node, BaseAbundance): aliases.extend(xref.curie for xref in node.xrefs) if aliases: node_attributes_entry.append( { "po": node_index, "n": "alias", "v": aliases, "d": "list_of_str", } ) for k, v in node.items(): if k == VARIANTS: for i, el in enumerate(v): for a, b in flatten_dict(el).items(): node_attributes_entry.append( { "po": node_index, "n": "{}_{}_{}".format(k, i, a), "v": b, } ) elif k == FUSION: v = _cleanse_fusion_dict(v) for a, b in flatten_dict(v).items(): node_attributes_entry.append( { "po": node_index, "n": "{}_{}".format(k, a), "v": b, } ) elif k == NAME: node_attributes_entry.append( { "po": node_index, "n": CX_NODE_NAME, "v": v, } ) elif k in {PRODUCTS, REACTANTS, MEMBERS}: node_attributes_entry.append( { "po": node_index, "n": k, "v": json.dumps(v), } ) else: node_attributes_entry.append( { "po": node_index, "n": k, "v": v, } ) edges_entry = [] edge_attributes_entry = [] for edge_index, (source, target, d) in enumerate(graph.edges(data=True)): uid = node_mapping[source] vid = node_mapping[target] edges_entry.append( { "@id": edge_index, "s": uid, "t": vid, "i": d[RELATION], } ) if EVIDENCE in d: edge_attributes_entry.append( { "po": edge_index, "n": EVIDENCE, "v": d[EVIDENCE], } ) for k, v in d[CITATION].items(): edge_attributes_entry.append( { "po": edge_index, "n": "{}_{}".format(CITATION, k), "v": v, } ) if ANNOTATIONS in d: for annotation, values in d[ANNOTATIONS].items(): edge_attributes_entry.append( { "po": edge_index, "n": annotation, "v": sorted(values, key=lambda e: (e.namespace, e.identifier, e.name)), "d": "list_of_string", } ) if SOURCE_MODIFIER in d: for k, v in flatten_dict(d[SOURCE_MODIFIER]).items(): edge_attributes_entry.append( { "po": edge_index, "n": "{}_{}".format(NDEX_SOURCE_MODIFIER, k), "v": v, } ) if TARGET_MODIFIER in d: for k, v in flatten_dict(d[TARGET_MODIFIER]).items(): edge_attributes_entry.append( { "po": edge_index, "n": "{}_{}".format(NDEX_TARGET_MODIFIER, k), "v": v, } ) context_legend = {} for key in graph.namespace_url: context_legend[key] = GRAPH_NAMESPACE_URL for key in graph.namespace_pattern: context_legend[key] = GRAPH_NAMESPACE_PATTERN for key in graph.annotation_url: context_legend[key] = GRAPH_ANNOTATION_URL for key in graph.annotation_pattern: context_legend[key] = GRAPH_ANNOTATION_PATTERN for key in graph.annotation_list: context_legend[key] = GRAPH_ANNOTATION_LIST context_legend_entry = [] for keyword, resource_type in context_legend.items(): context_legend_entry.append( { "k": keyword, "v": resource_type, } ) annotation_list_keys_lookup = {keyword: i for i, keyword in enumerate(sorted(graph.annotation_list))} annotation_lists_entry = [] for keyword, values in graph.annotation_list.items(): for v in values: annotation_lists_entry.append( { "k": annotation_list_keys_lookup[keyword], "v": v, } ) context_entry_dict = {} context_entry_dict.update(graph.namespace_url) context_entry_dict.update(graph.namespace_pattern) context_entry_dict.update(graph.annotation_url) context_entry_dict.update(graph.annotation_pattern) context_entry_dict.update(annotation_list_keys_lookup) context_entry_dict.update(graph.namespace_url) context_entry = [context_entry_dict] network_attributes_entry = [ { "n": NDEX_SOURCE_FORMAT, "v": "PyBEL", } ] for k, v in graph.document.items(): network_attributes_entry.append( { "n": k, "v": v, } ) # Coalesce to cx # cx = create_aspect.number_verification() cx = [{"numberVerification": [{"longNumber": 281474976710655}]}] cx_pairs = [ ("@context", context_entry), ("context_legend", context_legend_entry), ("annotation_lists", annotation_lists_entry), ("networkAttributes", network_attributes_entry), ("nodes", nodes_entry), ("nodeAttributes", node_attributes_entry), ("edges", edges_entry), ("edgeAttributes", edge_attributes_entry), ] cx_metadata = [] for key, aspect in cx_pairs: aspect_dict = { "name": key, "elementCount": len(aspect), "lastUpdate": time.time(), "consistencyGroup": 1, "properties": [], "version": "1.0", } if key in {"citations", "supports", "nodes", "edges"}: aspect_dict["idCounter"] = len(aspect) cx_metadata.append(aspect_dict) cx.append( { "metaData": cx_metadata, } ) for key, aspect in cx_pairs: cx.append( { key: aspect, } ) cx.append({"status": [{"error": "", "success": True}]}) return cx
[docs]@open_file(1, mode="w") def to_cx_file(graph: BELGraph, path: Union[str, TextIO], indent: Optional[int] = 2, **kwargs) -> None: """Write a BEL graph to a JSON file in CX format. :param graph: A BEL graph :param path: A writable file or file-like :param indent: How many spaces to use to pretty print. Change to None for no pretty printing The example below shows how to output a BEL graph as CX to an open file. .. code-block:: python from pybel.examples import sialic_acid_graph from pybel import to_cx_file with open('graph.bel.cx.json', 'w') as file: to_cx_file(sialic_acid_graph, file) The example below shows how to output a BEL graph as CX to a file at a given path. .. code-block:: python from pybel.examples import sialic_acid_graph from pybel import to_cx_file to_cx_file(sialic_acid_graph, 'graph.bel.cx.json') If you have a big graph, you might consider storing it as a gzipped JGIF file by using :func:`to_cx_gz`. """ graph_cx_json_dict = to_cx(graph) json.dump(graph_cx_json_dict, path, ensure_ascii=False, indent=indent, **kwargs)
[docs]def to_cx_gz(graph, path: str, **kwargs) -> None: """Write a graph as CX JSON to a gzip file.""" with gzip.open(path, "wt") as file: json.dump(to_cx(graph), file, ensure_ascii=False, **kwargs)
[docs]def to_cx_jsons(graph: BELGraph, **kwargs) -> str: """Dump this graph as a CX JSON object to a string.""" return json.dumps(to_cx(graph), ensure_ascii=False, **kwargs)
def _iterate_list_of_dicts(list_of_dicts: List[Dict]): """Iterate over a list of dictionaries. :type list_of_dicts: list[dict[A,B]] :rtype: iter[tuple[A,B]] """ for dictionary in list_of_dicts: for key, value in dictionary.items(): yield key, value
[docs]def from_cx(cx: List[Dict]) -> BELGraph: # noqa: C901 """Rebuild a BELGraph from CX JSON output from PyBEL. :param cx: The CX JSON object for this graph """ graph = BELGraph() context_legend_aspect = [] annotation_lists_aspect = [] context_entry = {} network_attributes_aspect = [] nodes_aspect = [] node_attributes_aspect = [] edge_annotations_aspect = [] edges_aspect = [] meta_entries = defaultdict(list) for key, value in _iterate_list_of_dicts(cx): if key == "context_legend": context_legend_aspect.extend(value) elif key == "annotation_lists": annotation_lists_aspect.extend(value) elif key == "@context": for element in value: context_entry.update(element) elif key == "networkAttributes": network_attributes_aspect.extend(value) elif key == "nodes": nodes_aspect.extend(value) elif key == "nodeAttributes": node_attributes_aspect.extend(value) elif key == "edges": edges_aspect.extend(value) elif key == "edgeAttributes": edge_annotations_aspect.extend(value) else: meta_entries[key].extend(value) context_legend = _cx_to_dict(context_legend_aspect) annotation_lists = defaultdict(set) for data in annotation_lists_aspect: annotation_lists[data["k"]].add(data["v"]) for keyword, entry in context_entry.items(): if context_legend[keyword] == GRAPH_NAMESPACE_URL: graph.namespace_url[keyword] = entry elif context_legend[keyword] == GRAPH_NAMESPACE_PATTERN: graph.namespace_pattern[keyword] = entry elif context_legend[keyword] == GRAPH_ANNOTATION_URL: graph.annotation_url[keyword] = entry elif context_legend[keyword] == GRAPH_ANNOTATION_PATTERN: graph.annotation_pattern[keyword] = entry elif context_legend[keyword] == GRAPH_ANNOTATION_LIST: graph.annotation_list[keyword] = annotation_lists[entry] for data in network_attributes_aspect: if data["n"] == NDEX_SOURCE_FORMAT: continue graph.graph[GRAPH_METADATA][data["n"]] = data["v"] node_name = {} for data in nodes_aspect: node_name[data["@id"]] = data["n"] node_data = defaultdict(dict) for data in node_attributes_aspect: node_data[data["po"]][data["n"]] = data["v"] # put all normal data here node_data_pp = defaultdict(dict) # Group all fusion-related data here node_data_fusion = defaultdict(dict) # Group all variant-related data node_data_variants = defaultdict(lambda: defaultdict(dict)) for nid, data in node_data.items(): for key, value in data.items(): if key.startswith(FUSION): node_data_fusion[nid][key] = value elif key.startswith(VARIANTS): _, i, vls = key.split("_", 2) node_data_variants[nid][i][vls] = value elif key in {PRODUCTS, REACTANTS, MEMBERS}: node_data_pp[nid][key] = json.loads(value) else: node_data_pp[nid][key] = value for nid, data in node_data_fusion.items(): data = expand_dict(data) data[FUSION] = _restore_fusion_dict(data[FUSION]) node_data_pp[nid].update(data) for nid, data in node_data_variants.items(): node_data_pp[nid][VARIANTS] = [expand_dict(value) for _, value in sorted(data.items())] nid_node_tuple = {} for nid, data in node_data_pp.items(): if CX_NODE_NAME in data: data[NAME] = data.pop(CX_NODE_NAME) nid_node_tuple[nid] = _node = parse_result_to_dsl(data) graph.add_node_from_data(_node) edge_relation = {} eid_source_nid = {} eid_target_nid = {} for data in edges_aspect: eid = data["@id"] edge_relation[eid] = data["i"] eid_source_nid[eid] = data["s"] eid_target_nid[eid] = data["t"] edge_data = defaultdict(dict) # type: Dict[str, Dict[str, str]] for data in edge_annotations_aspect: edge_data[data["po"]][data["n"]] = data["v"] edge_citation = defaultdict(dict) # type: Dict[str, Dict[str, str]] edge_subject = defaultdict(dict) edge_object = defaultdict(dict) edge_annotations = defaultdict(lambda: defaultdict(dict)) edge_data_pp = defaultdict(dict) for eid, data in edge_data.items(): for key, value in data.items(): if key.startswith(CITATION): vl = _after_underscore(key) edge_citation[eid][vl] = value elif key.startswith(NDEX_SOURCE_MODIFIER): vl = _after_underscore(key) edge_subject[eid][vl] = value elif key.startswith(NDEX_TARGET_MODIFIER): vl = _after_underscore(key) edge_object[eid][vl] = value elif key == EVIDENCE: edge_data_pp[eid][EVIDENCE] = value else: edge_annotations[eid][key] = value for eid, data in edge_citation.items(): edge_data_pp[eid][CITATION] = data for eid, data in edge_subject.items(): edge_data_pp[eid][SOURCE_MODIFIER] = expand_dict(data) for eid, data in edge_object.items(): edge_data_pp[eid][TARGET_MODIFIER] = expand_dict(data) for eid in edge_relation: if eid in edge_annotations: # FIXME stick this in edge_data.items() iteration edge_data_pp[eid][ANNOTATIONS] = { key: [Entity(**v) for v in values] for key, values in edge_annotations[eid].items() } if eid in edge_citation: graph.add_qualified_edge( nid_node_tuple[eid_source_nid[eid]], nid_node_tuple[eid_target_nid[eid]], relation=edge_relation[eid], citation=edge_data_pp[eid][CITATION], evidence=edge_data_pp[eid][EVIDENCE], source_modifier=edge_data_pp[eid].get(SOURCE_MODIFIER), target_modifier=edge_data_pp[eid].get(TARGET_MODIFIER), annotations=edge_data_pp[eid].get(ANNOTATIONS), ) elif edge_relation[eid] in UNQUALIFIED_EDGES: graph.add_unqualified_edge( nid_node_tuple[eid_source_nid[eid]], nid_node_tuple[eid_target_nid[eid]], edge_relation[eid], ) else: raise ValueError("problem adding edge: {}".format(eid)) return graph
def _after_underscore(key): _, vl = key.split("_", 1) return vl
[docs]@open_file(0, mode="r") def from_cx_file(path: Union[str, TextIO]) -> BELGraph: """Read a file containing CX JSON and converts to a BEL graph. :param path: A readable file or file-like containing the CX JSON for this graph :return: A BEL Graph representing the CX graph contained in the file """ return from_cx(json.load(path))
[docs]def from_cx_gz(path: str) -> BELGraph: """Read a graph as CX JSON from a gzip file.""" with gzip.open(path, "rt") as file: return from_cx(json.load(file))
[docs]def from_cx_jsons(graph_json_str: str) -> BELGraph: """Read a BEL graph from a CX JSON string.""" return from_cx(json.loads(graph_json_str))