Source code for

# -*- coding: utf-8 -*-

"""This module wraps conversion between :class:`pybel.BELGraph` and the Cyberinfrastructure Exchange (CX) JSON.

CX is an aspect-oriented network interchange format encoded in JSON with a format inspired by the JSON-LD encoding of
Resource Description Framework (RDF). It is primarily used by the Network Data Exchange (NDEx) and more recent versions
of Cytoscape.

.. seealso::

    - The NDEx Data Model `Specification <>`_
    - `Cytoscape.js <>`_
    - CX Support for Cytoscape.js on the Cytoscape `App Store <>`_

import gzip
import json
import logging
import time
from collections import defaultdict
from operator import methodcaller
from typing import Dict, List, Mapping, Optional, TextIO, Union

from networkx.utils import open_file

from ..canonicalize import calculate_canonical_name
from ..constants import (
from ..dsl import BaseAbundance, BaseEntity
from ..language import Entity
from ..struct import BELGraph
from ..tokens import parse_result_to_dsl
from ..utils import expand_dict, flatten_dict

__all__ = [

log = logging.getLogger(__name__)

CX_NODE_NAME = 'label'
NDEX_SOURCE_FORMAT = "ndex:sourceFormat"

NDEX_SOURCE_MODIFIER = 'sourceModifier'
NDEX_TARGET_MODIFIER = 'targetModifier'

def _cx_to_dict(list_of_dicts: List[Dict], key_tag: str = 'k', value_tag: str = 'v') -> Dict:
    """Convert a CX list of dictionaries to a flat dictionary."""
    return {
        d[key_tag]: d[value_tag]
        for d in list_of_dicts

def _cleanse_fusion_dict(d: Dict) -> Dict:
    """Fix the fusion partner names."""
    return {
        k.replace('_', ''): v
        for k, v in d.items()

_p_dict = {
    'partner5p': PARTNER_5P,
    'partner3p': PARTNER_3P,
    'range5p': RANGE_5P,
    'range3p': RANGE_3P,

def _restore_fusion_dict(d: Dict) -> Dict:
    return {
        _p_dict[k]: v
        for k, v in d.items()

def build_node_mapping(graph: BELGraph) -> Mapping[BaseEntity, int]:
    """Build a mapping from a graph's nodes to their canonical sort order."""
    return {
        node: node_index
        for node_index, node in enumerate(sorted(graph, key=methodcaller('as_bel')))

[docs]def to_cx(graph: BELGraph) -> List[Dict]: # noqa: C901 """Convert a BEL Graph to a CX JSON object for use with `NDEx <>`_. .. seealso:: - `NDEx Python Client <>`_ """ node_mapping = build_node_mapping(graph) node_index_data = {} nodes_entry = [] node_attributes_entry = [] for node, node_index in node_mapping.items(): node_index_data[node_index] = node node_entry_dict = { '@id': node_index, 'n': calculate_canonical_name(node), } if isinstance(node, BaseAbundance): node_entry_dict['r'] = node.curie nodes_entry.append(node_entry_dict) aliases = [] if isinstance(node, BaseAbundance): aliases.extend(xref.curie for xref in node.xrefs) if aliases: node_attributes_entry.append({ 'po': node_index, 'n': 'alias', 'v': aliases, 'd': 'list_of_str', }) for k, v in node.items(): if k == VARIANTS: for i, el in enumerate(v): for a, b in flatten_dict(el).items(): node_attributes_entry.append({ 'po': node_index, 'n': '{}_{}_{}'.format(k, i, a), 'v': b, }) elif k == FUSION: v = _cleanse_fusion_dict(v) for a, b in flatten_dict(v).items(): node_attributes_entry.append({ 'po': node_index, 'n': '{}_{}'.format(k, a), 'v': b, }) elif k == NAME: node_attributes_entry.append({ 'po': node_index, 'n': CX_NODE_NAME, 'v': v, }) elif k in {PRODUCTS, REACTANTS, MEMBERS}: node_attributes_entry.append({ 'po': node_index, 'n': k, 'v': json.dumps(v), }) else: node_attributes_entry.append({ 'po': node_index, 'n': k, 'v': v, }) edges_entry = [] edge_attributes_entry = [] for edge_index, (source, target, d) in enumerate(graph.edges(data=True)): uid = node_mapping[source] vid = node_mapping[target] edges_entry.append({ '@id': edge_index, 's': uid, 't': vid, 'i': d[RELATION], }) if EVIDENCE in d: edge_attributes_entry.append({ 'po': edge_index, 'n': EVIDENCE, 'v': d[EVIDENCE], }) for k, v in d[CITATION].items(): edge_attributes_entry.append({ 'po': edge_index, 'n': '{}_{}'.format(CITATION, k), 'v': v, }) if ANNOTATIONS in d: for annotation, values in d[ANNOTATIONS].items(): edge_attributes_entry.append({ 'po': edge_index, 'n': annotation, 'v': sorted(values, key=lambda e: (e.namespace, e.identifier,, 'd': 'list_of_string', }) if SOURCE_MODIFIER in d: for k, v in flatten_dict(d[SOURCE_MODIFIER]).items(): edge_attributes_entry.append({ 'po': edge_index, 'n': '{}_{}'.format(NDEX_SOURCE_MODIFIER, k), 'v': v, }) if TARGET_MODIFIER in d: for k, v in flatten_dict(d[TARGET_MODIFIER]).items(): edge_attributes_entry.append({ 'po': edge_index, 'n': '{}_{}'.format(NDEX_TARGET_MODIFIER, k), 'v': v, }) context_legend = {} for key in graph.namespace_url: context_legend[key] = GRAPH_NAMESPACE_URL for key in graph.namespace_pattern: context_legend[key] = GRAPH_NAMESPACE_PATTERN for key in graph.annotation_url: context_legend[key] = GRAPH_ANNOTATION_URL for key in graph.annotation_pattern: context_legend[key] = GRAPH_ANNOTATION_PATTERN for key in graph.annotation_list: context_legend[key] = GRAPH_ANNOTATION_LIST context_legend_entry = [] for keyword, resource_type in context_legend.items(): context_legend_entry.append({ 'k': keyword, 'v': resource_type, }) annotation_list_keys_lookup = {keyword: i for i, keyword in enumerate(sorted(graph.annotation_list))} annotation_lists_entry = [] for keyword, values in graph.annotation_list.items(): for v in values: annotation_lists_entry.append({ 'k': annotation_list_keys_lookup[keyword], 'v': v, }) context_entry_dict = {} context_entry_dict.update(graph.namespace_url) context_entry_dict.update(graph.namespace_pattern) context_entry_dict.update(graph.annotation_url) context_entry_dict.update(graph.annotation_pattern) context_entry_dict.update(annotation_list_keys_lookup) context_entry_dict.update(graph.namespace_url) context_entry = [context_entry_dict] network_attributes_entry = [{ "n": NDEX_SOURCE_FORMAT, "v": "PyBEL", }] for k, v in graph.document.items(): network_attributes_entry.append({ 'n': k, 'v': v, }) # Coalesce to cx # cx = create_aspect.number_verification() cx = [{'numberVerification': [{'longNumber': 281474976710655}]}] cx_pairs = [ ('@context', context_entry), ('context_legend', context_legend_entry), ('annotation_lists', annotation_lists_entry), ('networkAttributes', network_attributes_entry), ('nodes', nodes_entry), ('nodeAttributes', node_attributes_entry), ('edges', edges_entry), ('edgeAttributes', edge_attributes_entry), ] cx_metadata = [] for key, aspect in cx_pairs: aspect_dict = { "name": key, "elementCount": len(aspect), "lastUpdate": time.time(), "consistencyGroup": 1, "properties": [], "version": "1.0", } if key in {'citations', 'supports', 'nodes', 'edges'}: aspect_dict['idCounter'] = len(aspect) cx_metadata.append(aspect_dict) cx.append({ 'metaData': cx_metadata, }) for key, aspect in cx_pairs: cx.append({ key: aspect, }) cx.append({"status": [{"error": "", "success": True}]}) return cx
[docs]@open_file(1, mode='w') def to_cx_file(graph: BELGraph, path: Union[str, TextIO], indent: Optional[int] = 2, **kwargs) -> None: """Write a BEL graph to a JSON file in CX format. :param graph: A BEL graph :param path: A writable file or file-like :param indent: How many spaces to use to pretty print. Change to None for no pretty printing The example below shows how to output a BEL graph as CX to an open file. .. code-block:: python from pybel.examples import sialic_acid_graph from pybel import to_cx_file with open('', 'w') as file: to_cx_file(sialic_acid_graph, file) The example below shows how to output a BEL graph as CX to a file at a given path. .. code-block:: python from pybel.examples import sialic_acid_graph from pybel import to_cx_file to_cx_file(sialic_acid_graph, '') If you have a big graph, you might consider storing it as a gzipped JGIF file by using :func:`to_cx_gz`. """ graph_cx_json_dict = to_cx(graph) json.dump(graph_cx_json_dict, path, ensure_ascii=False, indent=indent, **kwargs)
[docs]def to_cx_gz(graph, path: str, **kwargs) -> None: """Write a graph as CX JSON to a gzip file.""" with, 'wt') as file: json.dump(to_cx(graph), file, ensure_ascii=False, **kwargs)
[docs]def to_cx_jsons(graph: BELGraph, **kwargs) -> str: """Dump this graph as a CX JSON object to a string.""" return json.dumps(to_cx(graph), ensure_ascii=False, **kwargs)
def _iterate_list_of_dicts(list_of_dicts: List[Dict]): """Iterate over a list of dictionaries. :type list_of_dicts: list[dict[A,B]] :rtype: iter[tuple[A,B]] """ for dictionary in list_of_dicts: for key, value in dictionary.items(): yield key, value
[docs]def from_cx(cx: List[Dict]) -> BELGraph: # noqa: C901 """Rebuild a BELGraph from CX JSON output from PyBEL. :param cx: The CX JSON object for this graph """ graph = BELGraph() context_legend_aspect = [] annotation_lists_aspect = [] context_entry = {} network_attributes_aspect = [] nodes_aspect = [] node_attributes_aspect = [] edge_annotations_aspect = [] edges_aspect = [] meta_entries = defaultdict(list) for key, value in _iterate_list_of_dicts(cx): if key == 'context_legend': context_legend_aspect.extend(value) elif key == 'annotation_lists': annotation_lists_aspect.extend(value) elif key == '@context': for element in value: context_entry.update(element) elif key == 'networkAttributes': network_attributes_aspect.extend(value) elif key == 'nodes': nodes_aspect.extend(value) elif key == 'nodeAttributes': node_attributes_aspect.extend(value) elif key == 'edges': edges_aspect.extend(value) elif key == 'edgeAttributes': edge_annotations_aspect.extend(value) else: meta_entries[key].extend(value) context_legend = _cx_to_dict(context_legend_aspect) annotation_lists = defaultdict(set) for data in annotation_lists_aspect: annotation_lists[data['k']].add(data['v']) for keyword, entry in context_entry.items(): if context_legend[keyword] == GRAPH_NAMESPACE_URL: graph.namespace_url[keyword] = entry elif context_legend[keyword] == GRAPH_NAMESPACE_PATTERN: graph.namespace_pattern[keyword] = entry elif context_legend[keyword] == GRAPH_ANNOTATION_URL: graph.annotation_url[keyword] = entry elif context_legend[keyword] == GRAPH_ANNOTATION_PATTERN: graph.annotation_pattern[keyword] = entry elif context_legend[keyword] == GRAPH_ANNOTATION_LIST: graph.annotation_list[keyword] = annotation_lists[entry] for data in network_attributes_aspect: if data['n'] == NDEX_SOURCE_FORMAT: continue graph.graph[GRAPH_METADATA][data['n']] = data['v'] node_name = {} for data in nodes_aspect: node_name[data['@id']] = data['n'] node_data = defaultdict(dict) for data in node_attributes_aspect: node_data[data['po']][data['n']] = data['v'] # put all normal data here node_data_pp = defaultdict(dict) # Group all fusion-related data here node_data_fusion = defaultdict(dict) # Group all variant-related data node_data_variants = defaultdict(lambda: defaultdict(dict)) for nid, data in node_data.items(): for key, value in data.items(): if key.startswith(FUSION): node_data_fusion[nid][key] = value elif key.startswith(VARIANTS): _, i, vls = key.split('_', 2) node_data_variants[nid][i][vls] = value elif key in {PRODUCTS, REACTANTS, MEMBERS}: node_data_pp[nid][key] = json.loads(value) else: node_data_pp[nid][key] = value for nid, data in node_data_fusion.items(): data = expand_dict(data) data[FUSION] = _restore_fusion_dict(data[FUSION]) node_data_pp[nid].update(data) for nid, data in node_data_variants.items(): node_data_pp[nid][VARIANTS] = [ expand_dict(value) for _, value in sorted(data.items()) ] nid_node_tuple = {} for nid, data in node_data_pp.items(): if CX_NODE_NAME in data: data[NAME] = data.pop(CX_NODE_NAME) nid_node_tuple[nid] = _node = parse_result_to_dsl(data) graph.add_node_from_data(_node) edge_relation = {} eid_source_nid = {} eid_target_nid = {} for data in edges_aspect: eid = data['@id'] edge_relation[eid] = data['i'] eid_source_nid[eid] = data['s'] eid_target_nid[eid] = data['t'] edge_data = defaultdict(dict) # type: Dict[str, Dict[str, str]] for data in edge_annotations_aspect: edge_data[data['po']][data['n']] = data['v'] edge_citation = defaultdict(dict) # type: Dict[str, Dict[str, str]] edge_subject = defaultdict(dict) edge_object = defaultdict(dict) edge_annotations = defaultdict(lambda: defaultdict(dict)) edge_data_pp = defaultdict(dict) for eid, data in edge_data.items(): for key, value in data.items(): if key.startswith(CITATION): vl = _after_underscore(key) edge_citation[eid][vl] = value elif key.startswith(NDEX_SOURCE_MODIFIER): vl = _after_underscore(key) edge_subject[eid][vl] = value elif key.startswith(NDEX_TARGET_MODIFIER): vl = _after_underscore(key) edge_object[eid][vl] = value elif key == EVIDENCE: edge_data_pp[eid][EVIDENCE] = value else: edge_annotations[eid][key] = value for eid, data in edge_citation.items(): edge_data_pp[eid][CITATION] = data for eid, data in edge_subject.items(): edge_data_pp[eid][SOURCE_MODIFIER] = expand_dict(data) for eid, data in edge_object.items(): edge_data_pp[eid][TARGET_MODIFIER] = expand_dict(data) for eid in edge_relation: if eid in edge_annotations: # FIXME stick this in edge_data.items() iteration edge_data_pp[eid][ANNOTATIONS] = { key: [Entity(**v) for v in values] for key, values in edge_annotations[eid].items() } if eid in edge_citation: graph.add_qualified_edge( nid_node_tuple[eid_source_nid[eid]], nid_node_tuple[eid_target_nid[eid]], relation=edge_relation[eid], citation=edge_data_pp[eid][CITATION], evidence=edge_data_pp[eid][EVIDENCE], source_modifier=edge_data_pp[eid].get(SOURCE_MODIFIER), target_modifier=edge_data_pp[eid].get(TARGET_MODIFIER), annotations=edge_data_pp[eid].get(ANNOTATIONS), ) elif edge_relation[eid] in UNQUALIFIED_EDGES: graph.add_unqualified_edge( nid_node_tuple[eid_source_nid[eid]], nid_node_tuple[eid_target_nid[eid]], edge_relation[eid], ) else: raise ValueError('problem adding edge: {}'.format(eid)) return graph
def _after_underscore(key): _, vl = key.split('_', 1) return vl
[docs]@open_file(0, mode='r') def from_cx_file(path: Union[str, TextIO]) -> BELGraph: """Read a file containing CX JSON and converts to a BEL graph. :param path: A readable file or file-like containing the CX JSON for this graph :return: A BEL Graph representing the CX graph contained in the file """ return from_cx(json.load(path))
[docs]def from_cx_gz(path: str) -> BELGraph: """Read a graph as CX JSON from a gzip file.""" with, 'rt') as file: return from_cx(json.load(file))
[docs]def from_cx_jsons(graph_json_str: str) -> BELGraph: """Read a BEL graph from a CX JSON string.""" return from_cx(json.loads(graph_json_str))