Source code for pybel.parser.parse_bel

# -*- coding: utf-8 -*-

"""A parser for BEL.

This module handles parsing BEL relations and validation of semantics.
"""

import itertools as itt
import logging
from functools import lru_cache
from typing import Any, Dict, List, Mapping, Optional, Pattern, Set, Union

import pyparsing
from pyparsing import (
    Group,
    Keyword,
    MatchFirst,
    ParseResults,
    StringEnd,
    Suppress,
    delimitedList,
    oneOf,
    replaceWith,
)

from .baseparser import BaseParser
from .constants import NamespaceTermEncodingMapping
from .modifiers import (
    get_fragment_language,
    get_fusion_language,
    get_gene_modification_language,
    get_gene_substitution_language,
    get_hgvs_language,
    get_legacy_fusion_langauge,
    get_location_language,
    get_protein_modification_language,
    get_protein_substitution_language,
    get_truncation_language,
)
from .parse_concept import ConceptParser
from .parse_control import ControlParser
from .utils import WCW, nest, one_of_tags, triple
from .. import language
from ..constants import (
    ABUNDANCE,
    ACTIVITY,
    ASSOCIATION,
    BINDS,
    BIOPROCESS,
    CAUSES_NO_CHANGE,
    CELL_SECRETION,
    CELL_SURFACE_EXPRESSION,
    COMPLEX,
    COMPOSITE,
    CONCEPT,
    CORRELATION,
    DECREASES,
    DEGRADATION,
    DIRECTLY_DECREASES,
    DIRECTLY_INCREASES,
    DIRTY,
    EFFECT,
    EQUIVALENT_TO,
    FROM_LOC,
    FUNCTION,
    FUSION,
    GENE,
    IDENTIFIER,
    INCREASES,
    IS_A,
    LINE,
    LOCATION,
    MEMBERS,
    MIRNA,
    MODIFIER,
    NAME,
    NAMESPACE,
    NEGATIVE_CORRELATION,
    NO_CORRELATION,
    PART_OF,
    PATHOLOGY,
    POPULATION,
    POSITIVE_CORRELATION,
    PRODUCTS,
    PROTEIN,
    REACTANTS,
    REACTION,
    REGULATES,
    RELATION,
    RNA,
    SOURCE,
    TARGET,
    TO_LOC,
    TRANSCRIBED_TO,
    TRANSLATED_TO,
    TRANSLOCATION,
    TWO_WAY_RELATIONS,
    VARIANTS,
    belns_encodings,
)
from ..dsl import BaseEntity
from ..exceptions import (
    InvalidEntity,
    InvalidFunctionSemantic,
    MalformedTranslocationWarning,
    MissingAnnotationWarning,
    MissingCitationException,
    MissingSupportWarning,
    NestedRelationWarning,
)
from ..struct.graph import BELGraph
from ..tokens import parse_result_to_dsl

__all__ = [
    "BELParser",
    "modifier_po_to_dict",
    "parse",
]

logger = logging.getLogger("pybel.parser")

###########################
# 2.1 Abundance Functions #
###########################

#: 2.1.1 http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#Xabundancea>
general_abundance_tags = one_of_tags(["a", "abundance"], ABUNDANCE, FUNCTION)

#: 2.1.2 http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XcomplexA
complex_tag = one_of_tags(["complex", "complexAbundance"], COMPLEX, FUNCTION)

#: 2.1.3 http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XcompositeA
composite_abundance_tag = one_of_tags(["composite", "compositeAbundance"], COMPOSITE, FUNCTION)

#: 2.1.4 http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XgeneA
gene_tag = one_of_tags(["g", "geneAbundance"], GENE, FUNCTION)

#: 2.1.5 http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XmicroRNAA
mirna_tag = one_of_tags(["m", "microRNAAbundance"], MIRNA, FUNCTION)

#: 2.1.6 http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XproteinA
protein_tag = one_of_tags(["p", "proteinAbundance"], PROTEIN, FUNCTION)

#: 2.1.7 http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XrnaA
rna_tag = one_of_tags(["r", "rnaAbundance"], RNA, FUNCTION)

######################
# Modifier Functions #
######################

# `2.2.1 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_protein_modifications>`_
# See below (needs identifier)

#: `2.2.2 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_variant_var>`_
variant = get_hgvs_language()

#: `2.2.3 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_proteolytic_fragments>`_
fragment = get_fragment_language()

