# -*- coding: utf-8 -*-
"""Conversion functions for BEL graphs with node-link JSON."""
import gzip
import json
from io import BytesIO
from itertools import chain, count
from operator import methodcaller
from typing import Any, Mapping, TextIO, Union
from networkx.utils import open_file
from .utils import ensure_version
from ..constants import (
ANNOTATIONS,
CITATION,
FUSION,
GRAPH_ANNOTATION_CURIE,
GRAPH_ANNOTATION_LIST,
GRAPH_ANNOTATION_MIRIAM,
MEMBERS,
PARTNER_3P,
PARTNER_5P,
PRODUCTS,
REACTANTS,
SOURCE_MODIFIER,
TARGET_MODIFIER,
)
from ..dsl import BaseEntity
from ..language import citation_dict
from ..struct import BELGraph
from ..struct.graph import _handle_modifier
from ..tokens import parse_result_to_dsl
from ..utils import hash_edge, tokenize_version
# Public API of this module: the node-link JSON writers and readers.
__all__ = [
"to_nodelink",
"to_nodelink_file",
"to_nodelink_gz",
"to_nodelink_jsons",
"from_nodelink",
"from_nodelink_file",
"from_nodelink_gz",
"from_nodelink_jsons",
"to_nodelink_gz_io",
"from_nodelink_gz_io",
]
def to_nodelink(graph: BELGraph) -> Mapping[str, Any]:
    """Convert a BEL graph to a node-link JSON object.

    The graph-level metadata of the result is passed through
    :func:`_prepare_graph_dict` before it is returned.

    :param graph: BEL Graph
    :return: The node-link dictionary built for ``graph``
    """
    rv = _to_nodelink_json_helper(graph)
    graph_metadata = rv["graph"]
    _prepare_graph_dict(graph_metadata)
    return rv
def _prepare_graph_dict(g):
    """Canonicalize the annotation definitions of a graph metadata dictionary in place.

    The annotation list definitions (which are sets) as well as the CURIE and
    MIRIAM annotation collections are replaced by sorted lists. A key that is
    absent is treated as an empty collection, mirroring what
    :func:`_recover_graph_dict` does on the way back in.

    :param g: The graph-level metadata dictionary; it is modified in place
    """
    g[GRAPH_ANNOTATION_LIST] = {
        keyword: sorted(values)
        for keyword, values in g.get(GRAPH_ANNOTATION_LIST, {}).items()
    }
    # Use .get() so metadata lacking one of these keys does not raise KeyError
    for key in (GRAPH_ANNOTATION_CURIE, GRAPH_ANNOTATION_MIRIAM):
        g[key] = sorted(g.get(key, ()))
@open_file(1, mode="w")
def to_nodelink_file(graph: BELGraph, path: Union[str, TextIO], **kwargs) -> None:
    """Dump a BEL graph as node-link JSON into a file.

    :param graph: A BEL graph
    :param path: A path or file-like
    :param kwargs: Extra keyword arguments forwarded to :func:`json.dump`
    """
    json.dump(to_nodelink(graph), path, ensure_ascii=False, **kwargs)
def to_nodelink_gz(graph, path: str, **kwargs) -> None:
    """Write a graph as node-link JSON to a gzip file.

    :param graph: A BEL graph
    :param path: The path of the gzip file to write
    :param kwargs: Extra keyword arguments forwarded to :func:`json.dump`
    """
    # ensure_ascii=False lets non-ASCII characters through unescaped, so the text
    # layer needs an explicit encoding rather than the platform default.
    with gzip.open(path, "wt", encoding="utf-8") as file:
        json.dump(to_nodelink(graph), file, ensure_ascii=False, **kwargs)
def to_nodelink_jsons(graph: BELGraph, **kwargs) -> str:
    """Serialize a BEL graph to a node-link JSON string.

    :param graph: A BEL graph
    :param kwargs: Extra keyword arguments forwarded to :func:`json.dumps`
    :return: The JSON text
    """
    graph_json_dict = to_nodelink(graph)
    return json.dumps(graph_json_dict, ensure_ascii=False, **kwargs)
