Source code for pybel.manager.citation_utils

# -*- coding: utf-8 -*-

"""Citation utilities for the database manager."""

import logging
import re
from datetime import date, datetime
from functools import lru_cache
from typing import Any, Dict, Iterable, List, Mapping, Optional, Set, Tuple, Union

import ratelimit
import requests
from more_itertools import chunked
from sqlalchemy import and_
from tqdm.autonotebook import tqdm

from . import models
from .cache_manager import Manager
from ..constants import CITATION
from ..struct.filters import filter_edges
from ..struct.filters.edge_predicates import CITATION_PREDICATES
from ..struct.graph import BELGraph
from ..struct.summary.provenance import get_citation_identifiers

__all__ = [
    "enrich_pubmed_citations",
    "enrich_pmc_citations",
]

logger = logging.getLogger(__name__)

# URL template for the PubMed E-Utils ``esummary`` endpoint (JSON mode); ``{}`` is filled with a
# comma-separated list of PubMed identifiers. HTTPS is used because NCBI documents the E-utilities
# base URL as https://eutils.ncbi.nlm.nih.gov/entrez/eutils/.
EUTILS_URL_FMT = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?db=pubmed&retmode=json&id={}"

# Patterns for the publication date formats that :func:`sanitize_date` understands.
re1 = re.compile(r"^[12][0-9]{3} [a-zA-Z]{3} \d{1,2}$")  # e.g. "2016 Jan 5"
re2 = re.compile(r"^[12][0-9]{3} [a-zA-Z]{3}$")  # e.g. "2016 Jan"
re3 = re.compile(r"^[12][0-9]{3}$")  # e.g. "2016"
re4 = re.compile(r"^([12][0-9]{3} [a-zA-Z]{3})\s*-\s*[a-zA-Z]{3}$")  # e.g. "2016 Oct-Dec" or "2016 Oct - Dec"
re5 = re.compile(r"^([12][0-9]{3}) (Spring|Fall|Winter|Summer)$")  # e.g. "2016 Winter"
re6 = re.compile(r"^[12][0-9]{3} [a-zA-Z]{3} \d{1,2}-(\d{1,2})$")  # e.g. "2016 Jan 5-10"
re7 = re.compile(r"^[12][0-9]{3} [a-zA-Z]{3} \d{1,2}-([a-zA-Z]{3} \d{1,2})$")  # e.g. "2016 Jan 30-Feb 2"
re8 = re.compile(r"^(Spring|Fall|Winter|Summer) ([12][0-9]{3})$")  # e.g. "Winter 2016"

# Month (zero-padded) that each season name is mapped onto.
season_map = {"Spring": "03", "Summer": "06", "Fall": "09", "Winter": "12"}


def sanitize_date(publication_date: str) -> Optional[str]:
    """Sanitize lots of different date strings into ISO-8601.

    Missing months and days default to ``01``. For ranges (``2016 Oct-Dec``, ``2016 Jan 5-10``,
    ``2016 Jan 30-Feb 2``) the start of the range is used. Seasons are mapped to a month
    through ``season_map``.

    :param publication_date: A publication date string in one of the supported formats
    :return: The date formatted as ``YYYY-MM-DD``, or ``None`` if the format is not recognized
    :raises ValueError: If the string has a recognized shape but ``strptime`` can not parse it,
        e.g. an unknown month abbreviation or an impossible day. Month abbreviations are parsed
        with ``%b``, which follows the current locale.
    """
    if re1.search(publication_date):
        return datetime.strptime(publication_date, "%Y %b %d").strftime("%Y-%m-%d")

    if re2.search(publication_date):
        return datetime.strptime(publication_date, "%Y %b").strftime("%Y-%m-01")

    if re3.search(publication_date):
        return publication_date + "-01-01"

    match = re4.search(publication_date)
    if match:
        # Only the "YYYY Mon" prefix is kept; the end month of the range is discarded.
        return datetime.strptime(match.group(1), "%Y %b").strftime("%Y-%m-01")

    match = re5.search(publication_date)
    if match:
        year, season = match.groups()
        return "{}-{}-01".format(year, season_map[season])

    match = re8.search(publication_date)
    if match:
        season, year = match.groups()
        return "{}-{}-01".format(year, season_map[season])

    if re6.search(publication_date) or re7.search(publication_date):
        # Parse only the start of the day range, so no input text ends up in the strptime format.
        range_start = publication_date.split("-", 1)[0]
        return datetime.strptime(range_start, "%Y %b %d").strftime("%Y-%m-%d")

    return None


