# -*- coding: utf-8 -*-
"""This module wraps conversion between :class:`pybel.BELGraph` and the Cyberinfrastructure Exchange (CX) JSON.

CX is an aspect-oriented network interchange format encoded in JSON with a format inspired by the JSON-LD encoding of
Resource Description Framework (RDF). It is primarily used by the Network Data Exchange (NDEx) and more recent versions
of Cytoscape.

.. seealso::

    - The NDEx Data Model `Specification <http://www.home.ndexbio.org/data-model/>`_
    - `Cytoscape.js <http://js.cytoscape.org/>`_
    - CX Support for Cytoscape.js on the Cytoscape `App Store <http://apps.cytoscape.org/apps/cxsupport>`_
"""
import gzip
import json
import logging
import time
from collections import defaultdict
from operator import methodcaller
from typing import Dict, List, Mapping, Optional, TextIO, Union
from networkx.utils import open_file
from ..canonicalize import calculate_canonical_name
from ..constants import (
ANNOTATIONS,
CITATION,
EVIDENCE,
FUSION,
GRAPH_ANNOTATION_LIST,
GRAPH_ANNOTATION_PATTERN,
GRAPH_ANNOTATION_URL,
GRAPH_METADATA,
GRAPH_NAMESPACE_PATTERN,
GRAPH_NAMESPACE_URL,
MEMBERS,
NAME,
PARTNER_3P,
PARTNER_5P,
PRODUCTS,
RANGE_3P,
RANGE_5P,
REACTANTS,
RELATION,
SOURCE_MODIFIER,
TARGET_MODIFIER,
UNQUALIFIED_EDGES,
VARIANTS,
)
from ..dsl import BaseAbundance, BaseEntity
from ..language import Entity
from ..struct import BELGraph
from ..tokens import parse_result_to_dsl
from ..utils import expand_dict, flatten_dict
# Public names of this module: the CX writers, the CX readers, and the source-format attribute name.
__all__ = [
"to_cx",
"to_cx_file",
"to_cx_gz",
"to_cx_jsons",
"from_cx",
"from_cx_file",
"from_cx_gz",
"from_cx_jsons",
"NDEX_SOURCE_FORMAT",
]
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Node attribute name under which to_cx() stores a node's NAME entry; from_cx() maps it back to NAME.
CX_NODE_NAME = "label"
# Network attribute name that to_cx() sets to "PyBEL"; from_cx() skips it when restoring graph metadata.
NDEX_SOURCE_FORMAT = "ndex:sourceFormat"
# Prefixes of the flattened edge attribute names that carry the source and target modifier data.
NDEX_SOURCE_MODIFIER = "sourceModifier"
NDEX_TARGET_MODIFIER = "targetModifier"
def _cx_to_dict(list_of_dicts: List[Dict], key_tag: str = "k", value_tag: str = "v") -> Dict:
"""Convert a CX list of dictionaries to a flat dictionary."""
return {d[key_tag]: d[value_tag] for d in list_of_dicts}
def _cleanse_fusion_dict(d: Dict) -> Dict:
"""Fix the fusion partner names."""
return {k.replace("_", ""): v for k, v in d.items()}
# Lookup from the underscore-free fusion keys (as produced by _cleanse_fusion_dict)
# back to the corresponding PyBEL fusion constants.
_p_dict = {
    # 5' side of the fusion
    "partner5p": PARTNER_5P,
    "range5p": RANGE_5P,
    # 3' side of the fusion
    "partner3p": PARTNER_3P,
    "range3p": RANGE_3P,
}
def _restore_fusion_dict(d: Dict) -> Dict:
    """Rename the top-level keys of a fusion dictionary through ``_p_dict``.

    :raises KeyError: if ``d`` contains a key that is not listed in ``_p_dict``
    """
    restored = {}
    for cx_key, value in d.items():
        restored[_p_dict[cx_key]] = value
    return restored
def build_node_mapping(graph: BELGraph) -> Mapping[BaseEntity, int]:
    """Assign every node in the graph its position in the ordering given by ``node.as_bel()``.

    :param graph: A BEL graph
    :return: A dictionary from each node to its zero-based index in the sorted order
    """
    ordered_nodes = sorted(graph, key=methodcaller("as_bel"))
    return dict(zip(ordered_nodes, range(len(ordered_nodes))))
