Source code for timApp.celery_sqlalchemy_scheduler.session

"""SQLAlchemy session."""

from contextlib import contextmanager

from kombu.utils.compat import register_after_fork
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

from timApp.timdb.sqa import db

ModelBase = db.Model


[docs]@contextmanager def session_cleanup(session): try: yield except Exception: session.rollback() raise finally: session.close()
def _after_fork_cleanup_session(session): session._after_fork()
[docs]class SessionManager: """Manage SQLAlchemy sessions.""" def __init__(self): self._engines = {} self._sessions = {} self.forked = False self.prepared = False if register_after_fork is not None: register_after_fork(self, _after_fork_cleanup_session) def _after_fork(self): self.forked = True
[docs] def get_engine(self, dburi, **kwargs): if self.forked: try: return self._engines[dburi] except KeyError: engine = self._engines[dburi] = create_engine(dburi, **kwargs) return engine else: return create_engine(dburi, poolclass=NullPool)
[docs] def create_session(self, dburi, short_lived_sessions=False, **kwargs): engine = self.get_engine(dburi, **kwargs) if self.forked: if short_lived_sessions or dburi not in self._sessions: self._sessions[dburi] = sessionmaker(bind=engine) return engine, self._sessions[dburi] else: return engine, sessionmaker(bind=engine)
[docs] def prepare_models(self, engine): pass
[docs] def session_factory(self, dburi, **kwargs): engine, session = self.create_session(dburi, **kwargs) self.prepare_models(engine) return session()