# -*- coding: utf-8 -*-
"""Importer for Hetionet JSON."""
import bz2
import json
import logging
import os
from typing import Any, Mapping, Set, Tuple, Union
from urllib.request import urlretrieve
from tqdm import tqdm
from .constants import (
ACTIVATES_ACTIONS, BINDS_ACTIONS, COMPOUND, DSL_MAP, GENE, HETIONET_PUBMED, INHIBITS_ACTIONS, PHARMACOLOGICAL_CLASS,
QUALIFIED_MAPPING, REGULATES_ACTIONS, UNQUALIFIED_MAPPING,
)
from ...config import CACHE_DIRECTORY
from ...dsl import Abundance, Protein
from ...struct import BELGraph
# Names exported by ``from <this module> import *``.
__all__ = [
'get_hetionet',
'from_hetionet_json',
'from_hetionet_gz',
'from_hetionet_file',
]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Remote location of the bzip2-compressed Hetionet v1.0 JSON dump.
JSON_BZ2_URL = 'https://github.com/hetio/hetionet/raw/master/hetnet/json/hetionet-v1.0.json.bz2'
# Local path (inside CACHE_DIRECTORY) where get_hetionet() stores the downloaded dump.
PATH = os.path.join(CACHE_DIRECTORY, 'hetionet-v1.0.json.bz2')
def get_hetionet() -> BELGraph:
    """Get Hetionet from GitHub, cache, and convert to BEL.

    If the cached file at ``PATH`` does not exist, it is downloaded from
    ``JSON_BZ2_URL`` first. The download is written to a temporary sibling
    file and only moved to ``PATH`` once it has completed, so an interrupted
    or failed download never leaves a truncated file that later calls would
    mistake for a valid cache.

    :return: The BEL graph produced by :func:`from_hetionet_gz` from the cached file
    """
    if not os.path.exists(PATH):
        logger.warning('downloading hetionet from %s to %s', JSON_BZ2_URL, PATH)
        partial_path = PATH + '.part'
        try:
            urlretrieve(JSON_BZ2_URL, partial_path)  # noqa: S310
            # Same-directory rename: the cache entry appears only when complete.
            os.replace(partial_path, PATH)
        finally:
            # Only still present if the download or the rename failed.
            if os.path.exists(partial_path):
                os.remove(partial_path)
    return from_hetionet_gz(PATH)
def from_hetionet_gz(path: str) -> BELGraph:
    """Load Hetionet from a compressed JSON file.

    Despite the ``_gz`` suffix in the name, the file is opened with
    :func:`bz2.open`, i.e. it is expected to be bzip2-compressed.

    :param path: Path to the compressed Hetionet JSON file
    :return: The BEL graph produced by :func:`from_hetionet_file`
    """
    logger.info('opening %s', path)
    with bz2.open(path) as compressed_file:
        graph = from_hetionet_file(compressed_file)
    return graph
def from_hetionet_file(file) -> BELGraph:
    """Parse Hetionet JSON from an open file and convert it to BEL.

    :param file: A readable file-like object containing the Hetionet JSON document
    :return: The BEL graph produced by :func:`from_hetionet_json`
    """
    logger.info('parsing json from %s', file)
    hetionet_dict = json.loads(file.read())
    logger.info('converting hetionet dict to BEL')
    return from_hetionet_json(hetionet_dict)
