To launch the pipeline:
$ python manage.py
It should show the help message of the pipeline, with a description of the available options and of the sub-commands provided by the pipeline or its plugins.
Since the information provided by plugins is displayed when the pipeline is launched, some work has already been done by the poppy.pop module.
The workflow is as follows and is described more thoroughly in the developer guide:
The initialization of the pipeline object is divided into several parts, detailed below.
The context setup is an important part of the pipeline. The context is an attribute containing all the variables the pipeline needs to set up its environment, but it is also a way for tasks to share information with each other across the whole chain.
This attribute is called poppy.pop.pop.Pop.properties and is an instance of poppy.core.properties.Properties, a dictionary-like class whose attributes can also be accessed as dictionary keys. The existence of an attribute inside this context can be tested easily with a simple in operator.
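To illustrate this dictionary-like behaviour, here is a minimal sketch assuming only what is described above (attribute/key access and the in operator); the attribute name is a placeholder and the exact semantics of Properties may differ:

from poppy.core.properties import Properties

properties = Properties()
properties.output_dir = "/tmp/out"  # set as an attribute

# The same value is assumed reachable as a dictionary key
print(properties["output_dir"])

# Membership test with a simple `in` operator
if "output_dir" in properties:
    print("output_dir is defined in the context")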
All the arguments args transmitted to the pipeline are set on the context (properties) of the pipeline, thus allowing any connected task to use settings from the environment or the user. This is done through:
self.properties = Properties()
self.properties += vars(args)
where vars simply collects the attributes of args into a dictionary, which is then added to the context.
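For reference, this is what vars does with an argparse namespace; the --dry-run argument is borrowed from the dry run example further below, the rest is a plain standard-library illustration:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args(["--dry-run"])

# vars() returns the attributes of the namespace as a plain dictionary
print(vars(args))  # {'dry_run': True}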
From the information provided by the arguments, now stored in the context, the pipeline can set up the connectors. A connector links a database, through an identifier, to a unique connection object in the pipeline. The connector itself remains the same in the code, but another database can be linked to it later if necessary.
The pipeline's context is set as an attribute of the connector after its creation, allowing the connector to access the pipeline directly when necessary, for example to read information from the context.
The function poppy.core.db.database.link_databases() loops over the defined databases, creates the connector if it does not already exist, and sets the linked database as an attribute of the connector. The databases are declared in the configuration, for example:
"databases": [
{
"identifier": "MAIN-DB",
"connector": "poppy.pop.roc_connector.ROC",
"login_info": {
...
}
}
]
identifier is the identifier of the database that will be linked to the connector and that will be used to make the necessary connections throughout the program. It is also the identifier used as a reference in the code to get the associated connector.

connector is optional. If present, the class at the provided module path will be used to construct the connector; otherwise the default poppy.core.db.connector.Connector is used. This can be useful to add other behaviour to a given connector.

The dry run object poppy.core.db.dry_runner.DryRunner, a singleton in the program, is referenced by the pipeline to enable/disable the dry run mode according to the settings used by the user.
The dry run object makes it possible to enable/disable functions and methods in the code at runtime, simply by decorating them. For example, you can decorate a method with the dry run object so that this method is not executed when the dry run mode is activated. This allows, for instance, not writing anything into the ROC database while this mode is activated, while keeping the other features of the POPPy framework intact.
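To make the decoration idea concrete, here is a minimal sketch of such a dry-run singleton, with the activate()/deactivate() interface used below; it is an illustration only, not the actual poppy.core.db.dry_runner.DryRunner implementation:

import functools

class DryRunner:
    # Illustrative dry-run singleton, not the real POPPy implementation
    _instance = None

    def __new__(cls):
        # Singleton: always return the same instance
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._active = False
        return cls._instance

    def activate(self):
        self._active = True

    def deactivate(self):
        self._active = False

    def __call__(self, func):
        # Decorator: skip the wrapped callable while dry run mode is active
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if self._active:
                return None
            return func(*args, **kwargs)
        return wrapper

dry_runner = DryRunner()

@dry_runner
def write_to_database(record):
    ...  # skipped entirely when the dry run mode is activated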
The state of the dry run mode is set according to the status of the --dry-run argument of python manage.py:
self.dry_runner = DryRunner()
self.dry_run = args.dry_run
Here, the value from the CLI is present in args, and assigning to the dry_run attribute sets the state of the dry run mode through its setter:
@property
def dry_run(self):
    return self._dry_run

@dry_run.setter
def dry_run(self, dry_run):
    self._dry_run = dry_run
    if dry_run:
        self.dry_runner.activate()
    else:
        self.dry_runner.deactivate()
Linking a task chain to the pipeline instance is simple: just create a pipe link between a task and the pipeline. The task can then be linked (or already be linked) to other tasks.
The pipeline keeps a reference to this task, called the entry point, in order to be able to walk through the graph formed by the tasks to run.
At each linking, a flag is set to indicate that the graph of tasks will have to be regenerated before the execution of the pipeline.
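As a purely hypothetical sketch (the term "pipe link" suggests a pipe-style operator, but the exact linking syntax is the one defined in the developer guide):

# Hypothetical pipe-style linking; task names are placeholders
pipeline | task_a          # task_a becomes the stored entry point
task_a | task_b | task_c   # tasks linked to each other in the chain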
Important
In fact, the entry point task is stored in the pipeline but will not be used. A flag is set to indicate that the chain has changed and needs an update. It is the information from poppy.pop.pop.Pop.start that really gives the entry point.
If a chain of tasks already exists and only a small part of it must be executed, it can be useful to run just this part, without the extra tasks. This is why the poppy.pop.pop.Pop.start and poppy.pop.pop.Pop.end attributes exist. The poppy.pop.pop.Pop.start attribute must be set to indicate the starting point of the chain of tasks to execute. The poppy.pop.pop.Pop.end attribute is not mandatory; it can be set to mark the last task to run when not everything inside the chain has to be done, as sketched below.
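A hypothetical usage example, with invented task names:

# Run only the sub-chain from task_b to task_d (task names are placeholders)
pipeline.start = task_b
pipeline.end = task_d  # optional; omit it to run until the end of the chain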
A loop feature gives the possibility of rerunning a part of the task chain according to an iterator. A loop can be created by calling the poppy.pop.pop.Pop.loop() method with the starting task, the ending task and the iterator that will be iterated over to get the steps of the loop.
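Based on the signature described above, a call could look like the following; the task and iterable names are invented for illustration:

# Rerun the sub-chain from task_b to task_e once per item of the iterator
pipeline.loop(task_b, task_e, input_files)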
An instance of poppy.pop.loop.Loop will be created, which will override the settings for the ancestors and descendants of the start and end tasks provided as arguments. This instance will connect to the poppy.pop.task.Task.errored signal, emitted each time an error occurs in a task inside the loop chain, in order to handle errors correctly while in the loop.
Note
It will not connect to tasks outside the path(s) between the start and end tasks of the loop.
The start and end tasks will also be monkey-patched appropriately to handle errors occurring on these tasks, as well as the end of the loop. The flowchart in Fig. 3 gives an idea of what has to be done for each signal emitted by the tasks.
The flowchart takes the example of six tasks A, B, C, D, E, F, in this order of execution, where tasks B, C, D and E are inside a loop. When the start task is set, the loop instance connects to the started signal of task B. The same is done for the ended signal of the end task E. All other tasks in the path(s) between B and E are connected through their errored signal, emitted when an error happens on the task.
If an error happens on task C, or the end of an iteration is reached in E, the connected slot of the loop instance is called. The loop then takes the next step from the given iterable (or stops when the StopIteration exception is raised) and reruns the sub-chain accordingly.

With this process of dynamically changing the topology of the pipeline inside a loop, there is no need to integrate it into the main pipeline process. The pipeline works as usual; it is just another instance that takes care of what is happening inside the task chain.
The first thing to do before running the pipeline is to create the dependency graph of the task chain. From the starting task, the pipeline goes through the tasks in the chain and creates the graph of tasks, which will then be used to find paths between tasks. At the same time, the pipeline adds a reference to itself into each task, allowing the tasks to access the context and any other data provided by the pipeline.
If the topology has already been created (the flag is set), the topology is not created again.
Then the pipeline binds the ROC connector. Since the ORM uses the reflection system to create the classes doing the mapping with the database, we need to indicate to sqlalchemy at which moment to inspect the database in order to create the mapping. This is done in the poppy.core.db.connector.Connector.bind() method, which tries to get the linked database object, creates the mapping classes by reflection if not already done, and then attempts a connection to the database.
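For context, this is roughly what reflection looks like in plain SQLAlchemy; the connection URL is a placeholder (the real settings come from "login_info"), and the actual bind() implementation may differ:

from sqlalchemy import create_engine, MetaData

# Placeholder URL; the real connection settings come from "login_info"
engine = create_engine("sqlite:///roc.db")

# Reflection: inspect the live database schema to build the table metadata
metadata = MetaData()
metadata.reflect(bind=engine)

print(metadata.tables.keys())  # table names discovered by reflection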
The pipeline performs this binding automatically at startup so that the user does not have to, avoiding incomprehensible error messages when the connector is not bound. The pipeline does it only for its own connector; other connectors are not bound by the ROC pipeline.
The execution of the pipeline simply consists of running all the dependencies of a task before the task itself and its children. This could normally be done recursively, but since the loop feature has been introduced, recursion can hit Python's maximum recursion depth limit. For this reason, another approach based on a queue has been implemented.
A while loop is executed until the queue becomes empty. A task is popped from the queue. If the task has already completed or failed, the next iteration is performed and a new task is popped. If the task's dependencies (parents) have not been executed yet, they are added to the queue; as long as not all of its dependencies are completed, the loop continues.
Once all dependencies are done, the task itself is executed, and all its children are added to the queue to be run in turn.
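The following sketch mirrors this description with a simple deque; the Task attributes (parents, children, completed, failed) and the run() method are placeholders, not the actual poppy.pop API:

from collections import deque

def execute(entry_point):
    queue = deque([entry_point])
    while queue:
        task = queue.popleft()
        if task.completed or task.failed:
            continue  # nothing to do for finished tasks
        if any(parent.failed for parent in task.parents):
            task.failed = True  # a dependency failed, skip this task
            continue
        pending = [p for p in task.parents if not p.completed]
        if pending:
            queue.extend(pending)  # schedule the dependencies first
            queue.append(task)     # revisit this task afterwards
            continue
        task.run()                 # all dependencies done, run the task
        task.completed = True
        queue.extend(task.children)  # children are run in turn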