# `2.2.4 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_cellular_location>`_
# See below (needs identifier)

#: DEPRECATED
#: <http://openbel.org/language/version_1.0/bel_specification_version_1.0.html#_amino_acid_substitutions>
psub = get_protein_substitution_language()

#: DEPRECATED
#: http://openbel.org/language/version_1.0/bel_specification_version_1.0.html#_sequence_variations>
gsub = get_gene_substitution_language()

#: DEPRECATED
#: http://openbel.org/language/version_1.0/bel_specification_version_1.0.html#_truncated_proteins>
trunc = get_truncation_language()

###############################
# 2.3 & 2.4 Process Functions #
###############################

#: 2.3.1 http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_biologicalprocess_bp
biological_process_tag = one_of_tags(["bp", "biologicalProcess"], BIOPROCESS, FUNCTION)

#: 2.3.2 http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_pathology_path
pathology_tag = one_of_tags(["o", "path", "pathology"], PATHOLOGY, FUNCTION)

population_tag = one_of_tags(["pop", "populationAbundance"], POPULATION, FUNCTION)

#: 2.3.3 http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#Xactivity
activity_tag = one_of_tags(["act", "activity"], ACTIVITY, MODIFIER)

#: 2.4.1 http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XmolecularA
molecular_activity_tags = Suppress(oneOf(["ma", "molecularActivity"]))

################################
# 2.5 Transformation Functions #
################################

#: 2.5.1a http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_translocation_tloc
translocation_tag = one_of_tags(["translocation", "tloc"], TRANSLOCATION, MODIFIER)

#: 2.5.1b http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_cellsecretion_sec
cell_secretion_tag = one_of_tags(["sec", "cellSecretion"], CELL_SECRETION, MODIFIER)

#: 2.5.1c http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_cellsurfaceexpression_surf
cell_surface_expression_tag = one_of_tags(["surf", "cellSurfaceExpression"], CELL_SURFACE_EXPRESSION, MODIFIER)

#: 2.5.2 http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_degradation_deg
degradation_tags = one_of_tags(["deg", "degradation"], DEGRADATION, MODIFIER)

#: 2.5.3 http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_reaction_rxn
reaction_tags = one_of_tags(["reaction", "rxn"], REACTION, FUNCTION)

#####################
# BEL Relationships #
#####################

#: `3.1.1 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#Xincreases>`_
increases_tag = oneOf(["->", "→", "increases"]).setParseAction(replaceWith(INCREASES))

#: `3.1.2 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XdIncreases>`_
directly_increases_tag = one_of_tags(["=>", "⇒", "directlyIncreases"], DIRECTLY_INCREASES)

#: `3.1.3 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#Xdecreases>`_
decreases_tag = one_of_tags(["-|", "decreases"], DECREASES)

#: `3.1.4 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XdDecreases>`_
directly_decreases_tag = one_of_tags(["=|", "directlyDecreases"], DIRECTLY_DECREASES)

#: `3.1.5 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_ratelimitingstepof>`_
rate_limit_tag = Keyword("rateLimitingStepOf")

#: `3.1.6 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#Xcnc>`_
causes_no_change_tag = one_of_tags(["cnc", "causesNoChange"], CAUSES_NO_CHANGE)

#: `3.1.7 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_regulates_reg>`_
regulates_tag = one_of_tags(["reg", "regulates"], REGULATES)

#: Binds relation
binds_tag = Keyword(BINDS)

#: Correlation relation
correlation_tag = one_of_tags(["cor", "correlation"], CORRELATION)

#: No Correlation relation
no_correlation_tag = one_of_tags(["noCor", "noCorrelation"], NO_CORRELATION)

#: `3.2.1 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XnegCor>`_
negative_correlation_tag = one_of_tags(["neg", "negativeCorrelation"], NEGATIVE_CORRELATION)

#: `3.2.2 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XposCor>`_
positive_correlation_tag = one_of_tags(["pos", "positiveCorrelation"], POSITIVE_CORRELATION)

#: `3.2.3 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#Xassociation>`_
association_tag = one_of_tags(["--", "association"], ASSOCIATION)

#: `3.3.1 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_orthologous>`_
orthologous_tag = Keyword("orthologous")

#: `3.3.2 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_transcribedto>`_
transcribed_tag = oneOf([":>", "transcribedTo"]).setParseAction(replaceWith(TRANSCRIBED_TO))

