# -*- coding: utf-8 -*-
"""This module contains the base class for connection managers in SQLAlchemy."""
import logging
from typing import List, Optional, Tuple, Type, TypeVar
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from .models import Base
from ..config import config
__all__ = [
'BaseManager',
'build_engine_session',
]
logger = logging.getLogger(__name__)
X = TypeVar('X')
def build_engine_session(
connection: str,
echo: bool = False,
autoflush: Optional[bool] = None,
autocommit: Optional[bool] = None,
expire_on_commit: Optional[bool] = None,
scopefunc=None,
) -> Tuple:
"""Build an engine and a session.
:param connection: An RFC-1738 database connection string
:param echo: Turn on echoing SQL
:param autoflush: Defaults to True if not specified in kwargs or configuration.
:param autocommit: Defaults to False if not specified in kwargs or configuration.
:param expire_on_commit: Defaults to False if not specified in kwargs or configuration.
:param scopefunc: Scoped function to pass to :func:`sqlalchemy.orm.scoped_session`
:rtype: tuple[Engine,Session]
From the Flask-SQLAlchemy documentation:
An extra key ``'scopefunc'`` can be set on the ``options`` dict to
specify a custom scope function. If it's not provided, Flask's app
context stack identity is used. This will ensure that sessions are
created and removed with the request/response cycle, and should be fine
in most cases.
"""
if connection is None:
raise ValueError('can not build engine when connection is None')
engine = create_engine(connection, echo=echo)
if autoflush is None:
autoflush = config.get('PYBEL_MANAGER_AUTOFLUSH', True)
if autocommit is None:
autocommit = config.get('PYBEL_MANAGER_AUTOCOMMIT', False)
if expire_on_commit is None:
expire_on_commit = config.get('PYBEL_MANAGER_AUTOEXPIRE', True)
logger.debug('auto flush: %s, auto commit: %s, expire on commmit: %s', autoflush, autocommit, expire_on_commit)
#: A SQLAlchemy session maker
session_maker = sessionmaker(
bind=engine,
autoflush=autoflush,
autocommit=autocommit,
expire_on_commit=expire_on_commit,
)
#: A SQLAlchemy session object
session = scoped_session(
session_maker,
scopefunc=scopefunc,
)
return engine, session
[docs]class BaseManager(object):
"""A wrapper around a SQLAlchemy engine and session."""
#: The declarative base for this manager
base = Base
def __init__(self, engine, session) -> None:
"""Instantiate a manager from an engine and session."""
self.engine = engine
self.session = session
[docs] def create_all(self, checkfirst: bool = True) -> None:
"""Create the PyBEL cache's database and tables.
:param checkfirst: Check if the database exists before trying to re-make it
"""
self.base.metadata.create_all(bind=self.engine, checkfirst=checkfirst)
[docs] def drop_all(self, checkfirst: bool = True) -> None:
"""Drop all data, tables, and databases for the PyBEL cache.
:param checkfirst: Check if the database exists before trying to drop it
"""
self.session.close()
self.base.metadata.drop_all(bind=self.engine, checkfirst=checkfirst)
[docs] def bind(self) -> None:
"""Bind the metadata to the engine and session."""
self.base.metadata.bind = self.engine
self.base.query = self.session.query_property()
def _list_model(self, model_cls: Type[X]) -> List[X]:
"""List the models in this class."""
return self.session.query(model_cls).all()
def _count_model(self, model_cls) -> int:
"""Count the number of models in the database."""
return self.session.query(model_cls).count()
def __repr__(self):
return '<{} connection={}>'.format(self.__class__.__name__, self.engine.url)