def to_cx(graph: BELGraph) -> List[Dict]:  # noqa: C901
    """Convert a BEL Graph to a CX JSON object for use with `NDEx <http://www.ndexbio.org/>`_.

    The returned list holds one single-key dictionary per CX aspect, in this order: ``numberVerification``,
    ``metaData``, ``@context``, ``context_legend``, ``annotation_lists``, ``networkAttributes``, ``nodes``,
    ``nodeAttributes``, ``edges``, ``edgeAttributes`` and finally ``status``.

    :param graph: A BEL graph
    :return: The CX document as a list of aspect dictionaries

    .. seealso::

        - `NDEx Python Client <https://github.com/ndexbio/ndex-python>`_
    """
    # Node identifiers in CX are the positions of the nodes in their canonical sort order
    node_mapping = build_node_mapping(graph)

    nodes_entry = []
    node_attributes_entry = []

    for node, node_index in node_mapping.items():
        is_abundance = isinstance(node, BaseAbundance)

        node_entry_dict = {
            "@id": node_index,
            "n": calculate_canonical_name(node),
        }
        if is_abundance:
            node_entry_dict["r"] = node.curie
        nodes_entry.append(node_entry_dict)

        # Cross-references are exported as a single list-valued "alias" attribute
        aliases = [xref.curie for xref in node.xrefs] if is_abundance else []
        if aliases:
            node_attributes_entry.append(
                {
                    "po": node_index,
                    "n": "alias",
                    "v": aliases,
                    # same data type label as used for the list-valued edge annotations below
                    "d": "list_of_string",
                }
            )

        for k, v in node.items():
            if k == VARIANTS:
                # One attribute per flattened variant entry, named "<VARIANTS>_<index>_<flattened key>"
                for i, el in enumerate(v):
                    for a, b in flatten_dict(el).items():
                        node_attributes_entry.append(
                            {
                                "po": node_index,
                                "n": "{}_{}_{}".format(k, i, a),
                                "v": b,
                            }
                        )
            elif k == FUSION:
                # Underscores are stripped from the fusion keys before flattening
                v = _cleanse_fusion_dict(v)
                for a, b in flatten_dict(v).items():
                    node_attributes_entry.append(
                        {
                            "po": node_index,
                            "n": "{}_{}".format(k, a),
                            "v": b,
                        }
                    )
            elif k == NAME:
                node_attributes_entry.append(
                    {
                        "po": node_index,
                        "n": CX_NODE_NAME,
                        "v": v,
                    }
                )
            elif k in {PRODUCTS, REACTANTS, MEMBERS}:
                # Nested node lists are stored as a JSON string; from_cx() decodes them with json.loads
                node_attributes_entry.append(
                    {
                        "po": node_index,
                        "n": k,
                        "v": json.dumps(v),
                    }
                )
            else:
                node_attributes_entry.append(
                    {
                        "po": node_index,
                        "n": k,
                        "v": v,
                    }
                )

    edges_entry = []
    edge_attributes_entry = []

    for edge_index, (source, target, d) in enumerate(graph.edges(data=True)):
        uid = node_mapping[source]
        vid = node_mapping[target]

        edges_entry.append(
            {
                "@id": edge_index,
                "s": uid,
                "t": vid,
                "i": d[RELATION],
            }
        )

        if EVIDENCE in d:
            edge_attributes_entry.append(
                {
                    "po": edge_index,
                    "n": EVIDENCE,
                    "v": d[EVIDENCE],
                }
            )

            # The citation is only read for edges that carry evidence
            for k, v in d[CITATION].items():
                edge_attributes_entry.append(
                    {
                        "po": edge_index,
                        "n": "{}_{}".format(CITATION, k),
                        "v": v,
                    }
                )

        if ANNOTATIONS in d:
            for annotation, values in d[ANNOTATIONS].items():
                edge_attributes_entry.append(
                    {
                        "po": edge_index,
                        "n": annotation,
                        # sorted so the output does not depend on the order of the stored values
                        "v": sorted(values, key=lambda e: (e.namespace, e.identifier, e.name)),
                        "d": "list_of_string",
                    }
                )

        if SOURCE_MODIFIER in d:
            for k, v in flatten_dict(d[SOURCE_MODIFIER]).items():
                edge_attributes_entry.append(
                    {
                        "po": edge_index,
                        "n": "{}_{}".format(NDEX_SOURCE_MODIFIER, k),
                        "v": v,
                    }
                )

        if TARGET_MODIFIER in d:
            for k, v in flatten_dict(d[TARGET_MODIFIER]).items():
                edge_attributes_entry.append(
                    {
                        "po": edge_index,
                        "n": "{}_{}".format(NDEX_TARGET_MODIFIER, k),
                        "v": v,
                    }
                )

    # The legend records which kind of resource each keyword in the context refers to.
    # Later loops overwrite earlier ones when a keyword appears in more than one place.
    context_legend = {}
    for key in graph.namespace_url:
        context_legend[key] = GRAPH_NAMESPACE_URL
    for key in graph.namespace_pattern:
        context_legend[key] = GRAPH_NAMESPACE_PATTERN
    for key in graph.annotation_url:
        context_legend[key] = GRAPH_ANNOTATION_URL
    for key in graph.annotation_pattern:
        context_legend[key] = GRAPH_ANNOTATION_PATTERN
    for key in graph.annotation_list:
        context_legend[key] = GRAPH_ANNOTATION_LIST

    context_legend_entry = [
        {
            "k": keyword,
            "v": resource_type,
        }
        for keyword, resource_type in context_legend.items()
    ]

    # Each annotation list is referred to by the index of its keyword in sorted order
    annotation_list_keys_lookup = {keyword: i for i, keyword in enumerate(sorted(graph.annotation_list))}

    annotation_lists_entry = []
    for keyword, values in graph.annotation_list.items():
        for v in values:
            annotation_lists_entry.append(
                {
                    "k": annotation_list_keys_lookup[keyword],
                    "v": v,
                }
            )

    context_entry_dict = {}
    context_entry_dict.update(graph.namespace_url)
    context_entry_dict.update(graph.namespace_pattern)
    context_entry_dict.update(graph.annotation_url)
    context_entry_dict.update(graph.annotation_pattern)
    context_entry_dict.update(annotation_list_keys_lookup)
    # NOTE(review): applying namespace_url a second time makes namespace URLs win when a keyword is shared
    # with another resource, whereas context_legend above resolves the same collision the other way.
    # Kept as-is to preserve the existing output - confirm which precedence is intended.
    context_entry_dict.update(graph.namespace_url)
    context_entry = [context_entry_dict]

    network_attributes_entry = [
        {
            "n": NDEX_SOURCE_FORMAT,
            "v": "PyBEL",
        }
    ]
    for k, v in graph.document.items():
        network_attributes_entry.append(
            {
                "n": k,
                "v": v,
            }
        )

    # Coalesce to cx, starting with the number verification aspect (the value is 2**48 - 1)
    cx = [{"numberVerification": [{"longNumber": 281474976710655}]}]

    cx_pairs = [
        ("@context", context_entry),
        ("context_legend", context_legend_entry),
        ("annotation_lists", annotation_lists_entry),
        ("networkAttributes", network_attributes_entry),
        ("nodes", nodes_entry),
        ("nodeAttributes", node_attributes_entry),
        ("edges", edges_entry),
        ("edgeAttributes", edge_attributes_entry),
    ]

    cx_metadata = []
    for key, aspect in cx_pairs:
        aspect_dict = {
            "name": key,
            "elementCount": len(aspect),
            "lastUpdate": time.time(),
            "consistencyGroup": 1,
            "properties": [],
            "version": "1.0",
        }
        # Only aspects whose elements carry an "@id" get an id counter
        if key in {"citations", "supports", "nodes", "edges"}:
            aspect_dict["idCounter"] = len(aspect)
        cx_metadata.append(aspect_dict)

    cx.append(
        {
            "metaData": cx_metadata,
        }
    )
    cx.extend({key: aspect} for key, aspect in cx_pairs)
    cx.append({"status": [{"error": "", "success": True}]})

    return cx