def clean_pubmed_identifiers(identifiers: Iterable[str]) -> List[str]:
    """Normalize identifiers into a sorted list of unique, non-empty strings.

    Falsy entries are skipped, every remaining entry is converted with :func:`str` and stripped
    of surrounding whitespace, and entries that are empty after stripping are dropped.
    """
    unique = set()
    for raw in identifiers:
        if not raw:
            continue
        stripped = str(raw).strip()
        if stripped:
            unique.add(stripped)

    ordered = list(unique)
    ordered.sort()
    return ordered


@ratelimit.sleep_and_retry
@ratelimit.limits(calls=3, period=1)
def get_pubmed_citation_response(pubmed_identifiers: Iterable[str]):
    """Get the response from PubMed E-Utils for a given list of PubMed identifiers.

    Rate limit of 3 requests per second is from:
    https://ncbiinsights.ncbi.nlm.nih.gov/2018/08/14/release-plan-for-e-utility-api-keys/

    ``ratelimit.limits`` on its own raises ``RateLimitException`` once the budget for the
    period is used up; ``ratelimit.sleep_and_retry`` turns that into waiting for the rest of
    the period and calling again, so callers that loop over many chunks do not crash.

    :param pubmed_identifiers: PubMed identifiers to look up; falsy entries are skipped
    :return: The decoded JSON body of the response
    :rtype: dict
    """
    # The rate limiter rejects a call before this body runs, so the iterable is only consumed
    # by the call that actually goes through.
    query = ",".join(pubmed_identifier for pubmed_identifier in pubmed_identifiers if pubmed_identifier)
    response = requests.get(EUTILS_URL_FMT.format(query))
    return response.json()


def enrich_citation_model(manager: Manager, citation: models.Citation, p: Mapping[str, Any]) -> bool:
    """Fill in a citation model from a PubMed E-Utils summary record.

    :param manager: A database manager, used to get or create the author models
    :param citation: The citation model to update in place
    :param p: The dictionary from PubMed E-Utils corresponding to d["result"][pmid]
    :return: ``False`` if the record contains an ``"error"`` key (the model is left untouched),
        otherwise ``True``
    """
    if "error" in p:
        logger.warning("Error downloading PubMed")
        return False

    # Plain bibliographic fields are copied over one-to-one, in this order.
    copied_fields = (
        ("title", "title"),
        ("journal", "fulljournalname"),
        ("volume", "volume"),
        ("issue", "issue"),
        ("pages", "pages"),
    )
    for attribute, key in copied_fields:
        setattr(citation, attribute, p[key])

    citation.first = manager.get_or_create_author(p["sortfirstauthor"])
    citation.last = manager.get_or_create_author(p["lastauthor"])

    article_types = p["pubtype"]
    if article_types:
        citation.article_type = article_types[0]

    # The author list is optional in the record; authors already on the model are not duplicated.
    for author_entry in p.get("authors", ()):
        author_model = manager.get_or_create_author(author_entry["name"])
        if author_model in citation.authors:
            continue
        citation.authors.append(author_model)

    raw_date = p["pubdate"]
    iso_date = None
    try:
        iso_date = sanitize_date(raw_date)
    except ValueError:
        logger.warning(
            "could not parse publication date %s for pubmed:%s",
            raw_date,
            citation.db_id,
        )

    if not iso_date:
        logger.info("result had date with strange format: %s", raw_date)
    else:
        citation.date = datetime.strptime(iso_date, "%Y-%m-%d")

    return True


def get_citations_by_pmids(
    manager: Manager,
    pmids: Iterable[Union[str, int]],
    *,
    group_size: Optional[int] = None,
    offline: bool = False,
) -> Tuple[Dict[str, Dict], Set[str]]:
    """Get citation information for the given PubMed identifiers.

    This is :func:`_get_citations_by_identifiers` with the prefix fixed to ``"pubmed"``.

    :param manager: A database manager
    :param pmids: An iterable of PubMed identifiers
    :param group_size: The number of identifiers to query at a time, passed through unchanged
    :param offline: Passed through unchanged; if true, no remote lookup is done
    :return: The pair returned by :func:`_get_citations_by_identifiers`
    """
    lookup_options = dict(group_size=group_size, offline=offline, prefix="pubmed")
    return _get_citations_by_identifiers(manager=manager, identifiers=pmids, **lookup_options)