def from_nodelink(graph_json_dict: Mapping[str, Any], check_version: bool = True) -> BELGraph:
    """Build a graph from node-link JSON Object.

    :param graph_json_dict: A node-link dictionary whose ``graph`` entry carries a ``pybel_version``
    :param check_version: Passed through to :func:`ensure_version`
    :raises ValueError: If the JSON was produced by a PyBEL release older than v0.14
    """
    pybel_version = tokenize_version(graph_json_dict["graph"]["pybel_version"])
    # Compare (major, minor) together: looking at the minor component alone
    # would also reject a release such as v1.0.0.
    if tuple(pybel_version[:2]) < (0, 14):
        raise ValueError("Invalid NodeLink JSON from old version of PyBEL (v{}.{}.{})".format(*pybel_version))
    graph = _from_nodelink_json_helper(graph_json_dict)
    return ensure_version(graph, check_version=check_version)
@open_file(0, mode="r")
def from_nodelink_file(path: Union[str, TextIO], check_version: bool = True) -> BELGraph:
    """Load a graph from a file holding node-link JSON.

    :param path: A path or file-like
    :param check_version: Passed through to :func:`from_nodelink`
    """
    graph_json_dict = json.load(path)
    return from_nodelink(graph_json_dict, check_version=check_version)
def from_nodelink_gz(path: str, check_version: bool = True) -> BELGraph:
    """Read a graph as node-link JSON from a gzip file.

    :param path: The path of the gzip file to read
    :param check_version: Passed through to :func:`from_nodelink`
    """
    # Decode explicitly as UTF-8 instead of relying on the platform default encoding
    with gzip.open(path, "rt", encoding="utf-8") as file:
        return from_nodelink(json.load(file), check_version=check_version)
def from_nodelink_jsons(graph_json_str: str, check_version: bool = True) -> BELGraph:
    """Parse a node-link JSON string into a BEL graph.

    :param graph_json_str: The JSON text
    :param check_version: Passed through to :func:`from_nodelink`
    """
    graph_json_dict = json.loads(graph_json_str)
    return from_nodelink(graph_json_dict, check_version=check_version)
def _to_nodelink_json_helper(graph: BELGraph) -> Mapping[str, Any]:
    """Build the node-link representation of a BEL graph.

    Nodes are emitted in the order given by sorting on their ``as_bel()``
    value, and every link refers to its endpoints by position in that list.

    :param graph: BEL Graph

    Adapted from :func:`networkx.readwrite.json_graph.node_link_data`
    """
    ordered_nodes = sorted(graph, key=methodcaller("as_bel"))
    node_to_index = {node: index for index, node in enumerate(ordered_nodes)}

    graph_attributes = graph.graph.copy()
    node_dicts = [_augment_node(node) for node in ordered_nodes]
    # The source/target/key entries are added after (and so override) the edge data
    link_dicts = [
        {**edge_data, "source": node_to_index[u], "target": node_to_index[v], "key": key}
        for u, v, key, edge_data in graph.edges(keys=True, data=True)
    ]
    return {
        "directed": True,
        "multigraph": True,
        "graph": graph_attributes,
        "nodes": node_dicts,
        "links": link_dicts,
    }
def _augment_node(node: BaseEntity) -> BaseEntity:
    """Return a copy of a node's dictionary with its MD5 identifier and BEL string added.

    The same augmentation is applied recursively to the entries stored under
    the members, reactants, and products keys and to both fusion partners.
    Nested entries are augmented on copies, so the node that is passed in is
    not modified by serialization.

    :param node: A BEL node
    :return: A copy of the node carrying the additional ``id`` and ``bel`` entries
    """
    rv = node.copy()
    rv["id"] = node.md5
    rv["bel"] = node.as_bel()
    for key in (MEMBERS, REACTANTS, PRODUCTS):
        if key in node:
            # Replace the shared list with augmented copies instead of updating the originals
            rv[key] = [_augment_node(member) for member in node[key]]
    if FUSION in node:
        # Copy the fusion mapping before swapping in the augmented partners
        fusion = dict(node[FUSION])
        fusion[PARTNER_3P] = _augment_node(node[FUSION][PARTNER_3P])
        fusion[PARTNER_5P] = _augment_node(node[FUSION][PARTNER_5P])
        rv[FUSION] = fusion
    return rv