def from_hetionet_json(
    hetionet_dict: Mapping[str, Any],
    use_tqdm: bool = True,
) -> BELGraph:
    """Convert a Hetionet dictionary to a BEL graph.

    :param hetionet_dict: The parsed Hetionet JSON. Its ``nodes`` entries are read for
     their ``kind``, ``identifier``, and ``name``; its ``edges`` entries are each passed
     to :func:`_add_edge`.
    :param use_tqdm: If true, wrap the edge iteration in a progress bar and route
     per-edge messages through the bar's ``write`` so they do not break its rendering.
     Otherwise, messages go to this module's logger at the info level.
    :return: A BEL graph named "Hetionet"
    """
    graph = BELGraph(  # FIXME what metadata is appropriate?
        name='Hetionet',
        version='1.0',
        authors='Daniel Himmelstein',
    )
    # FIXME add namespaces
    # graph.namespace_pattern.update({})

    # Edges reference nodes only by (kind, identifier), so index the names up front.
    kind_identifier_to_name = {}
    for node in hetionet_dict['nodes']:
        node_key = (node['kind'], node['identifier'])
        kind_identifier_to_name[node_key] = node['name']

    edges = hetionet_dict['edges']
    it_logger = logger.info
    if use_tqdm:
        edges = tqdm(edges, desc='Converting Hetionet', unit_scale=True)
        it_logger = edges.write

    for edge in edges:
        _add_edge(graph, edge, kind_identifier_to_name, it_logger)

    return graph
def _get_node(edge, key, kind_identifier_to_name) -> Union[Tuple[None, None, None, None], Tuple[str, str, str, str]]:
    """Resolve one endpoint of a Hetionet edge.

    :param edge: A Hetionet edge dictionary
    :param key: The edge entry to read, holding a ``(node type, identifier)`` pair
    :param kind_identifier_to_name: A mapping from ``(node type, identifier)`` to node name
    :return: A 4-tuple of node type, namespace, identifier, and name. If the node type has
     no entry in ``DSL_MAP``, all four elements are ``None``.
    """
    node_type, raw_identifier = edge[key]
    namespace = DSL_MAP.get(node_type)
    if namespace is None:
        return None, None, None, None

    # Look up the name with the raw identifier, before it is converted to a string.
    node_name = kind_identifier_to_name[node_type, raw_identifier]

    identifier = str(raw_identifier)
    if identifier.lower().startswith(namespace):
        # Drop the redundant prefix plus the one character after it
        # (presumably a separator such as ":").
        prefix_length = len(namespace) + 1
        identifier = identifier[prefix_length:]

    return node_type, namespace, identifier, node_name