def _get_citations_by_identifiers(
    manager: Manager,
    identifiers: Iterable[Union[str, int]],
    *,
    group_size: Optional[int] = None,
    offline: bool = False,
    prefix: Optional[str] = None,
) -> Tuple[Dict[str, Dict], Set[str]]:
    """Get citation information for the given identifiers, enriching uncached ones in chunks.

    :type manager: pybel.Manager
    :param identifiers: an iterable of identifiers belonging to ``prefix``
    :param group_size: The number of identifiers to hand to the enrichment helper at a time.
        Defaults to 200 identifiers.
    :param offline: If true, return only what is already enriched in the database and skip the
        enrichment helper entirely.
    :param prefix: The key into ``_HELPERS`` selecting the enrichment helper. Defaults to ``"pubmed"``.
    :return: A pair of a dictionary of {identifier: data dictionary} and a set of erroneous identifiers.
    :raises ValueError: If there is no enrichment helper registered for ``prefix``
    """
    prefix = "pubmed" if prefix is None else prefix

    helper = _HELPERS.get(prefix)
    if helper is None:
        raise ValueError(f"can not work on prefix: {prefix}")

    if group_size is None:
        group_size = 200

    identifiers = clean_pubmed_identifiers(identifiers)
    n_identifiers = len(identifiers)
    logger.info("ensuring %d %s identifiers", n_identifiers, prefix)

    # Citation models that already exist in the database, keyed by identifier.
    id_to_model = {}
    for cached_model in _get_citation_models(identifiers, prefix=prefix, manager=manager):
        id_to_model[cached_model.db_id] = cached_model
    logger.info(
        "%d of %d %s identifiers are already cached",
        len(id_to_model),
        n_identifiers,
        prefix,
    )

    # Create the missing models and sort all of them into enriched (as JSON) and unenriched (as models).
    enriched_models, unenriched_models = {}, {}
    for identifier in tqdm(identifiers, desc=f"creating {prefix} models"):
        if id_to_model.get(identifier) is None:
            id_to_model[identifier] = manager.get_or_create_citation(identifier=identifier, namespace=prefix)
        model = id_to_model[identifier]
        if not model.is_enriched:
            unenriched_models[identifier] = model
        else:
            enriched_models[identifier] = model.to_json()

    logger.info(
        "%d of %d %s are identifiers already enriched",
        len(enriched_models),
        n_identifiers,
        prefix,
    )
    manager.session.commit()

    errors = set()
    if offline or not unenriched_models:
        return enriched_models, errors

    # The helper fills ``enriched_models`` and ``errors`` in place, one chunk at a time.
    progress = tqdm(unenriched_models, desc=f"getting {prefix} data in chunks of {group_size}")
    for identifier_chunk in chunked(progress, n=group_size):
        helper(
            identifier_chunk,
            manager=manager,
            enriched_models=enriched_models,
            unenriched_models=unenriched_models,
            errors=errors,
        )

    return enriched_models, errors


def _help_enrich_pmids(identifiers: Iterable[str], *, manager, unenriched_models, enriched_models, errors):
    """Enrich one chunk of PubMed citation models from a single E-Utils response.

    :param identifiers: The PubMed identifiers to look up in one request
    :param manager: A database manager; its session is committed once at the end of the chunk
    :param unenriched_models: A dictionary of {identifier: citation model} still to be enriched
    :param enriched_models: A dictionary that receives {identifier: JSON} for each success
    :param errors: A set that receives the identifiers whose record could not be used
    """
    results = get_pubmed_citation_response(identifiers)["result"]

    for pmid in results["uids"]:
        record = results[pmid]
        model = unenriched_models.get(pmid)
        if model is None:
            # The response listed an identifier that is not among the unenriched models.
            tqdm.write(f"problem looking up pubmed:{pmid}")
            continue

        if enrich_citation_model(manager, model, record):
            enriched_models[pmid] = model.to_json()
            manager.session.add(model)
        else:
            tqdm.write(f"Error downloading pubmed:{pmid}")
            errors.add(pmid)

    manager.session.commit()  # commit in groups