@open_file(1, mode="w")
def to_cx_file(graph: BELGraph, path: Union[str, TextIO], indent: Optional[int] = 2, **kwargs) -> None:
    """Write a BEL graph to a JSON file in CX format.

    :param graph: A BEL graph
    :param path: A writable file or file-like
    :param indent: How many spaces to use to pretty print. Change to None for no pretty printing
    :param kwargs: Additional keyword arguments forwarded to :func:`json.dump`

    The example below shows how to output a BEL graph as CX to an open file.

    .. code-block:: python

        from pybel.examples import sialic_acid_graph
        from pybel import to_cx_file

        with open('graph.bel.cx.json', 'w') as file:
            to_cx_file(sialic_acid_graph, file)

    The example below shows how to output a BEL graph as CX to a file at a given path.

    .. code-block:: python

        from pybel.examples import sialic_acid_graph
        from pybel import to_cx_file

        to_cx_file(sialic_acid_graph, 'graph.bel.cx.json')

    If you have a big graph, you might consider storing it as a gzipped CX file
    by using :func:`to_cx_gz`.
    """
    json.dump(to_cx(graph), path, ensure_ascii=False, indent=indent, **kwargs)
def to_cx_gz(graph: BELGraph, path: str, **kwargs) -> None:
    """Write a graph as CX JSON to a gzip file.

    :param graph: A BEL graph
    :param path: The path of the gzip file to write
    :param kwargs: Additional keyword arguments forwarded to :func:`json.dump`
    """
    # The JSON is written with ensure_ascii=False, so non-ASCII characters reach the file as-is.
    # Pin the text encoding to UTF-8 instead of relying on the platform's locale encoding.
    with gzip.open(path, "wt", encoding="utf-8") as file:
        json.dump(to_cx(graph), file, ensure_ascii=False, **kwargs)
