Source code for poppy.core.db.handlers

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


from poppy.core.db.connector import Connector
from poppy.core.db.database import Database
from poppy.core.logger import logger
from poppy.core.tools.imports import import_class
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound

__all__ = [
    "load_databases",
    "link_databases",
    "yield_limit",
    "connect_databases",
    "get_or_create",
    "get_or_create_with_info",
    "create", "update_one",
]


[docs]def yield_limit(qry, pk_attr, maxrq=1000): """ Specialized windowed query generator (using LIMIT/OFFSET) This recipe is to select through a large number of rows thats too large to fetch at once. The technique depends on the primary key of the FROM clause being an integer value, and selects items using LIMIT. """ firstid = None while True: q = qry if firstid is not None: q = qry.filter(pk_attr > firstid) rec = None for rec in q.order_by(pk_attr).limit(maxrq): yield rec if rec is None: break firstid = pk_attr.__get__(rec, pk_attr) if rec else None
[docs]def load_databases(databases): """ Load databases according to the information provided in the dictionary in argument. """ # loop over parameters in the argument for database in databases: # create the database name = database["identifier"] logger.debug("Instantiate the database {0}".format(name)) db = Database(name) # set the base used to reflect the database if database["identifier"] in Database.bases_manager.availables: db.base = Database.bases_manager.availables[database["identifier"]] else: logger.info("Base of database {0} isn't registered".format( database["identifier"])) continue # update the parameters used by the database db.parameters = database["login_info"]
[docs]def connect_databases(): """ Make a connection and reflection of databases registered. """ # loop over databases instances and make the connection for database in Database.manager.instances: # connect to the database database.connectDatabase() # if the database is not connected, display it to the user if not database.connected: logger.error("{0} is not connected".format(database))
[docs]def get_or_create( session, model, create_method="", create_method_kwargs=None, **kwargs ): """ Simply get an object if already present in the database or create it in the other case. See http://skien.cc/blog/2014/01/15/sqlalchemy-and-race-conditions-implementing/ and http://skien.cc/blog/2014/02/06/sqlalchemy-and-race-conditions-follow-up/ for better details on why this function as been upgraded to the provided example. Better handling of weird cases in the situation of multiple processes using the database at the same time. """ try: return session.query(model).filter_by(**kwargs).one() except NoResultFound: kwargs.update(create_method_kwargs or {}) created = getattr(model, create_method, model)(**kwargs) try: session.add(created) session.commit() return created except IntegrityError: session.rollback() return session.query(model).filter_by(**kwargs).one()
def get_update_or_create( session, model, create_method="", update_fields=None, **kwargs ): """ Get an object if already present in the database and update it. If not, create it """ try: # get the obj instance instance = session.query(model).filter_by(**kwargs).one() # update it for field in update_fields or {}: setattr(instance, field, update_fields[field]) session.commit() return instance except NoResultFound: # create the obj instance kwargs.update(update_fields or {}) created = getattr(model, create_method, model)(**kwargs) try: session.add(created) session.commit() return created except IntegrityError: session.rollback() return session.query(model).filter_by(**kwargs).one()
[docs]def get_or_create_with_info( session, model, create_method="", create_method_kwargs=None, **kwargs ): """ Simply get an object if already present in the database or create it in the other case. See http://skien.cc/blog/2014/01/15/sqlalchemy-and-race-conditions-implementing/ and http://skien.cc/blog/2014/02/06/sqlalchemy-and-race-conditions-follow-up/ for better details on why this function as been upgraded to the provided example. Better handling of weird cases in the situation of multiple processes using the database at the same time. """ try: return session.query(model).filter_by(**kwargs).one(), False except NoResultFound: kwargs.update(create_method_kwargs or {}) created = getattr(model, create_method, model)(**kwargs) try: session.add(created) session.commit() return created, True except IntegrityError: session.rollback() return session.query(model).filter_by(**kwargs).one(), False
[docs]def create(session, model, **kwargs): """ Simply create an object and add it into the database. """ # create it and add it instance = model(**kwargs) session.add(instance) session.commit() return instance
[docs]def update_one( session, model, values, **kwargs ): """Update column values of a single entry in the database.""" # Check if the entry already exists... try: entry = session.query(model).filter_by(**kwargs).one() except NoResultFound: # if not return false logger.error("NO ENTRY FOUND IN THE DATABASE FOR ({0})!".format(model)) raise(NoResultFound) except MultipleResultsFound: # if multiple entries get the first one logger.warning( "Multiple entries found in the database ({0})!".format(model)) entry = session.query(model).filter_by(**kwargs).first() # Else update column values for the existing entry for col in values.keys(): exec("entry.{0} = values['{0}']".format(col)) try: session.commit() except IntegrityError: session.rollback() return False