Pipeline

Usage

To launch the pipeline:

$ python manage.py

This displays the pipeline's help message, with a description of the available options and of the sub-commands provided by the pipeline or its plugins.

Since information provided by plugins is displayed when the pipeline is launched, some work has already been done at this point by the poppy.pop module.

The workflow is as follows and is described more thoroughly in the developer guide:

  1. Creation of the pipeline object. It uses the provided arguments (usually from the CLI) to correctly initialize the databases.
  2. Linking of the tasks (the chain) to the pipeline.
  3. Running the pipeline with the provided tasks.
../../_images/pipeline_setup.svg

Fig. 2 Steps of the pipeline setup.

Initialization

The initialization of the pipeline object is divided into several parts, detailed below.

Context setup

The context setup is an important part of the pipeline. The context is an attribute containing all the variables the pipeline needs to set up its environment, but it is also a way for tasks to share information with each other across the whole chain.

This attribute is called poppy.pop.pop.Pop.properties and is an instance of poppy.core.properties.Properties, a dictionary-like class whose attributes can also be accessed as dictionary keys. The existence of an attribute inside the context can easily be tested with the in operator.

All the arguments args transmitted to the pipeline are set on the context (properties) of the pipeline, allowing any connected task to use settings from the environment or from the user. This is done through:

self.properties = Properties()
self.properties += vars(args)

where vars extracts the attributes of args into a dictionary, which is then added to the context.
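
Based on this behaviour, attribute and key access on the context are interchangeable. A minimal sketch (the property name is made up for illustration):

# Attribute and key access point to the same value in the context.
from poppy.core.properties import Properties

properties = Properties()
properties.output_dir = "/tmp/data"    # set as an attribute
print(properties["output_dir"])        # read back as a dictionary key
print("output_dir" in properties)      # membership test with `in`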

Connector setup

From the information provided by the arguments, now stored in the context, the pipeline can set up the connectors. A connector links a database, through an identifier, to a unique connection object in the pipeline. The connector itself remains the same in the code, but another database can be linked to it later if necessary.

The pipeline’s context is set as an attribute of the connector after its creation, allowing the connector to reach back into the pipeline when necessary, for example to access information stored in the context.

The function poppy.core.db.database.link_databases() loops over the defined databases, creates each connector if it does not already exist, and sets the linked database as an attribute of the connector.

"databases": [
    {
        "identifier": "MAIN-DB",
        "connector": "poppy.pop.roc_connector.ROC",
        "login_info": {
            ...
        }
    }
]
  • identifier is the identifier of the database that will be linked to the connector and that will be used to make the necessary connections throughout the program. It is also the identifier used as a reference in the code to get the associated connector.
  • connector is optional. If present, the class at the provided module path is used to construct the connector, otherwise the default poppy.db.connector.Connector is used. Providing a custom class can be useful to add extra behaviour to a given connector.
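
For instance, the connector referenced in the configuration above could be a subclass of the default class adding database-specific behaviour. A hypothetical sketch (the method and the import path shown here are assumptions):

# Custom connector adding behaviour on top of the default one.
from poppy.core.db.connector import Connector

class ROC(Connector):
    """Connector dedicated to the MAIN-DB database."""

    def ping(self):
        # Hypothetical extra behaviour; a real method would use the
        # database linked to the connector by link_databases().
        ...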

Dry run setup

The dry run object poppy.core.db.dry_runner.DryRunner, a singleton in the program, is referenced by the pipeline to enable or disable the dry run mode depending on the settings used by the user.

The dry run object makes it possible to enable or disable functions and methods in the code at runtime, simply by decorating them. For example, a method decorated with the dry run object is not executed when the dry run mode is activated. This allows, for instance, not writing anything into the ROC database in dry run mode, while keeping the other features of the POPPy framework intact.
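
A minimal sketch of the idea (the decorator attribute used here is an assumption; check the DryRunner API for the actual interface):

# Skip a database write when the dry run mode is active.
from poppy.core.db.dry_runner import DryRunner

dry_runner = DryRunner()  # singleton, shared with the pipeline

@dry_runner.dry_run
def insert_events(events):
    # Not executed when the dry run mode is activated.
    ...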

The state of the dry run mode is set according to the status of the --dry-run argument of python manage.py.

self.dry_runner = DryRunner()
self.dry_run = args.dry_run

Here, the value from the CLI is present in args, and assigning the dry_run attribute sets the state of the dry run mode through its setter:

@property
def dry_run(self):
    return self._dry_run

@dry_run.setter
def dry_run(self, dry_run):
    self._dry_run = dry_run
    if dry_run:
        self.dry_runner.activate()
    else:
        self.dry_runner.deactivate()

Task chain

Linking

Linking a task chain to the pipeline instance is simple: just create a pipe link between a task and the pipeline. The task can then be linked (or may already be linked) to other tasks.

The pipeline keeps a reference to this task, called the entry point, in order to be able to walk through the graph formed by the tasks to run.

At each linking, a flag is set to indicate that the graph of tasks will have to be regenerated before the execution of the pipeline.
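
As a sketch, assuming the pipe link is expressed with the | operator (see the developer guide for the actual syntax):

# Link a small chain to the pipeline; task_a becomes the entry point.
pipeline | task_a
task_a | task_b | task_c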

Important

In fact, the entry point task is stored in the pipeline but is not used directly. A flag is set to indicate that the chain has changed and needs an update. It is the information from poppy.pop.pop.Pop.start that really provides the entry point.

Cutting the chain

If a chain of tasks already exists and only a small part of it must be executed, it can be useful to run just that part, without the extra tasks. This is why the poppy.pop.pop.Pop.start and poppy.pop.pop.Pop.end attributes exist. The poppy.pop.pop.Pop.start attribute must be set to indicate the starting point of the chain of tasks to execute. The poppy.pop.pop.Pop.end attribute is not mandatory; it can be set to stop execution before the end of the chain if not all tasks have to be run.
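
A minimal sketch, assuming a chain containing the sub-chain task_b → task_d (the run() call is an assumption for illustration):

# Execute only the part of the chain between task_b and task_d.
pipeline.start = task_b
pipeline.end = task_d   # optional; omit to run until the end of the chain
pipeline.run()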

Loop

A loop feature gives the possibility to rerun a part of the task chain according to an iterator. A loop is created by calling the poppy.pop.pop.Pop.loop() method with the starting task, the ending task and the iterator that will be consumed to get the steps of the loop.
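
For example, to repeat the sub-chain between two tasks once per item of an iterable (a sketch; the variable names are made up):

# Rerun the tasks between task_b and task_e once per input file.
pipeline.loop(task_b, task_e, iter(input_files))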

An instance of poppy.pop.loop.Loop will be created, which overrides the settings for the ancestors and descendants of the start and end tasks provided as arguments. This instance connects to the poppy.pop.task.Task.errored signal, emitted each time an error occurs in a task inside the loop chain, in order to handle errors correctly while in the loop.

Note

It will not connect to tasks outside the path(s) between the start and end tasks of the loop.

The start and end tasks will also be monkey patched appropriately to handle errors occurring on these tasks, as well as the end of the loop. The flowchart in Fig. 3 gives an idea of what has to be done on each signal emitted by the tasks.

../../_images/pipeline_loop.svg

Fig. 3 Flowchart for the loop of the pipeline for a chain of 6 tasks. Tasks B, C, D, E are placed inside a loop. The loop instance changes the descendants of tasks dynamically depending on the status of the tasks inside the loop.

The flowchart takes the example of 6 tasks A, B, C, D, E, F, in this order of execution, of which tasks B, C, D, E are inside a loop. When setting the start task, the loop instance connects to the started signal of task B. The same is done for the ended signal of the end task E. All other tasks in the path(s) between B and E are connected through their errored signal, emitted when an error happens on the task.

If an error happens on task C, or the end of an iteration is reached in E, the connected slot of the loop instance is called. The following steps are executed:

  1. Check whether the loop can continue or not (a StopIteration exception raised by the given iterable means it cannot).
  2. YES:
      • monkey patch the descendants of the end/error task to point to the start task;
      • reset the status of the tasks inside the loop;
      • keep a reference to the end/error task. At the next iteration, when this reference task is encountered, its descendants are set back to the original method that retrieves them.
  3. NO: there are no more iterations to do for the loop:
      • disconnect the start task from the slot of the loop instance;
      • change the descendants of the task to those of the end task, so that the following tasks, not in the loop, are executed as usual.

With this dynamic change of the pipeline topology inside a loop, there is no need to integrate the loop into the main pipeline process. The pipeline works as usual; it is simply another instance that takes care of what is happening inside the task chain.

Run

Topology generation

The first thing to do before running the pipeline is to create the dependency graph of the task chain. Starting from the entry task, the pipeline walks through the chain and creates the graph of tasks, which will be used to find paths between tasks. At the same time, the pipeline stores a reference to itself in each task, allowing the tasks to access the context and any other data provided by the pipeline.

If the topology has already been created (the flag is set), the topology is not created again.

Binding

Then, the pipeline binds the ROC connector. Since the ORM uses reflection to create the classes doing the mapping with the database, we need to indicate to SQLAlchemy at which moment to inspect the database in order to create the mapping. This is done in the poppy.core.db.connector.Connector.bind() method, which tries to get the linked database object, creates the mapping classes by reflection if not already done, and then attempts a connection to the database.
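
Conceptually, the reflection step is similar to the following SQLAlchemy sketch (a simplified illustration, not the actual implementation; database_url stands for the connection string built from login_info):

# Reflect the existing database schema into mapping classes,
# then attempt a connection.
import sqlalchemy as sa
from sqlalchemy.ext.automap import automap_base

engine = sa.create_engine(database_url)
Base = automap_base()
Base.prepare(autoload_with=engine)   # build mapping classes by reflection
connection = engine.connect()        # check that the database is reachable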

The pipeline does this automatically at startup so that the user does not have to, avoiding incomprehensible error messages when a connector is not bound. The pipeline does it only for its own connector; other connectors are not bound by the ROC pipeline.

Execution

The execution of the pipeline simply consists in running all the dependencies of a task before the task itself and its children. This could normally be done recursively, but since the loop feature has been introduced, recursion could hit the maximal recursion depth limit of Python. For this reason, another approach based on a queue has been implemented.

A while loop is executed until the queue becomes empty. At each iteration, a task is popped from the queue. If the task has already completed or failed, the next iteration is performed and a new task is popped. If some of the task's dependencies (parents) have not been executed yet, they are added to the queue and the loop continues.

Once all its dependencies are done, the task itself is executed, and all its children are added to the queue to be run in turn.
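
A minimal sketch of this queue-based execution (the task attributes and methods are assumptions for illustration, not the actual pipeline API):

# Run a task graph iteratively instead of recursively.
from collections import deque

def execute(entry_task):
    queue = deque([entry_task])
    while queue:
        task = queue.popleft()
        # Skip tasks that already ran, successfully or not.
        if task.completed or task.failed:
            continue
        pending = [p for p in task.parents if not p.completed]
        if pending:
            # Run the dependencies first; the task will be queued
            # again as a child when its parents are executed.
            queue.extend(pending)
            continue
        task.run()
        queue.extend(task.children)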

API