Source code for pybel.parser.parse_metadata
# -*- coding: utf-8 -*-
"""This module supports the relation parser by handling statements."""
import logging
import re
from typing import Mapping, Optional, Pattern, Set
from pyparsing import And, MatchFirst, ParseResults, Suppress, Word, pyparsing_common as ppc
from .baseparser import BaseParser
from .constants import NamespaceTermEncodingMapping
from .exc import InvalidMetadataException, RedefinedAnnotationError, RedefinedNamespaceError, VersionFormatWarning
from .utils import delimited_quoted_list, qid, quote, word
from ..constants import (
BEL_KEYWORD_ANNOTATION, BEL_KEYWORD_AS, BEL_KEYWORD_DEFINE, BEL_KEYWORD_DOCUMENT, BEL_KEYWORD_LIST,
BEL_KEYWORD_NAMESPACE, BEL_KEYWORD_PATTERN, BEL_KEYWORD_SET, BEL_KEYWORD_URL, DOCUMENT_KEYS, METADATA_VERSION,
belns_encodings,
)
from ..resources.resources import keyword_to_url
from ..utils import valid_date_version
__all__ = [
'MetadataParser',
]
logger = logging.getLogger(__name__)
as_tag = Suppress(BEL_KEYWORD_AS)
url_tag = Suppress(BEL_KEYWORD_URL)
list_tag = Suppress(BEL_KEYWORD_LIST)
set_tag = Suppress(BEL_KEYWORD_SET)
define_tag = Suppress(BEL_KEYWORD_DEFINE)
function_tags = Word(''.join(belns_encodings))
SEMANTIC_VERSION_STRING_RE = re.compile(
r'(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?:-(?P<release>[0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+(?P<build>[0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?',
)
MALFORMED_VERSION_STRING_RE = re.compile(r'(?P<major>\d+)(\.(?P<minor>\d+)(\.(?P<patch>\d+))?)?')
NAMESPACE_BLACKLIST = {} # TODO: {'SCOMP', 'SFAM'}
[docs]class MetadataParser(BaseParser):
"""A parser for the document and definitions section of a BEL document.
.. seealso::
BEL 1.0 Specification for the `DEFINE <http://openbel.org/language/web/version_1.0/bel_specification_version_1.0.html#_define>`_ keyword
"""
def __init__(
self,
manager,
namespace_to_term_to_encoding: Optional[NamespaceTermEncodingMapping] = None,
namespace_to_pattern: Optional[Mapping[str, Pattern]] = None,
annotation_to_term: Optional[Mapping[str, Set[str]]] = None,
annotation_to_pattern: Optional[Mapping[str, Pattern]] = None,
annotation_to_local: Optional[Mapping[str, Set[str]]] = None,
default_namespace: Optional[Set[str]] = None,
allow_redefinition: bool = False,
skip_validation: bool = False,
upgrade_urls: bool = False,
) -> None:
"""Build a metadata parser.
:param manager: A cache manager
:param namespace_to_term_to_encoding:
An enumerated namespace mapping from {namespace keyword: {(identifier, name): encoding}}
:param namespace_to_pattern:
A regular expression namespace mapping from {namespace keyword: regex string}
:param annotation_to_term: Enumerated annotation mapping from {annotation keyword: set of valid values}
:param annotation_to_pattern: Regular expression annotation mapping from {annotation keyword: regex string}
:param default_namespace: A set of strings that can be used without a namespace
:param skip_validation: If true, don't download and cache namespaces/annotations
"""
#: This metadata parser's internal definition cache manager
self.manager = manager
self.disallow_redefinition = not allow_redefinition
self.skip_validation = skip_validation
self.upgrade_urls = upgrade_urls
#: A dictionary of cached {namespace keyword: {(identifier, name): encoding}}
self.namespace_to_term_to_encoding = namespace_to_term_to_encoding or {}
#: A set of namespaces's URLs that can't be cached
self.uncachable_namespaces = set()
#: A dictionary of {namespace keyword: regular expression string}
self.namespace_to_pattern = namespace_to_pattern or {}
#: A set of names that can be used without a namespace
self.default_namespace = set(default_namespace) if default_namespace is not None else None
#: A dictionary of cached {annotation keyword: set of values}
self.annotation_to_term = annotation_to_term or {}
#: A dictionary of {annotation keyword: regular expression string}
self.annotation_to_pattern = annotation_to_pattern or {}
#: A dictionary of cached {annotation keyword: set of values}
self.annotation_to_local = annotation_to_local or {}
#: A dictionary containing the document metadata
self.document_metadata = {}
#: A dictionary from {namespace keyword: BEL namespace URL}
self.namespace_url_dict = {}
#: A dictionary from {annotation keyword: BEL annotation URL}
self.annotation_url_dict = {}
self.document = And([
set_tag,
Suppress(BEL_KEYWORD_DOCUMENT),
word('key'),
Suppress('='),
qid('value'),
])
namespace_tag = And([define_tag, Suppress(BEL_KEYWORD_NAMESPACE), ppc.identifier('name'), as_tag])
self.namespace_url = And([namespace_tag, url_tag, quote('url')])
self.namespace_pattern = And([namespace_tag, Suppress(BEL_KEYWORD_PATTERN), quote('value')])
annotation_tag = And([define_tag, Suppress(BEL_KEYWORD_ANNOTATION), ppc.identifier('name'), as_tag])
self.annotation_url = And([annotation_tag, url_tag, quote('url')])
self.annotation_list = And([annotation_tag, list_tag, delimited_quoted_list('values')])
self.annotation_pattern = And([annotation_tag, Suppress(BEL_KEYWORD_PATTERN), quote('value')])
self.document.setParseAction(self.handle_document)
self.namespace_url.setParseAction(self.handle_namespace_url)
self.namespace_pattern.setParseAction(self.handle_namespace_pattern)
self.annotation_url.setParseAction(self.handle_annotations_url)
self.annotation_list.setParseAction(self.handle_annotation_list)
self.annotation_pattern.setParseAction(self.handle_annotation_pattern)
self.language = MatchFirst([
self.document,
self.namespace_url,
self.annotation_url,
self.annotation_list,
self.annotation_pattern,
self.namespace_pattern,
]).setName('BEL Metadata')
super(MetadataParser, self).__init__(self.language)
[docs] def handle_document(self, line: str, position: int, tokens: ParseResults) -> ParseResults:
"""Handle statements like ``SET DOCUMENT X = "Y"``.
:raises: InvalidMetadataException
:raises: VersionFormatWarning
"""
key = tokens['key']
value = tokens['value']
if key not in DOCUMENT_KEYS:
raise InvalidMetadataException(self.get_line_number(), line, position, key, value)
norm_key = DOCUMENT_KEYS[key]
if norm_key in self.document_metadata:
logger.warning('Tried to overwrite metadata: %s', key)
return tokens
self.document_metadata[norm_key] = value
if norm_key == METADATA_VERSION:
self.raise_for_version(line, position, value)
return tokens
[docs] def raise_for_redefined_namespace(self, line: str, position: int, namespace: str) -> None:
"""Raise an exception if a namespace is already defined.
:raises: RedefinedNamespaceError
"""
if self.disallow_redefinition and self.has_namespace(namespace):
raise RedefinedNamespaceError(self.get_line_number(), line, position, namespace)
[docs] def handle_namespace_url(self, line: str, position: int, tokens: ParseResults) -> ParseResults:
"""Handle statements like ``DEFINE NAMESPACE X AS URL "Y"``.
:raises: RedefinedNamespaceError
:raises: pybel.resources.exc.ResourceError
"""
namespace_keyword = tokens['name']
if namespace_keyword in NAMESPACE_BLACKLIST:
raise ValueError('Upgrade usage to FamPlex')
self.raise_for_redefined_namespace(line, position, namespace_keyword)
url = tokens['url']
if self.upgrade_urls and namespace_keyword.lower() in keyword_to_url:
url = keyword_to_url[namespace_keyword.lower()]
self.namespace_url_dict[namespace_keyword] = url
if self.skip_validation:
return tokens
namespace = self.manager.get_or_create_namespace(url)
self.namespace_to_term_to_encoding[namespace_keyword] = namespace.get_term_to_encodings()
return tokens
[docs] def handle_namespace_pattern(self, line: str, position: int, tokens: ParseResults) -> ParseResults:
"""Handle statements like ``DEFINE NAMESPACE X AS PATTERN "Y"``.
:raises: RedefinedNamespaceError
"""
namespace = tokens['name']
self.raise_for_redefined_namespace(line, position, namespace)
self.namespace_to_pattern[namespace] = re.compile(tokens['value'])
return tokens
[docs] def raise_for_redefined_annotation(self, line: str, position: int, annotation: str) -> None:
"""Raise an exception if the given annotation is already defined.
:raises: RedefinedAnnotationError
"""
if self.disallow_redefinition and self.has_annotation(annotation):
raise RedefinedAnnotationError(self.get_line_number(), line, position, annotation)
[docs] def handle_annotations_url(self, line: str, position: int, tokens: ParseResults) -> ParseResults:
"""Handle statements like ``DEFINE ANNOTATION X AS URL "Y"``.
:raises: RedefinedAnnotationError
"""
keyword = tokens['name']
self.raise_for_redefined_annotation(line, position, keyword)
url = tokens['url']
self.annotation_url_dict[keyword] = url
if self.skip_validation:
return tokens
self.annotation_to_term[keyword] = self.manager.get_annotation_entry_names(url)
return tokens
[docs] def handle_annotation_list(self, line: str, position: int, tokens: ParseResults) -> ParseResults:
"""Handle statements like ``DEFINE ANNOTATION X AS LIST {"Y","Z", ...}``.
:raises: RedefinedAnnotationError
"""
annotation = tokens['name']
self.raise_for_redefined_annotation(line, position, annotation)
self.annotation_to_local[annotation] = set(tokens['values'])
return tokens
[docs] def handle_annotation_pattern(self, line: str, position: int, tokens: ParseResults) -> ParseResults:
"""Handle statements like ``DEFINE ANNOTATION X AS PATTERN "Y"``.
:raises: RedefinedAnnotationError
"""
annotation = tokens['name']
self.raise_for_redefined_annotation(line, position, annotation)
self.annotation_to_pattern[annotation] = re.compile(tokens['value'])
return tokens
[docs] def has_enumerated_annotation(self, annotation: str) -> bool:
"""Check if this annotation is defined by an enumeration."""
return annotation in self.annotation_to_term
[docs] def has_regex_annotation(self, annotation: str) -> bool:
"""Check if this annotation is defined by a regular expression."""
return annotation in self.annotation_to_pattern
[docs] def has_local_annotation(self, annotation: str) -> bool:
"""Check if this annotation is defined by an locally."""
return annotation in self.annotation_to_local
[docs] def has_annotation(self, annotation: str) -> bool:
"""Check if this annotation is defined."""
return (
self.has_enumerated_annotation(annotation)
or self.has_regex_annotation(annotation)
or self.has_local_annotation(annotation)
)
[docs] def has_enumerated_namespace(self, namespace: str) -> bool:
"""Check if this namespace is defined by an enumeration."""
return namespace in self.namespace_to_term_to_encoding
[docs] def has_regex_namespace(self, namespace: str) -> bool:
"""Check if this namespace is defined by a regular expression."""
return namespace in self.namespace_to_pattern
[docs] def has_namespace(self, namespace: str) -> bool:
"""Check if this namespace is defined."""
return self.has_enumerated_namespace(namespace) or self.has_regex_namespace(namespace)
[docs] def raise_for_version(self, line: str, position: int, version: str) -> None:
"""Check that a version string is valid for BEL documents.
This means it's either in the YYYYMMDD or semantic version format.
:param line: The line being parsed
:param position: The position in the line being parsed
:param str version: A version string
:raises: VersionFormatWarning
"""
if valid_date_version(version):
return
if not SEMANTIC_VERSION_STRING_RE.match(version):
raise VersionFormatWarning(self.get_line_number(), line, position, version)