def _recover_graph_dict(graph: BELGraph):
    """Turn the annotation definitions in a graph's metadata back into sets, in place.

    Keys that are absent from ``graph.graph`` end up as empty collections.

    :param graph: The BEL graph whose ``graph`` dictionary is updated
    """
    metadata = graph.graph
    annotation_lists = metadata.get(GRAPH_ANNOTATION_LIST, {})
    metadata[GRAPH_ANNOTATION_LIST] = {keyword: set(values) for keyword, values in annotation_lists.items()}
    for key in (GRAPH_ANNOTATION_CURIE, GRAPH_ANNOTATION_MIRIAM):
        metadata[key] = set(metadata.get(key, []))
def _from_nodelink_json_helper(data: Mapping[str, Any]) -> BELGraph:
    """Return graph from node-link data format.

    :param data: A node-link dictionary with ``nodes`` and ``links`` entries and an optional ``graph`` entry
    :return: The BEL graph rebuilt from ``data``

    Adapted from :func:`networkx.readwrite.json_graph.node_link_graph`
    """
    graph = BELGraph()
    # Shallow-copy the metadata so that rebuilding the annotation sets below
    # does not rewrite the caller's JSON dictionary in place.
    graph.graph = dict(data.get("graph", {}))
    _recover_graph_dict(graph)

    # A node's position in data["nodes"] is the index that the links refer to
    nodes = []
    for node_data in data["nodes"]:
        node = parse_result_to_dsl(node_data)
        graph.add_node_from_data(node)
        nodes.append(node)

    structural_keys = {"source", "target", "key"}
    for link in data["links"]:
        u = nodes[link["source"]]
        v = nodes[link["target"]]
        edge_data = {name: value for name, value in link.items() if name not in structural_keys}
        for side in (SOURCE_MODIFIER, TARGET_MODIFIER):
            side_data = edge_data.get(side)
            if side_data:
                # NOTE(review): the return value is unused, so this presumably
                # updates side_data in place - confirm against _handle_modifier.
                _handle_modifier(side_data)
        if CITATION in edge_data:
            edge_data[CITATION] = citation_dict(**edge_data[CITATION])
        if ANNOTATIONS in edge_data:
            edge_data[ANNOTATIONS] = graph._clean_annotations(edge_data[ANNOTATIONS])
        # The stored key is ignored; it is recomputed from the processed edge data
        graph.add_edge(u, v, key=hash_edge(u, v, edge_data), **edge_data)
    return graph
def to_nodelink_gz_io(graph: BELGraph) -> BytesIO:
    """Serialize a BEL graph to gzipped node-link JSON held in memory.

    :param graph: A BEL graph
    :return: A :class:`io.BytesIO` positioned at its start
    """
    buffer = BytesIO()
    with gzip.GzipFile(fileobj=buffer, mode="w") as gz_file:
        payload = to_nodelink_jsons(graph).encode("utf-8")
        gz_file.write(payload)
    # Rewind so the caller can read the compressed bytes straight away
    buffer.seek(0)
    return buffer
def from_nodelink_gz_io(bytes_io: BytesIO) -> BELGraph:
    """Load a BEL graph from gzipped node-link JSON held in memory.

    :param bytes_io: A binary buffer containing the gzip-compressed, UTF-8 encoded JSON
    """
    with gzip.GzipFile(fileobj=bytes_io, mode="r") as gz_file:
        raw = gz_file.read()
    return from_nodelink_jsons(raw.decode("utf-8"))