# Source code for poppy.core.db.connector

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from poppy.core.logger import logger

from functools import wraps
from poppy.core.generic.metaclasses import SingletonManager
from poppy.core.db.database import Database
from poppy.core.db.dry_runner import DryRunner



# Names exported by ``from <module> import *``. Only ``Connector`` is listed,
# so ``ConnectorNotAvailable`` has to be imported explicitly by name.
__all__ = ["Connector"]


class ConnectorNotAvailable(Exception):
    """Raised when a connector is requested under a name that is not registered."""


class Connector(object, metaclass=SingletonManager):
    """
    Base class for managing sessions with various databases.

    A connector is identified by its ``name`` and holds at most one session
    at a time in ``self.session``. The database is stored as a key
    (``self._database``) that is resolved through ``Database.manager`` each
    time :meth:`get_database` is called.

    Several methods are wrapped with ``DryRunner.dry_run``.
    NOTE(review): presumably this skips the call when the pipeline runs in
    dry-run mode -- confirm against ``DryRunner``.
    """

    def __init__(self, name, database=None):
        """
        Init a connector with useful information.

        :param name: name used to reference the connector later.
        :param database: optional key of the database in
            ``Database.manager``. When omitted, no database is set here and
            it must be assigned through the ``database`` property before
            :meth:`get_database` or :meth:`bind` is used.
        """
        # store the name of the connector, useful to reference it later
        self.name = name

        # init the session to a None value to see if already in use or not
        self.session = None

        # if a database is given, set it
        if database is not None:
            self.database = database

    @DryRunner.dry_run
    def bind(self):
        """
        Connect the database and open a new session on it.

        Any session already held by the connector is closed first.

        :raises Exception: if the database reports that it is not connected
            after the connection attempt, or if closing the previous
            session fails (the original error is re-raised).
        """
        # get the database
        database = self.get_database()

        # and connect
        database.connectDatabase()

        # check the connection is good
        if not database.connected:
            message = "{0} is not connected".format(database)
            logger.error(message)
            raise Exception(message)

        # check that a session is already in use
        if self.session is not None:
            # close the session
            try:
                self.session.close()
            except Exception as e:
                logger.error(e)
                # bare raise re-raises the active exception unchanged
                raise

        # get a session from the factory of the database
        self.session = self.factory(database)

    def rollback(self):
        """
        Undo what has been done in the current transaction.
        """
        self.session.rollback()

    @DryRunner.dry_run
    def update_database(self):
        """
        Commit all changes for the current session.
        """
        self.session.commit()

    @DryRunner.dry_run
    def flush(self):
        """
        Flush the session.

        Pending python objects are transferred to the transaction buffer of
        the database. Error handling (e.g. rolling the transaction back) is
        left to the underlying session.
        """
        self.session.flush()

    @DryRunner.dry_run
    def add(self, obj):
        """
        Add an object into the session. A simple wrapper around the session.

        :param obj: object to add to the current session.
        """
        # add the object into the shared session
        self.session.add(obj)

    def factory(self, database):
        """
        Given the database, return a session created by its factory.

        :param database: database object exposing ``session_factory()``.
        :return: the result of ``database.session_factory()``.
        """
        return database.session_factory()

    @property
    def database(self):
        """
        Key of the database in use, as stored by the setter.
        """
        return self._database

    @database.setter
    def database(self, database):
        # store database in use
        self._database = database

    def get_database(self):
        """
        Return the database object registered under the stored key.

        :return: ``Database.manager[self._database]``.
        """
        return Database.manager[self._database]

    @classmethod
    def if_connected(cls, *args):
        """
        Build a decorator running a function only if databases are reachable.

        :param args: names of the connectors, looked up in ``cls.manager``,
            whose databases must all be available.
        :return: a decorator. The decorated function returns ``None``
            without being called as soon as one database is unavailable;
            otherwise it returns the result of the wrapped function.
        :raises ConnectorNotAvailable: at call time, if one of the names is
            not present in ``cls.manager``.
        """

        def decorator(func):
            """
            Return the function that will be used to decorate.
            """

            @wraps(func)
            def wrapper(*fargs, **kwargs):
                """
                Check every requested connection, then run the wrapped
                function if all databases are available.
                """
                # loop over names
                for name in args:
                    # the connector must be registered under the given name
                    if name not in cls.manager:
                        message = "Connector {0} not available".format(name)
                        logger.error(message)
                        raise ConnectorNotAvailable(message)

                    # get the connector
                    connector = cls.manager[name]

                    # get the database
                    database = connector.get_database()

                    # check the connection
                    if not database.is_available_with_error():
                        # report why the call is skipped instead of
                        # silently returning nothing
                        message = "No connection for {0}".format(database)
                        logger.error(message)
                        return None

                # run the wrapped function as usual
                return func(*fargs, **kwargs)

            return wrapper

        return decorator

    def __repr__(self):
        """
        A better representation of a connector.
        """
        return "Connector {0}".format(self.name)