Source code for pybel.io.hetionet.hetionet

# -*- coding: utf-8 -*-

"""Importer for Hetionet JSON."""

import bz2
import json
import logging
from typing import Any, Mapping, Set, Tuple, Union

import pystow
from tqdm import tqdm

from .constants import (
    ACTIVATES_ACTIONS, BINDS_ACTIONS, COMPOUND, DSL_MAP, GENE, HETIONET_PUBMED, INHIBITS_ACTIONS, PHARMACOLOGICAL_CLASS,
    QUALIFIED_MAPPING, REGULATES_ACTIONS, UNQUALIFIED_MAPPING,
)
from ...dsl import Abundance, Protein
from ...struct import BELGraph

__all__ = [
    'get_hetionet',
    'from_hetionet_json',
    'from_hetionet_gz',
    'from_hetionet_file',
]

logger = logging.getLogger(__name__)

JSON_BZ2_URL = 'https://github.com/hetio/hetionet/raw/master/hetnet/json/hetionet-v1.0.json.bz2'


[docs]def get_hetionet() -> BELGraph: """Get Hetionet from GitHub, cache, and convert to BEL.""" path = pystow.ensure('bio2bel', 'hetionet', url=JSON_BZ2_URL) return from_hetionet_gz(path.as_posix())
[docs]def from_hetionet_gz(path: str) -> BELGraph: """Get Hetionet from its JSON GZ file.""" logger.info('opening %s', path) with bz2.open(path) as file: return from_hetionet_file(file)
[docs]def from_hetionet_file(file) -> BELGraph: """Get Hetionet from a JSON file.""" logger.info('parsing json from %s', file) j = json.load(file) logger.info('converting hetionet dict to BEL') return from_hetionet_json(j)
[docs]def from_hetionet_json( hetionet_dict: Mapping[str, Any], use_tqdm: bool = True, ) -> BELGraph: """Convert a Hetionet dictionary to a BEL graph.""" graph = BELGraph( # FIXME what metadata is appropriate? name='Hetionet', version='1.0', authors='Daniel Himmelstein', ) # FIXME add namespaces # graph.namespace_pattern.update({}) kind_identifier_to_name = { (x['kind'], x['identifier']): x['name'] for x in hetionet_dict['nodes'] } edges = hetionet_dict['edges'] if use_tqdm: edges = tqdm(edges, desc='Converting Hetionet', unit_scale=True) it_logger = edges.write else: it_logger = logger.info for edge in edges: _add_edge(graph, edge, kind_identifier_to_name, it_logger) return graph
def _get_node(edge, key, kind_identifier_to_name) -> Union[Tuple[None, None, None, None], Tuple[str, str, str, str]]: node_type, node_identifier = edge[key] namespace = DSL_MAP.get(node_type) if namespace is None: return None, None, None, None node_name = kind_identifier_to_name[node_type, node_identifier] node_identifier = str(node_identifier) if node_identifier.lower().startswith(namespace): node_identifier = node_identifier[1 + len(namespace):] # remove redundant prefix return node_type, namespace, node_identifier, node_name def _add_edge( # noqa: C901 graph, edge, kind_identifier_to_name, it_logger, ) -> Union[None, str, Set[str]]: source_type, source_ns, source_identifier, source_name = _get_node(edge, 'source_id', kind_identifier_to_name) target_type, target_ns, target_identifier, target_name = _get_node(edge, 'target_id', kind_identifier_to_name) if source_type is None or target_type is None: return kind = edge['kind'] # direction = e['direction'] data = edge['data'] if 'unbiased' in data: del data['unbiased'] annotations = {} if 'source' in data: source = data.pop('source') annotations['source'] = {source: True} elif 'sources' in data: annotations['source'] = { source: True for source in data.pop('sources') } else: pass # it_logger(f'Missing source for {source_identifier}-{kind}-{target_identifier}\n{e}') if 'pubmed_ids' in data: citations = list(data.pop('pubmed_ids')) else: citations = [HETIONET_PUBMED] for k, v in data.items(): if k in {'actions', 'urls', 'subtypes'}: continue # handled explicitly later if not isinstance(v, (str, int, bool, float)): it_logger('Unhandled: {source_identifier}-{kind}-{target_identifier} {k}: {v}'.format( source_identifier=source_identifier, kind=kind, target_identifier=target_identifier, k=k, v=v, )) continue annotations[k] = {v: True} for _h_type, h_dsl, _r, _t_type, t_dsl, f in QUALIFIED_MAPPING: if source_type != _h_type or kind != _r or target_type != _t_type: continue rv = set() for citation in citations: key = f( graph, h_dsl(namespace=source_ns, identifier=source_identifier, name=source_name), t_dsl(namespace=target_ns, identifier=target_identifier, name=target_name), citation=citation, evidence='', annotations=annotations, ) rv.add(key) return rv for _h_type, h_dsl, _r, _t_type, t_dsl, f in UNQUALIFIED_MAPPING: if source_type == _h_type and kind == _r and target_type == _t_type: return f( graph, h_dsl(namespace=source_ns, identifier=source_identifier, name=source_name), t_dsl(namespace=target_ns, identifier=target_identifier, name=target_name), ) def _check(_source_type: str, _kind: str, _target_type: str) -> bool: """Check the metaedge.""" return kind == _kind and source_type == _source_type and target_type == _target_type if _check(COMPOUND, 'binds', GENE): drug = Abundance(namespace='drugbank', name=source_name, identifier=source_identifier) protein = Protein(namespace='ncbigene', name=target_name, identifier=target_identifier) rv = set() for action in data.get('actions', []): action = action.lower() if action in ACTIVATES_ACTIONS: key = graph.add_directly_activates( drug, protein, citation=HETIONET_PUBMED, evidence='', annotations=annotations, ) elif action in INHIBITS_ACTIONS: key = graph.add_directly_inhibits( drug, protein, citation=HETIONET_PUBMED, evidence='', annotations=annotations, ) elif action in REGULATES_ACTIONS: key = graph.add_regulates(drug, protein, citation=HETIONET_PUBMED, evidence='', annotations=annotations) elif action in BINDS_ACTIONS: key = graph.add_binds(drug, protein, citation=HETIONET_PUBMED, evidence='', annotations=annotations) else: key = graph.add_binds(drug, protein, citation=HETIONET_PUBMED, evidence='', annotations=annotations) it_logger('Unhandled action for {source_identifier}-{kind}-{target_identifier}: {action}'.format( source_identifier=source_identifier, kind=kind, target_identifier=target_identifier, action=action, )) rv.add(key) return rv if _check(PHARMACOLOGICAL_CLASS, 'includes', COMPOUND): return graph.add_is_a( Abundance(namespace='drugbank', name=target_name, identifier=target_identifier), Abundance(namespace='drugcentral', name=source_name, identifier=source_identifier), ) it_logger('missed: {edge}'.format(edge=edge))