#: `3.3.3 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_translatedto>`_
translated_tag = oneOf([">>", "translatedTo"]).setParseAction(replaceWith(TRANSLATED_TO))

#: `3.4.1 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_hasmember>`_
has_member_tag = Keyword("hasMember")

#: `3.4.2 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_hasmembers>`_
has_members_tag = Keyword("hasMembers")

#: `3.4.3 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_hascomponent>`_
has_component_tag = Keyword("hasComponent")

#: `3.4.4 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_hascomponents>`_
has_components_tag = Keyword("hasComponents")

#: `3.4.5 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_isa>`_
is_a_tag = Keyword(IS_A)

#: `3.4.6 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_subprocessof>`_
subprocess_of_tag = Keyword("subProcessOf")

#: `3.5.1 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_analogous>`_
analogous_tag = Keyword("analogousTo")

#: `3.5.2 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_biomarkerfor>`_
biomarker_tag = Keyword("biomarkerFor")

#: `3.5.3 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_prognosticbiomarkerfor>`_
prognostic_biomarker_tag = Keyword("prognosticBiomarkerFor")

biomarker_tags = biomarker_tag | prognostic_biomarker_tag

# Computed edges
has_variant_tags = Keyword("hasVariant")
has_reactant_tags = Keyword("hasReactant")
has_product_tags = Keyword("hasProduct")
part_of_reaction_tags = has_reactant_tags | has_product_tags

#: The ``equivalentTp`` relationship has been proposed for BEL 2.0.0+
equivalent_tag = one_of_tags(["eq", EQUIVALENT_TO], EQUIVALENT_TO)

#: The ``partOf`` relationship has been proposed for BEL 2.0.0+
partof_tag = Keyword(PART_OF)