def to_cx_jsons(graph: BELGraph, **kwargs) -> str:
    """Serialize a BEL graph as a CX JSON string.

    :param graph: A BEL graph
    :param kwargs: Additional keyword arguments forwarded to :func:`json.dumps`
    :return: The CX JSON document, with non-ASCII characters left unescaped
    """
    cx_document = to_cx(graph)
    return json.dumps(cx_document, ensure_ascii=False, **kwargs)
def _iterate_list_of_dicts(list_of_dicts: List[Dict]):
"""Iterate over a list of dictionaries.
:type list_of_dicts: list[dict[A,B]]
:rtype: iter[tuple[A,B]]
"""
for dictionary in list_of_dicts:
for key, value in dictionary.items():
yield key, value
def from_cx(cx: List[Dict]) -> BELGraph:  # noqa: C901
    """Rebuild a BELGraph from CX JSON output from PyBEL.

    :param cx: The CX JSON object for this graph
    :return: A BEL graph rebuilt from the aspects in ``cx``
    :raises ValueError: if an edge has no citation attributes and its relation is not in ``UNQUALIFIED_EDGES``
    """
    graph = BELGraph()

    # Pool the elements of each aspect by aspect name. An aspect may be split over several fragments of
    # the document; aspects that are never looked up below are collected but otherwise ignored.
    aspects = defaultdict(list)
    context_entry = {}
    for key, value in _iterate_list_of_dicts(cx):
        if key == "@context":
            for element in value:
                context_entry.update(element)
        else:
            aspects[key].extend(value)

    context_legend = _cx_to_dict(aspects["context_legend"])

    annotation_lists = defaultdict(set)
    for data in aspects["annotation_lists"]:
        annotation_lists[data["k"]].add(data["v"])

    # Route every context entry to the graph's resource dictionary named by the legend
    for keyword, entry in context_entry.items():
        resource_type = context_legend[keyword]
        if resource_type == GRAPH_NAMESPACE_URL:
            graph.namespace_url[keyword] = entry
        elif resource_type == GRAPH_NAMESPACE_PATTERN:
            graph.namespace_pattern[keyword] = entry
        elif resource_type == GRAPH_ANNOTATION_URL:
            graph.annotation_url[keyword] = entry
        elif resource_type == GRAPH_ANNOTATION_PATTERN:
            graph.annotation_pattern[keyword] = entry
        elif resource_type == GRAPH_ANNOTATION_LIST:
            # For annotation lists the context entry is the key into the annotation_lists aspect
            graph.annotation_list[keyword] = annotation_lists[entry]

    for data in aspects["networkAttributes"]:
        if data["n"] == NDEX_SOURCE_FORMAT:
            continue
        graph.graph[GRAPH_METADATA][data["n"]] = data["v"]

    # Nodes are rebuilt from the nodeAttributes aspect alone; the nodes aspect itself is not needed.
    node_data = defaultdict(dict)
    for data in aspects["nodeAttributes"]:
        node_data[data["po"]][data["n"]] = data["v"]

    # put all normal data here
    node_data_pp = defaultdict(dict)
    # Group all fusion-related data here
    node_data_fusion = defaultdict(dict)
    # Group all variant-related data, keyed by node and then by the variant's integer position
    node_data_variants = defaultdict(lambda: defaultdict(dict))

    for nid, data in node_data.items():
        for key, value in data.items():
            if key.startswith(FUSION):
                node_data_fusion[nid][key] = value
            elif key.startswith(VARIANTS):
                _, variant_index, vls = key.split("_", 2)
                # Parse the position as an integer: sorting the raw strings would put "10" before "2"
                node_data_variants[nid][int(variant_index)][vls] = value
            elif key in {PRODUCTS, REACTANTS, MEMBERS}:
                node_data_pp[nid][key] = json.loads(value)
            else:
                node_data_pp[nid][key] = value

    for nid, data in node_data_fusion.items():
        data = expand_dict(data)
        data[FUSION] = _restore_fusion_dict(data[FUSION])
        node_data_pp[nid].update(data)

    for nid, data in node_data_variants.items():
        node_data_pp[nid][VARIANTS] = [expand_dict(data[variant_index]) for variant_index in sorted(data)]

    nid_node_tuple = {}
    for nid, data in node_data_pp.items():
        if CX_NODE_NAME in data:
            data[NAME] = data.pop(CX_NODE_NAME)
        nid_node_tuple[nid] = _node = parse_result_to_dsl(data)
        graph.add_node_from_data(_node)

    edge_relation = {}
    eid_source_nid = {}
    eid_target_nid = {}
    for data in aspects["edges"]:
        eid = data["@id"]
        edge_relation[eid] = data["i"]
        eid_source_nid[eid] = data["s"]
        eid_target_nid[eid] = data["t"]

    edge_data = defaultdict(dict)
    for data in aspects["edgeAttributes"]:
        edge_data[data["po"]][data["n"]] = data["v"]

    edge_citation = defaultdict(dict)
    edge_subject = defaultdict(dict)
    edge_object = defaultdict(dict)
    edge_annotations = defaultdict(lambda: defaultdict(dict))
    edge_data_pp = defaultdict(dict)

    # Sort every edge attribute into citation, modifier, evidence or annotation data by its name
    for eid, data in edge_data.items():
        for key, value in data.items():
            if key.startswith(CITATION):
                vl = _after_underscore(key)
                edge_citation[eid][vl] = value
            elif key.startswith(NDEX_SOURCE_MODIFIER):
                vl = _after_underscore(key)
                edge_subject[eid][vl] = value
            elif key.startswith(NDEX_TARGET_MODIFIER):
                vl = _after_underscore(key)
                edge_object[eid][vl] = value
            elif key == EVIDENCE:
                edge_data_pp[eid][EVIDENCE] = value
            else:
                # Anything that is not recognised above is treated as an annotation
                edge_annotations[eid][key] = value

    for eid, data in edge_citation.items():
        edge_data_pp[eid][CITATION] = data

    for eid, data in edge_subject.items():
        edge_data_pp[eid][SOURCE_MODIFIER] = expand_dict(data)

    for eid, data in edge_object.items():
        edge_data_pp[eid][TARGET_MODIFIER] = expand_dict(data)

    for eid in edge_relation:
        if eid in edge_annotations:  # FIXME stick this in edge_data.items() iteration
            edge_data_pp[eid][ANNOTATIONS] = {
                key: [Entity(**v) for v in values] for key, values in edge_annotations[eid].items()
            }

        # An edge with citation attributes is added as a qualified edge
        if eid in edge_citation:
            graph.add_qualified_edge(
                nid_node_tuple[eid_source_nid[eid]],
                nid_node_tuple[eid_target_nid[eid]],
                relation=edge_relation[eid],
                citation=edge_data_pp[eid][CITATION],
                evidence=edge_data_pp[eid][EVIDENCE],
                source_modifier=edge_data_pp[eid].get(SOURCE_MODIFIER),
                target_modifier=edge_data_pp[eid].get(TARGET_MODIFIER),
                annotations=edge_data_pp[eid].get(ANNOTATIONS),
            )
        elif edge_relation[eid] in UNQUALIFIED_EDGES:
            graph.add_unqualified_edge(
                nid_node_tuple[eid_source_nid[eid]],
                nid_node_tuple[eid_target_nid[eid]],
                edge_relation[eid],
            )
        else:
            raise ValueError("problem adding edge: {}".format(eid))

    return graph
def _after_underscore(key):
_, vl = key.split("_", 1)
return vl
@open_file(0, mode="r")
def from_cx_file(path: Union[str, TextIO]) -> BELGraph:
    """Read a file containing CX JSON and convert it to a BEL graph.

    :param path: A readable file or file-like containing the CX JSON for this graph
    :return: A BEL Graph representing the CX graph contained in the file
    """
    cx_document = json.load(path)
    return from_cx(cx_document)
def from_cx_gz(path: str) -> BELGraph:
    """Read a graph as CX JSON from a gzip file.

    :param path: The path of a gzip file containing the CX JSON for this graph
    :return: A BEL Graph representing the CX graph contained in the file
    """
    # JSON text is decoded as UTF-8 explicitly rather than with the platform's locale encoding
    with gzip.open(path, "rt", encoding="utf-8") as file:
        return from_cx(json.load(file))
def from_cx_jsons(graph_json_str: str) -> BELGraph:
    """Rebuild a BEL graph from a string containing CX JSON.

    :param graph_json_str: The CX JSON document as a string
    :return: A BEL Graph representing the CX graph contained in the string
    """
    cx_document = json.loads(graph_json_str)
    return from_cx(cx_document)