def _add_edge(  # noqa: C901
    graph,
    edge,
    kind_identifier_to_name,
    it_logger,
) -> Union[None, str, Set[str]]:
    """Add a single Hetionet edge to the BEL graph.

    The edge's metaedge (source type, kind, target type) is matched first against
    ``QUALIFIED_MAPPING``, then ``UNQUALIFIED_MAPPING``, and finally against two
    special cases handled inline (compound-binds-gene and
    pharmacological class-includes-compound).

    The ``edge`` dictionary is not modified; its ``data`` entry is copied before the
    bookkeeping keys are removed from it, so the same Hetionet dictionary can be
    converted more than once with identical results.

    :param graph: The BEL graph to add the edge to
    :param edge: A Hetionet edge dictionary with ``source_id``, ``target_id``, ``kind``,
     and ``data`` entries
    :param kind_identifier_to_name: A mapping from ``(node type, identifier)`` to node name
    :param it_logger: A callable taking a single message string, used to report
     unhandled data values, unhandled actions, and unhandled metaedges
    :return: ``None`` if either node type is unmapped or the metaedge is unhandled;
     a set of the adder results (one per citation or action) for qualified edges and
     compound-binds-gene edges; otherwise the single result of the adder function
    """
    source_type, source_ns, source_identifier, source_name = _get_node(edge, 'source_id', kind_identifier_to_name)
    target_type, target_ns, target_identifier, target_name = _get_node(edge, 'target_id', kind_identifier_to_name)
    if source_type is None or target_type is None:
        return None

    kind = edge['kind']

    # Shallow copy: the keys consumed below must not disappear from the caller's edge.
    data = dict(edge['data'])
    data.pop('unbiased', None)

    annotations = {}
    if 'source' in data:
        source = data.pop('source')
        annotations['source'] = {source: True}
    elif 'sources' in data:
        annotations['source'] = {
            source: True
            for source in data.pop('sources')
        }

    # Fall back to the Hetionet publication when the edge carries no PubMed identifiers.
    if 'pubmed_ids' in data:
        citations = list(data.pop('pubmed_ids'))
    else:
        citations = [HETIONET_PUBMED]

    # Every remaining scalar data entry becomes an annotation.
    for k, v in data.items():
        if k in {'actions', 'urls', 'subtypes'}:
            continue  # handled explicitly later
        if not isinstance(v, (str, int, bool, float)):
            it_logger('Unhandled: {source_identifier}-{kind}-{target_identifier} {k}: {v}'.format(
                source_identifier=source_identifier, kind=kind, target_identifier=target_identifier,
                k=k, v=v,
            ))
            continue
        annotations[k] = {v: True}

    # Qualified edges: one edge per citation.
    for _h_type, h_dsl, _r, _t_type, t_dsl, f in QUALIFIED_MAPPING:
        if source_type != _h_type or kind != _r or target_type != _t_type:
            continue
        rv = set()
        for citation in citations:
            key = f(
                graph,
                h_dsl(namespace=source_ns, identifier=source_identifier, name=source_name),
                t_dsl(namespace=target_ns, identifier=target_identifier, name=target_name),
                citation=citation, evidence='', annotations=annotations,
            )
            rv.add(key)
        return rv

    # Unqualified edges carry no citation, evidence, or annotations.
    for _h_type, h_dsl, _r, _t_type, t_dsl, f in UNQUALIFIED_MAPPING:
        if source_type == _h_type and kind == _r and target_type == _t_type:
            return f(
                graph,
                h_dsl(namespace=source_ns, identifier=source_identifier, name=source_name),
                t_dsl(namespace=target_ns, identifier=target_identifier, name=target_name),
            )

    def _check(_source_type: str, _kind: str, _target_type: str) -> bool:
        """Check the metaedge."""
        return kind == _kind and source_type == _source_type and target_type == _target_type

    if _check(COMPOUND, 'binds', GENE):
        drug = Abundance(namespace='drugbank', name=source_name, identifier=source_identifier)
        protein = Protein(namespace='ncbigene', name=target_name, identifier=target_identifier)
        rv = set()
        # One edge per listed action; an edge without actions adds nothing.
        for action in data.get('actions', []):
            action = action.lower()
            if action in ACTIVATES_ACTIONS:
                key = graph.add_directly_activates(
                    drug, protein, citation=HETIONET_PUBMED, evidence='', annotations=annotations,
                )
            elif action in INHIBITS_ACTIONS:
                key = graph.add_directly_inhibits(
                    drug, protein, citation=HETIONET_PUBMED, evidence='', annotations=annotations,
                )
            elif action in REGULATES_ACTIONS:
                key = graph.add_regulates(drug, protein, citation=HETIONET_PUBMED, evidence='', annotations=annotations)
            elif action in BINDS_ACTIONS:
                key = graph.add_binds(drug, protein, citation=HETIONET_PUBMED, evidence='', annotations=annotations)
            else:
                # Unknown actions are still recorded as a plain binding, and reported.
                key = graph.add_binds(drug, protein, citation=HETIONET_PUBMED, evidence='', annotations=annotations)
                it_logger('Unhandled action for {source_identifier}-{kind}-{target_identifier}: {action}'.format(
                    source_identifier=source_identifier, kind=kind, target_identifier=target_identifier, action=action,
                ))
            rv.add(key)
        return rv

    if _check(PHARMACOLOGICAL_CLASS, 'includes', COMPOUND):
        # The direction is flipped: the compound (edge target) is a member of the class (edge source).
        return graph.add_is_a(
            Abundance(namespace='drugbank', name=target_name, identifier=target_identifier),
            Abundance(namespace='drugcentral', name=source_name, identifier=source_identifier),
        )

    it_logger('missed: {edge}'.format(edge=edge))
    return None