def _help_enrich_pmc_identifiers(
    identifiers: Iterable[str],
    *,
    manager: Manager,
    unenriched_models,
    enriched_models,
    errors,
):
    """Enrich one chunk of PubMed Central citation models, one lookup per identifier.

    :param identifiers: The PMC identifiers in this chunk
    :param manager: A database manager; its session is committed once at the end of the chunk
    :param unenriched_models: A dictionary of {identifier: citation model} still to be enriched
    :param enriched_models: A dictionary that receives {identifier: JSON} for each success
    :param errors: A set that receives the identifiers whose CSL item could not be retrieved
    """
    for pmcid in identifiers:
        try:
            csl_item = get_pmc_csl_item(pmcid)
        except Exception:
            # Best effort: any failure of the lookup only marks this identifier as an error.
            tqdm.write(f"Error downloading pmc:{pmcid}")
            errors.add(pmcid)
        else:
            citation = unenriched_models[pmcid]
            enrich_citation_model_from_pmc(manager=manager, citation=citation, csl=csl_item)
            manager.session.add(citation)
            enriched_models[pmcid] = citation.to_json()

    manager.session.commit()  # commit in groups


# Dispatch table from citation prefix to the function that enriches one chunk of identifiers.
# Each helper is called as helper(identifiers, *, manager, unenriched_models, enriched_models, errors).
_HELPERS = {
    "pubmed": _help_enrich_pmids,
    "pmc": _help_enrich_pmc_identifiers,
}


def _get_citation_models(
    identifiers: Iterable[str],
    *,
    prefix: str,
    manager: Manager,
    chunksize: int = 200,
) -> Iterable[models.Citation]:
    """Yield the citation models already in the database for the given identifiers.

    The identifiers are queried in chunks so that each ``IN`` clause holds at most
    ``chunksize`` values.

    :param identifiers: The identifiers to look up
    :param prefix: The value that the ``db`` column of a citation has to equal
    :param manager: A database manager whose session runs the queries
    :param chunksize: The maximum number of identifiers per query
    """
    for batch in chunked(identifiers, chunksize):
        has_prefix = models.Citation.db == prefix
        in_batch = models.Citation.db_id.in_(batch)
        query = manager.session.query(models.Citation).filter(and_(has_prefix, in_batch))
        for citation_model in query.all():
            yield citation_model


def enrich_pubmed_citations(
    graph: BELGraph,
    *,
    manager: Optional[Manager] = None,
    group_size: Optional[int] = None,
    offline: bool = False,
) -> Set[str]:
    """Overwrite all PubMed citations with values from NCBI's eUtils lookup service.

    :param graph: A BEL graph
    :param manager: A PyBEL database manager
    :param group_size: The number of PubMed identifiers to query at a time. Defaults to 200 identifiers.
    :param offline: An override for when you don't want to hit the eUtils
    :return: A set of PMIDs for which the eUtils service crashed
    """
    errors = _enrich_citations(
        graph=graph,
        manager=manager,
        prefix="pubmed",
        group_size=group_size,
        offline=offline,
    )
    return errors
def enrich_pmc_citations(
    graph: BELGraph,
    *,
    manager: Optional[Manager] = None,
    group_size: Optional[int] = None,
    offline: bool = False,
) -> Set[str]:
    """Overwrite all PubMed Central citations with values looked up for their PMC identifiers.

    :param graph: A BEL graph
    :param manager: A PyBEL database manager
    :param group_size: The number of PMC identifiers to process per chunk (the database session is
        committed once per chunk). Defaults to 200 identifiers.
    :param offline: An override for when you don't want to look anything up remotely
    :return: A set of PMC identifiers for which the lookup was not possible
    """
    return _enrich_citations(
        manager=manager,
        graph=graph,
        group_size=group_size,
        offline=offline,
        prefix="pmc",
    )