[docs]class BELParser(BaseParser): """Build a parser backed by a given dictionary of namespaces.""" def __init__( self, graph: Optional[BELGraph] = None, namespace_to_term_to_encoding: Optional[NamespaceTermEncodingMapping] = None, namespace_to_pattern: Optional[Mapping[str, Pattern]] = None, annotation_to_term: Optional[Mapping[str, Set[str]]] = None, annotation_to_pattern: Optional[Mapping[str, Pattern]] = None, annotation_to_local: Optional[Mapping[str, Set[str]]] = None, allow_naked_names: bool = False, disallow_nested: bool = False, disallow_unqualified_translocations: bool = False, citation_clearing: bool = True, skip_validation: bool = False, autostreamline: bool = True, required_annotations: Optional[List[str]] = None, ) -> None: """Build a BEL parser. :param graph: The BEL graph to use to store the network :param namespace_to_term_to_encoding: A dictionary of {namespace: {name: encoding}}. Delegated to :class:`pybel.parser.parse_identifier.IdentifierParser` :param namespace_to_pattern: A dictionary of {namespace: compiled regular expression}. Delegated to :class:`pybel.parser.parse_identifier.IdentifierParser` :param annotation_to_term: A dictionary of {annotation: set of values}. Delegated to :class:`pybel.parser.ControlParser` :param annotation_to_pattern: A dictionary of {annotation: regular expression strings}. Delegated to :class:`pybel.parser.ControlParser` :param annotation_to_local: A dictionary of {annotation: set of values}. Delegated to :class:`pybel.parser.ControlParser` :param allow_naked_names: If true, turn off naked namespace failures. Delegated to :class:`pybel.parser.parse_identifier.IdentifierParser` :param disallow_nested: If true, turn on nested statement failures. Delegated to :class:`pybel.parser.parse_identifier.IdentifierParser` :param disallow_unqualified_translocations: If true, allow translocations without TO and FROM clauses. :param citation_clearing: Should :code:`SET Citation` statements clear evidence and all annotations? Delegated to :class:`pybel.parser.ControlParser` :param autostreamline: Should the parser be streamlined on instantiation? :param required_annotations: Optional list of required annotations """ self.graph = graph self.disallow_nested = disallow_nested self.disallow_unqualified_translocations = disallow_unqualified_translocations if skip_validation: self.control_parser = ControlParser( citation_clearing=citation_clearing, required_annotations=required_annotations, ) self.concept_parser = ConceptParser( allow_naked_names=allow_naked_names, skip_validation=skip_validation, ) else: self.control_parser = ControlParser( annotation_to_term=annotation_to_term, annotation_to_pattern=annotation_to_pattern, annotation_to_local=annotation_to_local, # citation_clearing=citation_clearing, required_annotations=required_annotations, ) self.concept_parser = ConceptParser( namespace_to_term_to_encoding=namespace_to_term_to_encoding, namespace_to_pattern=namespace_to_pattern, # allow_naked_names=allow_naked_names, skip_validation=skip_validation, ) self.control_parser.get_line_number = self.get_line_number self.concept_parser.get_line_number = self.get_line_number concept = Group(self.concept_parser.language)(CONCEPT) # 2.2 Abundance Modifier Functions #: `2.2.1 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_protein_modifications>`_ self.pmod = get_protein_modification_language( concept_fqualified=self.concept_parser.identifier_fqualified, concept_qualified=self.concept_parser.identifier_qualified, ) #: `2.2.4 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_cellular_location>`_ self.location = get_location_language(self.concept_parser.language) opt_location = pyparsing.Optional(WCW + self.location) #: PyBEL BEL Specification variant self.gmod = get_gene_modification_language( concept_fqualified=self.concept_parser.identifier_fqualified, concept_qualified=self.concept_parser.identifier_qualified, ) # 2.6 Other Functions #: `2.6.1 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_fusion_fus>`_ self.fusion = get_fusion_language(self.concept_parser.language) # 2.1 Abundance Functions #: `2.1.1 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XcomplexA>`_ self.general_abundance = general_abundance_tags + nest(concept + opt_location) self.gene_modified = concept + pyparsing.Optional( WCW + delimitedList(Group(variant | gsub | self.gmod))(VARIANTS), ) self.gene_fusion = Group(self.fusion)(FUSION) self.gene_fusion_legacy = Group(get_legacy_fusion_langauge(concept, "c"))(FUSION) #: `2.1.4 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XgeneA>`_ self.gene = gene_tag + nest( MatchFirst( [ self.gene_fusion, self.gene_fusion_legacy, self.gene_modified, ] ) + opt_location, ) self.mirna_modified = ( concept + pyparsing.Optional( WCW + delimitedList(Group(variant))(VARIANTS), ) + opt_location ) #: `2.1.5 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XmicroRNAA>`_ self.mirna = mirna_tag + nest(self.mirna_modified) self.protein_modified = concept + pyparsing.Optional( WCW + delimitedList(Group(MatchFirst([self.pmod, variant, fragment, psub, trunc])))( VARIANTS, ), ) self.protein_fusion = Group(self.fusion)(FUSION) self.protein_fusion_legacy = Group(get_legacy_fusion_langauge(concept, "p"))(FUSION) #: `2.1.6 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XproteinA>`_ self.protein = protein_tag + nest( MatchFirst( [ self.protein_fusion, self.protein_fusion_legacy, self.protein_modified, ] ) + opt_location, ) self.rna_modified = concept + pyparsing.Optional(WCW + delimitedList(Group(variant))(VARIANTS)) self.rna_fusion = Group(self.fusion)(FUSION) self.rna_fusion_legacy = Group(get_legacy_fusion_langauge(concept, "r"))(FUSION) #: `2.1.7 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XrnaA>`_ self.rna = rna_tag + nest( MatchFirst( [ self.rna_fusion, self.rna_fusion_legacy, self.rna_modified, ] ) + opt_location, ) self.population = population_tag + nest(concept) self.single_abundance = MatchFirst( [ self.general_abundance, self.gene, self.mirna, self.protein, self.rna, self.population, ] ) #: `2.1.2 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XcomplexA>`_ self.complex_singleton = complex_tag + nest(concept + opt_location) self.complex_list = complex_tag + nest( delimitedList(Group(self.single_abundance | self.complex_singleton))(MEMBERS) + opt_location, ) self.complex_abundances = self.complex_list | self.complex_singleton # Definition of all simple abundances that can be used in a composite abundance self.simple_abundance = self.complex_abundances | self.single_abundance self.simple_abundance.setParseAction(self.check_function_semantics) #: `2.1.3 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XcompositeA>`_ self.composite_abundance = composite_abundance_tag + nest( delimitedList(Group(self.simple_abundance))(MEMBERS) + opt_location, ) self.abundance = self.simple_abundance | self.composite_abundance # 2.4 Process Modifier Function # backwards compatibility with BEL v1.0 molecular_activity_default = oneOf(list(language.activity_labels)).setParseAction( handle_molecular_activity_default, ) #: `2.4.1 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#XmolecularA>`_ self.molecular_activity = molecular_activity_tags + nest( molecular_activity_default | self.concept_parser.language, ) # 2.3 Process Functions #: `2.3.1 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_biologicalprocess_bp>`_ self.biological_process = biological_process_tag + nest(concept) #: `2.3.2 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_pathology_path>`_ self.pathology = pathology_tag + nest(concept) self.bp_path = self.biological_process | self.pathology self.bp_path.setParseAction(self.check_function_semantics) self.activity_standard = activity_tag + nest( self.simple_abundance + pyparsing.Optional(WCW + Group(self.molecular_activity)(EFFECT)), ) activity_legacy_tags = oneOf(language.activities)(MODIFIER) self.activity_legacy = activity_legacy_tags + nest(self.simple_abundance) self.activity_legacy.setParseAction(handle_activity_legacy) #: `2.3.3 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#Xactivity>`_ self.activity = self.activity_standard | self.activity_legacy self.process = self.bp_path | self.activity # 2.5 Transformation Functions from_loc = Suppress(FROM_LOC) + nest(concept(FROM_LOC)) to_loc = Suppress(TO_LOC) + nest(concept(TO_LOC)) self.cell_secretion = cell_secretion_tag + nest(self.simple_abundance) self.cell_secretion.addParseAction(handle_secretion) self.cell_surface_expression = cell_surface_expression_tag + nest(self.simple_abundance) self.cell_surface_expression.addParseAction(handle_surface_expression) self.translocation_standard = nest( self.simple_abundance + WCW + Group(from_loc + WCW + to_loc)(EFFECT), ) self.translocation_legacy = nest( self.simple_abundance + WCW + Group(concept(FROM_LOC) + WCW + concept(TO_LOC))(EFFECT), ) self.translocation_legacy.addParseAction(handle_legacy_tloc) self.translocation_unqualified = nest(self.simple_abundance) if self.disallow_unqualified_translocations: self.translocation_unqualified.setParseAction(self.handle_translocation_illegal) #: `2.5.1 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_translocations>`_ self.translocation = translocation_tag + MatchFirst( [ self.translocation_unqualified, self.translocation_standard, self.translocation_legacy, ] ) #: `2.5.2 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_degradation_deg>`_ self.degradation = degradation_tags + nest(self.simple_abundance) #: `2.5.3 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_reaction_rxn>`_ self.reactants = Suppress(REACTANTS) + nest(delimitedList(Group(self.simple_abundance))) self.products = Suppress(PRODUCTS) + nest(delimitedList(Group(self.simple_abundance))) self.reaction = reaction_tags + nest(Group(self.reactants)(REACTANTS), Group(self.products)(PRODUCTS)) self.transformation = MatchFirst( [ self.cell_secretion, self.cell_surface_expression, self.translocation, self.degradation, self.reaction, ] ) # 3 BEL Relationships self.bel_term = MatchFirst([self.transformation, self.process, self.abundance]).streamline() self.bel_to_bel_relations = [ association_tag, increases_tag, decreases_tag, positive_correlation_tag, negative_correlation_tag, correlation_tag, no_correlation_tag, binds_tag, causes_no_change_tag, orthologous_tag, is_a_tag, equivalent_tag, partof_tag, directly_increases_tag, directly_decreases_tag, analogous_tag, regulates_tag, ] self.bel_to_bel = triple(self.bel_term, MatchFirst(self.bel_to_bel_relations), self.bel_term) # Mixed Relationships #: `3.1.5 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_ratelimitingstepof>`_ self.rate_limit = triple( MatchFirst([self.biological_process, self.activity, self.transformation]), rate_limit_tag, self.biological_process, ) #: `3.4.6 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_subprocessof>`_ self.subprocess_of = triple( MatchFirst([self.process, self.activity, self.transformation]), subprocess_of_tag, self.process, ) #: `3.3.2 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_transcribedto>`_ self.transcribed = triple(self.gene, transcribed_tag, self.rna) #: `3.3.3 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_translatedto>`_ self.translated = triple(self.rna, translated_tag, self.protein) #: `3.4.1 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_hasmember>`_ self.has_member = triple(self.abundance, has_member_tag, self.abundance) #: `3.4.2 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_hasmembers>`_ self.abundance_list = Suppress("list") + nest(delimitedList(Group(self.abundance))) self.has_members = triple(self.abundance, has_members_tag, self.abundance_list) self.has_members.setParseAction(self.handle_has_members) self.has_components = triple(self.abundance, has_components_tag, self.abundance_list) self.has_components.setParseAction(self.handle_has_components) self.has_list = self.has_members | self.has_components # `3.4.3 <http://openbel.org/language/version_2.0/bel_specification_version_2.0.html#_hascomponent>`_ self.has_component = triple( self.abundance, has_component_tag, self.abundance, ) self.biomarker = triple(self.bel_term, biomarker_tags, self.process) self.has_variant_relation = triple(self.abundance, has_variant_tags, self.abundance) self.part_of_reaction = triple(self.reaction, part_of_reaction_tags, self.abundance) self.relation = MatchFirst( [ self.bel_to_bel, # self.has_member, # self.has_component, self.subprocess_of, self.rate_limit, self.biomarker, self.transcribed, self.translated, # self.has_variant_relation, # self.part_of_reaction, ] ) if self.graph is not None: self.relation.setParseAction(self._handle_relation_harness) self.inverted_unqualified_relation = MatchFirst( [ self.has_member, self.has_component, ] ) if self.graph is not None: self.inverted_unqualified_relation.setParseAction(self.handle_inverse_unqualified_relation) self.normal_unqualified_relation = MatchFirst( [ self.has_member, self.has_component, self.has_variant_relation, self.part_of_reaction, ] ) if self.graph is not None: self.normal_unqualified_relation.setParseAction(self.handle_unqualified_relation) #: 3.1 Causal Relationships - nested. causal_relation_tags = MatchFirst( [ increases_tag, decreases_tag, directly_decreases_tag, directly_increases_tag, ] ) self.nested_causal_relationship = triple( self.bel_term, causal_relation_tags, nest(triple(self.bel_term, causal_relation_tags, self.bel_term)), ) if self.graph is not None: self.nested_causal_relationship.setParseAction(self.handle_nested_relation) # has_members is handled differently from all other relations becuase it gets distrinbuted self.relation = MatchFirst( [ self.has_list, self.nested_causal_relationship, self.relation, self.inverted_unqualified_relation, self.normal_unqualified_relation, ] ) self.singleton_term = self.bel_term + StringEnd() if self.graph is not None: self.singleton_term.setParseAction(self.handle_term) self.statement = self.relation | self.singleton_term self.language = self.control_parser.language | self.statement self.language.setName("BEL") super(BELParser, self).__init__(self.language, streamline=autostreamline)
[docs] def parse(self, s: str) -> Mapping[str, Any]: """Parse the string.""" return self.parseString(s).asDict()
@property def _namespace_dict(self) -> Mapping[str, Mapping[str, str]]: """Get the dictionary of {namespace: {name: encoding}} stored in the internal identifier parser.""" return self.concept_parser.namespace_to_name_to_encoding @property def _allow_naked_names(self) -> bool: """Return if naked names should be parsed (``True``), or if errors should be thrown (``False``).""" return self.concept_parser.allow_naked_names
[docs] def get_annotations(self) -> Dict: """Get the current annotations in this parser.""" return self.control_parser.get_annotations()
[docs] def clear(self): """Clear the graph and all control parser data (current citation, annotations, and statement group).""" if self.graph is not None: self.graph.clear() self.control_parser.clear()
[docs] def handle_nested_relation(self, line: str, position: int, tokens: ParseResults): """Handle nested statements. If :code:`self.disallow_nested` is True, raises a ``NestedRelationWarning``. :raises: NestedRelationWarning """ if self.disallow_nested: raise NestedRelationWarning(self.get_line_number(), line, position) subject_hash = self._handle_relation_checked( line, position, { SOURCE: tokens[SOURCE], RELATION: tokens[RELATION], TARGET: tokens[TARGET][SOURCE], }, ) object_hash = self._handle_relation_checked( line, position, { SOURCE: tokens[TARGET][SOURCE], RELATION: tokens[TARGET][RELATION], TARGET: tokens[TARGET][TARGET], }, ) self.graph.add_transitivity(subject_hash, object_hash) return tokens
[docs] def check_function_semantics(self, line: str, position: int, tokens: ParseResults) -> ParseResults: """Raise an exception if the function used on the tokens is wrong. :raises: InvalidFunctionSemantic """ concept = tokens.get(CONCEPT) if not self._namespace_dict or concept is None: return tokens namespace, name = concept[NAMESPACE], concept[NAME] if namespace in self.concept_parser.namespace_to_pattern: return tokens if self._allow_naked_names and namespace == DIRTY: # Don't check dirty names in lenient mode return tokens valid_functions = set( itt.chain.from_iterable( belns_encodings.get(encoding, set()) for encoding in self._namespace_dict[namespace][name] ), ) if not valid_functions: raise InvalidEntity(self.get_line_number(), line, position, namespace, name) if tokens[FUNCTION] not in valid_functions: raise InvalidFunctionSemantic( line_number=self.get_line_number(), line=line, position=position, func=tokens[FUNCTION], namespace=namespace, name=name, allowed_functions=valid_functions, ) return tokens
[docs] def handle_term(self, _, __, tokens: ParseResults) -> ParseResults: """Handle BEL terms (the subject and object of BEL relations).""" self.ensure_node(tokens) return tokens
def _handle_list_helper(self, tokens: ParseResults, relation: str) -> ParseResults: """Provide the functionality for :meth:`handle_has_members` and :meth:`handle_has_components`.""" parent_node_dsl = self.ensure_node(tokens[0]) for child_tokens in tokens[2]: child_node_dsl = self.ensure_node(child_tokens) # Note that the polarity is switched since this is just for hasMembers # and hasComponents, which are both deprecated as of BEL v2.2 self.graph.add_unqualified_edge(child_node_dsl, parent_node_dsl, relation) return tokens
[docs] def handle_has_members(self, _, __, tokens: ParseResults) -> ParseResults: """Handle list relations like ``p(X) hasMembers list(p(Y), p(Z), ...)``.""" return self._handle_list_helper(tokens, IS_A)
[docs] def handle_has_components(self, _, __, tokens: ParseResults) -> ParseResults: """Handle list relations like ``p(X) hasComponents list(p(Y), p(Z), ...)``.""" return self._handle_list_helper(tokens, PART_OF)
def _add_qualified_edge_helper( self, *, source, source_modifier, relation, target, target_modifier, annotations, ) -> str: """Add a qualified edge from the internal aspects of the parser.""" m = { BINDS: self.graph.add_binds, } adder = m.get(relation) d = dict( evidence=self.control_parser.evidence, citation=self.control_parser.get_citation(), annotations=annotations, source_modifier=source_modifier, target_modifier=target_modifier, **{LINE: self.get_line_number()}, ) if adder is not None: return adder(source=source, target=target, **d) else: return self.graph.add_qualified_edge(source=source, target=target, relation=relation, **d) def _add_qualified_edge(self, *, source, source_modifier, relation, target, target_modifier) -> str: """Add an edge, then adds the opposite direction edge if it should.""" d = dict( relation=relation, annotations=self.control_parser.annotations, ) if relation in TWO_WAY_RELATIONS: self._add_qualified_edge_helper( source=target, source_modifier=target_modifier, target=source, target_modifier=source_modifier, **d, ) return self._add_qualified_edge_helper( source=source, source_modifier=source_modifier, target=target, target_modifier=target_modifier, **d, ) def _handle_relation(self, tokens: ParseResults) -> str: """Handle a relation.""" source = self.ensure_node(tokens[SOURCE]) source_modifier = modifier_po_to_dict(tokens[SOURCE]) relation = tokens[RELATION] target = self.ensure_node(tokens[TARGET]) target_modifier = modifier_po_to_dict(tokens[TARGET]) return self._add_qualified_edge( source=source, source_modifier=source_modifier, relation=relation, target=target, target_modifier=target_modifier, ) def _handle_relation_harness(self, line: str, position: int, tokens: Union[ParseResults, Dict]) -> ParseResults: """Handle BEL relations based on the policy specified on instantiation. Note: this can't be changed after instantiation! """ self._handle_relation_checked(line, position, tokens) return tokens def _handle_relation_checked(self, line, position, tokens): if not self.control_parser.citation_is_set: raise MissingCitationException(self.get_line_number(), line, position) if not self.control_parser.evidence: raise MissingSupportWarning(self.get_line_number(), line, position) missing_required_annotations = self.control_parser.get_missing_required_annotations() if missing_required_annotations: raise MissingAnnotationWarning(self.get_line_number(), line, position, missing_required_annotations) return self._handle_relation(tokens)
[docs] def handle_unqualified_relation(self, _, __, tokens: ParseResults) -> ParseResults: """Handle unqualified relations.""" subject_node_dsl = self.ensure_node(tokens[SOURCE]) object_node_dsl = self.ensure_node(tokens[TARGET]) relation = tokens[RELATION] self.graph.add_unqualified_edge(subject_node_dsl, object_node_dsl, relation) return tokens
[docs] def handle_inverse_unqualified_relation(self, _, __, tokens: ParseResults) -> ParseResults: """Handle unqualified relations that should go reverse.""" source = self.ensure_node(tokens[SOURCE]) target = self.ensure_node(tokens[TARGET]) relation = tokens[RELATION] self.graph.add_unqualified_edge(source=target, target=source, relation=relation) return tokens
[docs] def ensure_node(self, tokens: ParseResults) -> BaseEntity: """Turn parsed tokens into canonical node name and makes sure its in the graph.""" node = parse_result_to_dsl(tokens) self.graph.add_node_from_data(node) return node
[docs] def handle_translocation_illegal(self, line: str, position: int, tokens: ParseResults) -> None: """Handle a malformed translocation.""" raise MalformedTranslocationWarning(self.get_line_number(), line, position, tokens)
# HANDLERS def handle_molecular_activity_default(_: str, __: int, tokens: ParseResults) -> ParseResults: """Handle a BEL 2.0 style molecular activity with BEL default names.""" upgraded_cls = language.activity_labels[tokens[0]] upgraded_concept = language.activity_mapping[upgraded_cls] tokens[NAMESPACE] = upgraded_concept.namespace tokens[NAME] = upgraded_concept.name tokens[IDENTIFIER] = upgraded_concept.identifier return tokens def handle_activity_legacy(_: str, __: int, tokens: ParseResults) -> ParseResults: """Handle BEL 1.0 activities.""" legacy_cls = language.activity_labels[tokens[MODIFIER]] tokens[MODIFIER] = ACTIVITY tokens[EFFECT] = language.activity_mapping[legacy_cls] logger.log(5, "upgraded legacy activity to %s", legacy_cls) return tokens def handle_legacy_tloc(line: str, position: int, tokens: ParseResults) -> ParseResults: """Handle translocations that lack the ``fromLoc`` and ``toLoc`` entries.""" logger.log(5, "legacy translocation statement: %s [%d]", line, position) return tokens def handle_secretion(_, __, tokens: ParseResults) -> ParseResults: tokens[MODIFIER] = TRANSLOCATION tokens[EFFECT] = { FROM_LOC: language.intracellular, TO_LOC: language.extracellular, } return tokens def handle_surface_expression(_, __, tokens: ParseResults) -> ParseResults: tokens[MODIFIER] = TRANSLOCATION tokens[EFFECT] = { FROM_LOC: language.intracellular, TO_LOC: language.cell_surface, } return tokens def modifier_po_to_dict(tokens): """Get the location, activity, and/or transformation information as a dictionary. :return: a dictionary describing the modifier :rtype: dict """ attrs = {} if LOCATION in tokens: attrs[LOCATION] = dict(tokens[LOCATION]) if MODIFIER not in tokens: return attrs if tokens[MODIFIER] == DEGRADATION: attrs[MODIFIER] = tokens[MODIFIER] elif tokens[MODIFIER] == ACTIVITY: attrs[MODIFIER] = tokens[MODIFIER] if EFFECT in tokens: attrs[EFFECT] = dict(tokens[EFFECT]) elif tokens[MODIFIER] == TRANSLOCATION: attrs[MODIFIER] = tokens[MODIFIER] if EFFECT in tokens: try: attrs[EFFECT] = tokens[EFFECT].asDict() except AttributeError: # for when it was auto-upgraded attrs[EFFECT] = dict(tokens[EFFECT]) elif tokens[MODIFIER] == CELL_SECRETION: attrs[MODIFIER] = TRANSLOCATION attrs[EFFECT] = { FROM_LOC: language.intracellular, TO_LOC: language.extracellular, } elif tokens[MODIFIER] == CELL_SURFACE_EXPRESSION: attrs[MODIFIER] = TRANSLOCATION attrs[EFFECT] = { FROM_LOC: language.intracellular, TO_LOC: language.cell_surface, } else: raise ValueError("Invalid value for tokens[MODIFIER]: {}".format(tokens[MODIFIER])) return attrs @lru_cache() def _default_parser(): return BELParser(skip_validation=True, citation_clearing=False) @lru_cache() def parse(s: str, pprint=False): """Parse a BEL statement (without validation).""" rv = _default_parser().parse(s) if pprint: import json print(json.dumps(rv, indent=2)) else: return rv