Tasks

Tasks are the elementary building blocks of the pipeline. The pipeline is composed of a succession of tasks linked together by their dependencies on one another. Thus, the pipeline is simply the topology resulting from the dependencies between tasks.

A task is simply a succession of instructions to run, whose inputs and outputs are provided and/or used by other tasks. A task can be launched only when its dependencies have completed and its required inputs are present.

Tasks can communicate with each other by sharing, through the pipeline, the inputs and outputs they use, and/or through properties also stored by the pipeline. The pipeline can therefore be seen as a task manager: it regulates the communication between tasks and checks that dependencies have completed before starting a task that depends on them.
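The rule that a task is launched only once its dependencies have completed can be illustrated with the standard library's graphlib.TopologicalSorter (Python 3.9+). This is a minimal sketch, not POPPy's scheduler: the task names and the dependencies dictionary are hypothetical, and "running" a task is reduced to recording its name.

```python
from graphlib import TopologicalSorter

# Hypothetical graph: each task maps to the set of tasks it depends on.
dependencies = {
    "C": {"A", "B"},
    "D": {"C"},
    "E": {"D"},
}


def run_in_order(graph):
    """Launch each task only once all of its dependencies are completed."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    launched = []
    while sorter.is_active():
        # get_ready() only yields tasks whose dependencies are all done
        for task in sorted(sorter.get_ready()):
            launched.append(task)  # stand-in for actually running the task
            sorter.done(task)      # mark it completed, unlocking its children
    return launched


print(run_in_order(dependencies))  # ['A', 'B', 'C', 'D', 'E']
```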

There are three ways of creating a task, described in the sections below.

Task creation

Using class

To create a task, simply derive from the base class poppy.pop.Task. At instantiation, the task needs some information about the kind of job it will perform: the software related to the task, the category of the task, and a description, so that the task can be identified in the database simply by looking at it.

To be executable, the task must provide a run() method taking no arguments. This is the main entry point of the task: the work to perform must be done in this method.

For example, to create a task that displays "Hello World!":

from poppy.pop import Task

class HelloTask(Task):
    """
    Example task printing a message on the terminal.
    """
    def run(self):
        """
        The method launched by the pipeline to start the task.
        """
        print("Hello World!")

Then, to instantiate the task:

task = HelloTask("Software category", "Task category", "A description")

The first argument is the software category, in other words the software to which the task is linked (it must be valid for the current version of the POPPy framework and is listed in the descriptor file, see TODO for details). The second argument is the category of the task, which must be one accepted by the ROC database. The third argument is the description of the task.
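To make this contract concrete, here is a minimal sketch of what such a base class could look like, using the standard abc module so that a subclass without run() cannot be instantiated. SketchTask and its attribute names are hypothetical; the real poppy.pop.Task also handles its database representation, signals and targets.

```python
from abc import ABC, abstractmethod


class SketchTask(ABC):
    """Hypothetical stand-in for a task base class (not poppy.pop.Task)."""

    def __init__(self, software, category, description):
        # the three pieces of information a task receives at instantiation
        self.software = software
        self.category = category
        self.description = description

    @abstractmethod
    def run(self):
        """Main entry point of the task, called without arguments."""


class HelloTask(SketchTask):
    def run(self):
        print("Hello World!")


task = HelloTask("Software category", "Task category", "A description")
task.run()  # prints: Hello World!
```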

Note

If you have to create multiple similar tasks, specifying the same arguments every time quickly becomes tedious. Instead, you can create a new task class that sets the arguments once, by overriding the __init__() method:

class HelloTask(Task):
    """
    Example task printing a message on the terminal.
    """
    def __init__(self):
        """
        Override the parameters to put at each instantiation.
        """
        super(HelloTask, self).__init__(
            "Software category",
            "Task category",
            "A description",
        )

    def run(self):
        """
        The method launched by the pipeline to start the task.
        """
        print("Hello World!")

and then you can simply do:

task = HelloTask()

for each task of this kind that you want to create.

Warning

Note that the task created in the example above is not yet linked to the pipeline. This is described in the dedicated section.

Using function

Sometimes writing a class for each task is tedious, and writing a function is faster. The Task class therefore provides a class method, as_task(), that decorates a function and transforms it into a task.

The first HelloTask example can be rewritten as:

@Task.as_task
def HelloTask(task):
    print("Hello World!")

Much more compact! However, you still have to pass the mandatory parameters when instantiating the task. You can combine the best of both worlds by declaring a task class with fixed parameters and using it to decorate other functions, turning each of them into a new task:

class HelloTask(Task):
    """
    Example task printing a message on the terminal.
    """
    def __init__(self):
        """
        Override the parameters to put at each instantiation.
        """
        super(HelloTask, self).__init__(
            "Software category",
            "Task category",
            "A description",
        )

@HelloTask.as_task
def HelloFunction(task):
    print("Hello World!")

# instantiation of the task
task = HelloFunction()
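One plausible way for a class method to turn a function into a task is to build a subclass on the fly whose run() calls the decorated function with the task instance. The sketch below assumes this design; MiniTask and its as_task are hypothetical and only mirror the usage shown above. Because the generated class subclasses the decorating class, decorating with a class whose __init__() fixes the parameters yields tasks that need no arguments.

```python
class MiniTask:
    """Hypothetical base task; not poppy.pop.Task."""

    def __init__(self, software, category, description):
        self.software = software
        self.category = category
        self.description = description

    def run(self):
        raise NotImplementedError

    @classmethod
    def as_task(cls, func):
        # build a subclass of `cls` whose run() forwards to func(self)
        def run(self):
            return func(self)

        return type(func.__name__, (cls,), {"run": run, "__doc__": func.__doc__})


class HelloTask(MiniTask):
    def __init__(self):
        # fixed parameters, as in the class-based example
        super().__init__("Software category", "Task category", "A description")


@HelloTask.as_task
def HelloFunction(task):
    print("Hello World from", task.category)


task = HelloFunction()
task.run()  # prints: Hello World from Task category
```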

Using plugin

If a plugin following the pipeline interface is defined and activated, it can be used to define a task. For example, if the plugin descriptor (see Plugin descriptor) defines a task called hello_world with the appropriate software category, description, etc., you can create a task directly from this definition. Assuming the plugin is called talker:

from poppy.pop.plugins import Plugin

# create the class of the task from the definitions of the pipeline
HelloTask = Plugin.manager["talker"].task("hello_world")

# instantiation of the task
task = HelloTask()
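Conceptually, creating a task class from a descriptor entry amounts to a small class factory. The sketch below assumes a descriptor shaped as a plain dictionary with a tasks list; this layout, its field names, BaseTask and the task_class() helper are all hypothetical and do not reflect the actual POPPy plugin descriptor format (see Plugin descriptor).

```python
# Hypothetical descriptor layout; the real plugin descriptor may differ.
descriptor = {
    "name": "talker",
    "tasks": [
        {
            "name": "hello_world",
            "software": "Software category",
            "category": "Task category",
            "description": "A description",
        },
    ],
}


class BaseTask:
    """Hypothetical base task storing its three mandatory parameters."""

    def __init__(self, software, category, description):
        self.software = software
        self.category = category
        self.description = description


def task_class(descriptor, task_name):
    """Return a task class whose parameters come from the descriptor entry."""
    entry = next(t for t in descriptor["tasks"] if t["name"] == task_name)

    def __init__(self):
        # parameters are fixed from the descriptor, so no arguments are needed
        BaseTask.__init__(
            self, entry["software"], entry["category"], entry["description"]
        )

    return type(task_name, (BaseTask,), {"__init__": __init__})


HelloTask = task_class(descriptor, "hello_world")
task = HelloTask()
print(task.description)  # A description
```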

Note

Tasks created through plugins also provide extended features, for example defining targets simply by name from their definition in the descriptor. Refer to the Targets section for details and a description of these features.

Communication

The term communication refers to the ways a task can share information with other tasks or with the pipeline.

Dependency

Expressing the dependencies between several tasks is as simple as writing Unix pipes. First, declare some tasks:

@NoParametersTask.as_task
def taskA(task):
    print("Task A")

@NoParametersTask.as_task
def taskB(task):
    print("Task B")

@NoParametersTask.as_task
def taskC(task):
    print("Task C")

@NoParametersTask.as_task
def taskD(task):
    print("Task D")

@NoParametersTask.as_task
def taskE(task):
    print("Task E")

where NoParametersTask is a task class whose mandatory instantiation parameters are already set, like HelloTask in the previous section.

To express the following dependency:

A --->
      \
       \
        C ----> D ----> E
       /
      /
B --->

where E depends on D, which depends on C, which itself depends on A and B, you can write:

# create the C task
c = taskC()

# create the other tasks and set the topology as in the schema
taskA() | c | taskD() | taskE()

# express here the second branch of the graph of dependencies
taskB() | c
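The pipe syntax relies on Python operator overloading: a | b calls a.__or__(b). The sketch below shows one way such an operator can record dependencies; Node is a hypothetical stand-in, not POPPy's Task. Returning the right-hand operand is what makes chains such as taskA() | c | taskD() work, since | is evaluated left to right.

```python
class Node:
    """Hypothetical stand-in for a task; not the POPPy Task class."""

    def __init__(self, name):
        self.name = name
        self.parents = []   # tasks this one depends on
        self.children = []  # tasks depending on this one

    def __or__(self, other):
        # `self | other` records that `other` depends on `self`
        self.children.append(other)
        other.parents.append(self)
        # returning the right-hand side lets `a | b | c` chain left to right
        return other

    def __repr__(self):
        return self.name


a, b, c, d, e = (Node(name) for name in "ABCDE")
a | c | d | e  # first branch: A -> C -> D -> E
b | c          # second branch: B -> C
print(c.parents)  # [A, B]
print(e.parents)  # [D]
```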

Inputs/Outputs

The pipeline needs to know the inputs and outputs of a task in order to check that they exist before and after running it. The outputs of a task must always exist before its child tasks are launched, and likewise the inputs of a task must exist before the task itself is launched.

For this, Task provides two methods, input() and output(), which return the lists of attribute names, in the pipeline's properties attribute, under which the targets of the task are stored. Targets are simply the wrappers around the inputs/outputs of the task, used to track changes in their status and report them to the ROC database. See Targets for more details.

The pipeline uses these names to check their existence in the properties attribute, a container whose entries can be read and set either as dictionary keys or as instance attributes. This lets the pipeline refer to attributes by name while hiding this mechanism from the user. The properties container is what tasks use to share data and information.
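A container readable both as a dictionary and as an object, together with the kind of existence check the pipeline performs on the names returned by input() and output(), can be sketched as follows. Properties and missing() are hypothetical names chosen for illustration, not the POPPy implementation.

```python
class Properties:
    """Hypothetical container: entries are accessible as keys or attributes."""

    def __getitem__(self, name):
        return self.__dict__[name]

    def __setitem__(self, name, value):
        self.__dict__[name] = value

    def __contains__(self, name):
        return name in self.__dict__


def missing(properties, names):
    """Return the names (e.g. from input() or output()) not yet stored."""
    return [name for name in names if name not in properties]


properties = Properties()
properties.file1 = "first.dat"      # attribute-style assignment
properties["file2"] = "second.dat"  # dictionary-style assignment
print(properties["file1"], properties.file2)  # first.dat second.dat
print(missing(properties, ["file1", "file2", "file3"]))  # ['file3']
```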

In the case of tasks created through a function, the decorator gives the possibility to specify those lists:

@NoParametersTask.as_task(
    inputs=["file1", "file2"],
    outputs=["file1", "file3"]
)
def some_task(task):
    # do some work with files...

    print("I'm working!")

# instantiate the task
task = some_task()
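Supporting both @Task.as_task and @Task.as_task(inputs=..., outputs=...) is a common Python decorator pattern: when called directly with a function, the method decorates it; when called with keyword arguments only, it returns a decorator. The sketch below assumes the generated task then exposes the declared lists through input() and output(); IOTask is hypothetical and the real as_task signature may differ.

```python
class IOTask:
    """Hypothetical base task exposing its declared targets."""

    _inputs = []
    _outputs = []

    def input(self):
        return list(self._inputs)

    def output(self):
        return list(self._outputs)

    def run(self):
        raise NotImplementedError

    @classmethod
    def as_task(cls, func=None, *, inputs=(), outputs=()):
        def decorate(f):
            # new subclass whose run() forwards to f and which records targets
            return type(f.__name__, (cls,), {
                "run": lambda self: f(self),
                "_inputs": list(inputs),
                "_outputs": list(outputs),
            })

        # bare usage (@IOTask.as_task) receives the function directly
        if func is not None:
            return decorate(func)
        # parametrized usage returns the decorator itself
        return decorate


@IOTask.as_task(inputs=["file1", "file2"], outputs=["file1", "file3"])
def some_task(task):
    print("I'm working!")


task = some_task()
print(task.input(), task.output())  # ['file1', 'file2'] ['file1', 'file3']
```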

Signals

A task emits signals to which slots can be connected; a connected slot is called whenever the signal is triggered.

An interface is provided so that the pipeline can handle changes in the state of the task. Calling these methods on the task instance emits the corresponding signal without requiring any knowledge of the signal's signature.

The list of signals is:

  • changed:
    Emitted when the representation of the task in the ROC database has changed.
  • created:
    Emitted when the representation of the task in the ROC database has been created.
  • started:
    Emitted when the task has started. Usually the result of calling the start() method on the task instance.
  • ended:
    Emitted when the task has stopped. Usually the result of calling the stop() method on the task instance.
  • reseted:
    Emitted when the internal states of the task instance have been reset to their default values. Usually the result of calling the reset() method on the task instance.
  • errored:
    Emitted when an error occurred, whether or not the task was running. Usually the result of calling the error() method on the task instance.
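The interface described above, where state methods such as start(), stop() and error() emit the matching signal, can be sketched as follows. Signal and StatefulTask are hypothetical; only started, ended and errored are modeled, and for simplicity this Signal keeps strong references, whereas POPPy registers slots with weak references (see the warning below).

```python
class Signal:
    """Hypothetical minimal signal; holds strong references for simplicity."""

    def __init__(self):
        self._slots = []

    def connect(self, slot):
        self._slots.append(slot)

    def emit(self, *args):
        for slot in list(self._slots):
            slot(*args)


class StatefulTask:
    """Hypothetical task whose state methods emit the matching signal."""

    def __init__(self, name):
        self.name = name
        self.state = "pending"
        self.started = Signal()
        self.ended = Signal()
        self.errored = Signal()

    def start(self):
        self.state = "running"
        self.started.emit(self)  # callers never deal with the signal arguments

    def stop(self):
        self.state = "ended"
        self.ended.emit(self)

    def error(self):
        self.state = "error"
        self.errored.emit(self)


log = []
task = StatefulTask("A")
task.errored.connect(lambda t: log.append((t.name, t.state)))
task.start()
task.error()
print(log)  # [('A', 'error')]
```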

For example, to call a function each time a task has an error:

>>> def call_on_error(task):
...     """
...     Called when an error occurred in the task to which it is connected.
...     """
...     print("{0} has an error!".format(task))
...
>>> # connect the slot to the signal
>>> task.errored.connect(call_on_error)
>>> # say an error occurred
>>> task.error()
Task has an error!

If you want to disconnect from the signal, simply do:

>>> task.errored.disconnect(call_on_error)

Warning

Slots are registered with weak references. This means that connecting a slot to a signal does not increment its reference count, so the garbage collector will remove the slot if it is not referenced anywhere else. Keep a reference to the slot somewhere if you want the signal to keep calling it!
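The consequence of this warning can be reproduced with the standard weakref module. WeakSignal below is a hypothetical sketch, not POPPy's signal class: it stores weakref.ref for functions and weakref.WeakMethod for bound methods, so a lambda passed directly to connect() and referenced nowhere else is collected and never called.

```python
import gc
import inspect
import weakref


class WeakSignal:
    """Hypothetical signal that only keeps weak references to its slots."""

    def __init__(self):
        self._refs = []

    def connect(self, slot):
        # bound methods need WeakMethod; plain functions can use weakref.ref
        ref = weakref.WeakMethod(slot) if inspect.ismethod(slot) else weakref.ref(slot)
        self._refs.append(ref)

    def disconnect(self, slot):
        # drop the given slot as well as any slot already collected
        self._refs = [r for r in self._refs if r() not in (None, slot)]

    def emit(self, *args):
        for ref in list(self._refs):
            slot = ref()
            if slot is not None:  # None means the slot was garbage collected
                slot(*args)


calls = []


def on_error(task):
    calls.append(task)


errored = WeakSignal()
errored.connect(on_error)                              # kept alive by the module
errored.connect(lambda task: calls.append("lambda"))  # no other reference
gc.collect()
errored.emit("task-1")
print(calls)  # ['task-1']

errored.disconnect(on_error)
errored.emit("task-2")
print(calls)  # ['task-1']
```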

API

exception poppy.core.task.TaskError

Bases: Exception

Error associated with tasks.

__weakref__

list of weak references to the object (if defined)