def _enrich_citations(
    graph: BELGraph,
    manager: Optional[Manager],
    group_size: Optional[int] = None,
    offline: bool = False,
    prefix: Optional[str] = None,
) -> Set[str]:
    """Overwrite all citations of the given prefix using the predefined lookup functions.

    :param graph: A BEL Graph
    :param manager: A PyBEL database manager. If none is given, a default ``Manager()`` is created.
    :param group_size: The number of identifiers to query at a time. Defaults to 200 identifiers.
    :param offline: If true, only data that is already enriched in the database is used.
    :param prefix: The citation prefix to enrich. Defaults to ``"pubmed"``.
    :return: A set of identifiers for which lookup was not possible
    """
    if manager is None:
        manager = Manager()
    if prefix is None:
        prefix = "pubmed"

    identifiers = {identifier for identifier in get_citation_identifiers(graph, prefix) if identifier}
    identifier_map, errors = _get_citations_by_identifiers(
        manager,
        identifiers=identifiers,
        group_size=group_size,
        offline=offline,
        prefix=prefix,
    )

    for u, v, k in filter_edges(graph, CITATION_PREDICATES[prefix]):
        citation = graph[u][v][k][CITATION]
        identifier = citation.identifier
        identifier_data = identifier_map.get(identifier)
        if identifier_data is None:
            # No data was obtained for this identifier, so the edge keeps its citation as it is.
            logger.warning("Missing data for %s:%s", prefix, identifier)
            errors.add(identifier)
            continue
        citation.update(identifier_data)

    return errors


@lru_cache()
def get_pmc_csl_item(pmcid: str) -> Mapping[str, Any]:
    """Get the CSL item for a PubMed Central record by its PMCID, using manubot.

    A ``URL`` entry pointing at the article on the PMC website is added if the item has none.
    Results are cached with :func:`functools.lru_cache`, so repeated calls with the same
    identifier return the same object.

    :param pmcid: A PubMed Central identifier, starting with ``PMC``
    :return: The CSL item as a dictionary
    :raises ValueError: If the identifier does not start with ``PMC``
    """
    if not pmcid.startswith("PMC"):
        raise ValueError(f"not a valid pmc id: {pmcid}")

    # Imported inside the function, presumably to keep manubot an optional dependency. The alias
    # keeps the import from shadowing this function's own name.
    from manubot.cite.pubmed import get_pmc_csl_item as _manubot_get_pmc_csl_item

    csl_item = _manubot_get_pmc_csl_item(pmcid)
    if "URL" not in csl_item:
        csl_item["URL"] = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{csl_item.get('PMCID', pmcid)}/"
    return csl_item


def enrich_citation_model_from_pmc(manager: Manager, citation: models.Citation, csl: Mapping[str, Any]) -> bool:
    """Enrich a citation model with the information from PubMed Central.

    :param manager: A database manager, used to get or create the author models
    :param citation: A citation model, updated in place
    :param dict csl: The CSL item dictionary from PMC
    :return: Always ``True``
    """
    citation.title = csl.get("title")
    citation.journal = csl.get("container-title")
    citation.volume = csl.get("volume")
    # NOTE(review): citation.issue is not populated here; the original assignment from csl['issue']
    # was commented out. Confirm whether that is intentional.
    citation.pages = csl.get("page")
    citation.article_type = csl.get("type")

    for author in csl.get("author", []):
        try:
            author_name = f'{author["given"]} {author["family"]}'
        except KeyError:
            # Authors without both a given and a family name are skipped.
            logger.warning("problem with author in pmc:%s: %s", citation.db_id, author)
            continue
        author_model = manager.get_or_create_author(author_name)
        if author_model not in citation.authors:
            citation.authors.append(author_model)

    if citation.authors:
        citation.first = citation.authors[0]
        citation.last = citation.authors[-1]

    issued = csl.get("issued")
    if issued is not None:
        # Only the first entry of "date-parts" is used; a missing or empty list falls through
        # to the warning below instead of raising.
        date_parts = (issued.get("date-parts") or [[]])[0]
        if 1 <= len(date_parts) <= 3:
            # A missing month or day defaults to 1. CSL-JSON allows the parts to be strings or
            # numbers, hence the int() conversion.
            year, month, day = ([int(part) for part in date_parts] + [1, 1])[:3]
            citation.date = date(year=year, month=month, day=day)
        else:
            logger.warning("not sure about date parts: %s", date_parts)

    return True