Task Management

Introduction

The Workflow section introduces how to run a research workflow in a loosely coupled way. However, qrun can only execute one task at a time. To generate and execute different tasks automatically, Task Management provides a complete process that includes Task Generating, Task Storing, Task Training and Task Collecting. With this module, users can run their tasks automatically over different periods, with different losses, or even with different models. The processes of task generation, model training, and combining and collecting data are shown in the following figure.

../_images/Task-Gen-Recorder-Collector.svg

This whole process can be used in Online Serving.

An example of the entire process is shown here.

Task Generating

A task consists of Model, Dataset, Record, or anything added by users. The specific task template can be viewed in Task Section. Even though the task template is fixed, users can customize their own TaskGen to generate different tasks from the task template.

Here is the base class of TaskGen:

class qlib.workflow.task.gen.TaskGen

The base class for generating different tasks

Example 1:

input: a specific task template and rolling steps

output: rolling version of the tasks

Example 2:

input: a specific task template and losses list

output: a set of tasks with different losses

generate(task: dict) → List[dict]

Generate different tasks based on a task template

Parameters:task (dict) – a task template
Returns:A list of tasks
Return type:typing.List[dict]
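
As a rough illustration of Example 2, the sketch below copies a task template once per loss. TaskGenSketch and LossGen are hypothetical stand-ins written for this example so that it runs without Qlib installed, and the model/kwargs/loss layout of the template is an assumption; in real code one would subclass qlib.workflow.task.gen.TaskGen instead.

```python
import copy
from abc import ABC, abstractmethod
from typing import List


class TaskGenSketch(ABC):
    """Hypothetical stand-in that mirrors the documented generate() interface."""

    @abstractmethod
    def generate(self, task: dict) -> List[dict]:
        ...


class LossGen(TaskGenSketch):
    """Hypothetical generator: one task per loss in `losses`."""

    def __init__(self, losses: List[str]):
        self.losses = losses

    def generate(self, task: dict) -> List[dict]:
        tasks = []
        for loss in self.losses:
            new_task = copy.deepcopy(task)  # never mutate the template
            new_task["model"]["kwargs"]["loss"] = loss  # assumed template layout
            tasks.append(new_task)
        return tasks


# Assumed, simplified task template (not a complete Qlib task).
template = {"model": {"class": "SomeModel", "kwargs": {"loss": "mse"}}}
loss_tasks = LossGen(["mse", "binary"]).generate(template)
```

Deep-copying the template keeps the generated tasks independent of each other, so changing one task later does not silently change the others.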

Qlib provides a class RollingGen to generate a list of tasks whose datasets cover different date segments. This class allows users to verify, in one experiment, the effect of data from different periods on the model. More information is here.
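
The rolling idea can be sketched without Qlib as shifting every date segment forward by a fixed step. The function roll_segments, the train/test segment layout and the use of calendar days are all assumptions made for this illustration; RollingGen itself works on Qlib's own calendar and offers more options.

```python
import copy
from datetime import date, timedelta
from typing import Dict, List, Tuple

Segments = Dict[str, Tuple[date, date]]


def roll_segments(segments: Segments, step_days: int, end: date) -> List[Segments]:
    """Shift all segments forward by step_days while the test segment starts on or before `end`."""
    rolled = []
    current = copy.deepcopy(segments)
    shift = timedelta(days=step_days)
    while current["test"][0] <= end:
        rolled.append(current)
        # Build a new dict so earlier entries in `rolled` are not modified.
        current = {name: (start + shift, stop + shift) for name, (start, stop) in current.items()}
    return rolled


base_segments = {
    "train": (date(2020, 1, 1), date(2020, 6, 30)),
    "test": (date(2020, 7, 1), date(2020, 7, 30)),
}
rolled_segments = roll_segments(base_segments, step_days=30, end=date(2020, 9, 30))
```

Each entry of rolled_segments could then be written into a copy of the task template to obtain one task per period.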

Task Storing

To achieve higher efficiency and make cluster operation possible, TaskManager stores all tasks in MongoDB. TaskManager can fetch undone tasks automatically and manage the lifecycle of a set of tasks with error handling. Users MUST finish configuring MongoDB before using this module.

To use TaskManager, users need to provide the MongoDB URL and database name, either during initialization or with a statement like this.

from qlib.config import C
C["mongo"] = {
    "task_url" : "mongodb://localhost:27017/", # your MongoDB url
    "task_db_name" : "rolling_db" # database name
}

class qlib.workflow.task.manage.TaskManager(task_pool: str)

Here is what a task looks like when it is created by TaskManager:

{
    'def': pickle-serialized task definition (using pickle makes this easier),
    'filter': JSON-like data used for filtering the tasks,
    'status': 'waiting' | 'running' | 'done',
    'res': pickle-serialized task result,
}

The task manager assumes that you will only update the tasks you fetched. MongoDB's fetch-one-and-update operation makes the data update safe.
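
Why an atomic fetch-and-update matters can be sketched with an in-memory pool shared by several threads. InMemoryPool and fetch_one_and_update are hypothetical names for this illustration; the lock only stands in for the atomicity that the database is expected to provide, and this is not how TaskManager is implemented.

```python
import threading
from typing import List, Optional


class InMemoryPool:
    """Hypothetical pool: finding a task and changing its status happen as one step."""

    def __init__(self, tasks: List[dict]):
        self._tasks = tasks
        self._lock = threading.Lock()

    def fetch_one_and_update(self, status: str, new_status: str) -> Optional[dict]:
        with self._lock:  # no other worker can claim the same task in between
            for task in self._tasks:
                if task["status"] == status:
                    task["status"] = new_status
                    return task
        return None


shared_pool = InMemoryPool([{"_id": i, "status": "waiting"} for i in range(3)])
claimed_ids = []


def claim_all():
    while True:
        task = shared_pool.fetch_one_and_update("waiting", "running")
        if task is None:
            break
        claimed_ids.append(task["_id"])


workers = [threading.Thread(target=claim_all) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
```

Because the status changes in the same step as the lookup, each task is claimed by exactly one worker even though four workers compete for three tasks.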

This class can be used as a tool from the command line. Here are several examples. You can view the help of the manage module with the following commands:

python -m qlib.workflow.task.manage -h # show manual of manage module CLI
python -m qlib.workflow.task.manage wait -h # show manual of the wait command of manage

python -m qlib.workflow.task.manage -t <pool_name> wait
python -m qlib.workflow.task.manage -t <pool_name> task_stat

Note

Assumption: data stored in MongoDB is encoded, and data read out of MongoDB is decoded.

There are four statuses:

STATUS_WAITING: waiting for training

STATUS_RUNNING: training

STATUS_PART_DONE: finished some step and waiting for next step

STATUS_DONE: all work done

__init__(task_pool: str)

Initialize the TaskManager. Remember to declare the MongoDB URL and database name first. A TaskManager instance serves a specific task pool. The static methods of this module serve the whole MongoDB.

Parameters:task_pool (str) – the name of Collection in MongoDB
static list() → list

List all collections (task pools) of the database.

Returns:list
replace_task(task, new_task)

Use a new task to replace an old one.

Parameters:
  • task – old task
  • new_task – new task
insert_task(task)

Insert a task.

Parameters:task – the task to be inserted
Returns:pymongo.results.InsertOneResult
insert_task_def(task_def)

Insert a task into task_pool.

Parameters:task_def (dict) – the task definition
Returns:
Return type:pymongo.results.InsertOneResult
create_task(task_def_l, dry_run=False, print_nt=False) → List[str]

If the tasks in task_def_l are new, then insert new tasks into the task_pool, and record inserted_id. If a task is not new, then just query its _id.

Parameters:
  • task_def_l (list) – a list of task definitions
  • dry_run (bool) – whether to insert those new tasks into the task pool
  • print_nt (bool) – whether to print the new tasks
Returns:

a list of the _id of task_def_l

Return type:

List[str]
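
The insert-or-look-up behaviour described above can be sketched with an in-memory pool. TaskPoolSketch, its dict storage and the comparison of unpickled definitions are assumptions made for this illustration; the real TaskManager queries MongoDB instead.

```python
import pickle
from typing import Dict, List, Optional


class TaskPoolSketch:
    """Hypothetical in-memory pool showing idempotent task creation."""

    def __init__(self):
        self.docs: Dict[str, dict] = {}  # _id -> task document
        self._next_id = 0

    def _find_id(self, task_def: dict) -> Optional[str]:
        for _id, doc in self.docs.items():
            if pickle.loads(doc["def"]) == task_def:
                return _id
        return None

    def create_task(self, task_def_l: List[dict]) -> List[str]:
        ids = []
        for task_def in task_def_l:
            _id = self._find_id(task_def)
            if _id is None:  # new task: insert it and record the new _id
                _id = str(self._next_id)
                self._next_id += 1
                self.docs[_id] = {"def": pickle.dumps(task_def), "status": "waiting", "res": None}
            ids.append(_id)  # existing task: just report its _id
        return ids


pool_sketch = TaskPoolSketch()
first_ids = pool_sketch.create_task([{"loss": "mse"}, {"loss": "mae"}])
second_ids = pool_sketch.create_task([{"loss": "mae"}, {"loss": "huber"}])
```

Submitting the same definition twice does not create a duplicate document, which makes it safe to re-run a script that generates and registers tasks.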

fetch_task(query={}, status='waiting') → dict

Use query to fetch tasks.

Parameters:
  • query (dict, optional) – query dict. Defaults to {}.
  • status (str, optional) – the status of the tasks to fetch. Defaults to STATUS_WAITING.
Returns:

a task(document in collection) after decoding

Return type:

dict

safe_fetch_task(query={}, status='waiting')

Fetch a task from task_pool using query, as a context manager.

Parameters:query (dict) – the dict of query
Returns:a task(document in collection) after decoding
Return type:dict
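
The context-manager style of fetching can be sketched as follows: if the body of the with block raises, the task goes back to its previous status so that another worker can pick it up. safe_fetch_sketch and the list-based pool are hypothetical and only illustrate the pattern; they are not the TaskManager implementation.

```python
from contextlib import contextmanager

demo_tasks = [{"_id": 1, "status": "waiting"}]


@contextmanager
def safe_fetch_sketch(pool, status="waiting"):
    """Hypothetical helper: claim one task and give it back if the caller fails."""
    task = next((t for t in pool if t["status"] == status), None)
    if task is not None:
        task["status"] = "running"
    try:
        yield task
    except BaseException:
        if task is not None:
            task["status"] = status  # return the task before re-raising
        raise


try:
    with safe_fetch_sketch(demo_tasks) as fetched:
        raise RuntimeError("training failed")  # simulated failure
except RuntimeError:
    pass
```

After the simulated failure the task is in the waiting status again instead of being stuck in running.
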
query(query={}, decode=True)

Query tasks in the collection. This function may raise the exception pymongo.errors.CursorNotFound: cursor id not found if it takes too long to iterate the generator.

python -m qlib.workflow.task.manage -t <your task pool> query '{"_id": "615498be837d0053acbc5d58"}'

Parameters:
  • query (dict) – the dict of query
  • decode (bool) –
Returns:

a task(document in collection) after decoding

Return type:

dict

re_query(_id) → dict

Use _id to query task.

Parameters:_id (str) – _id of a document
Returns:a task(document in collection) after decoding
Return type:dict
commit_task_res(task, res, status='done')

Commit the result to task['res'].

Parameters:
  • task (dict) – the task fetched from the database
  • res (object) – the result you want to save
  • status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE. Defaults to STATUS_DONE.
return_task(task, status='waiting')

Return a task to the given status. Typically used in error handling.

Parameters:
  • task (dict) – the task fetched from the database
  • status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE. Defaults to STATUS_WAITING.
remove(query={})

Remove the tasks matching the query.

Parameters:query (dict) – the dict of query
task_stat(query={}) → dict

Count the tasks in every status.

Parameters:query (dict, optional) – the query dict. Defaults to {}.
Returns:dict
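
Counting tasks per status can be sketched with a Counter over an in-memory list. task_stat_sketch and its exact-match query handling are assumptions for this illustration, not the behaviour of a MongoDB query.

```python
from collections import Counter
from typing import List, Optional


def task_stat_sketch(pool: List[dict], query: Optional[dict] = None) -> dict:
    """Hypothetical helper: count matching tasks in every status."""
    query = query or {}
    matched = [t for t in pool if all(t.get(k) == v for k, v in query.items())]
    return dict(Counter(t["status"] for t in matched))


stat_pool = [
    {"status": "waiting", "model": "a"},
    {"status": "done", "model": "a"},
    {"status": "done", "model": "b"},
]
all_stat = task_stat_sketch(stat_pool)
model_a_stat = task_stat_sketch(stat_pool, {"model": "a"})
```
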
reset_waiting(query={})

Reset all running tasks to the waiting status. This can be used when some running tasks exit unexpectedly.

Parameters:query (dict, optional) – the query dict. Defaults to {}.
prioritize(task, priority: int)

Set priority for task

Parameters:
  • task (dict) – The task query from the database
  • priority (int) – the target priority
wait(query={})

When multiprocessing, the main process may fetch nothing from TaskManager because there are still some running tasks. So the main process should wait until all tasks have been trained by other processes or machines.

Parameters:query (dict, optional) – the query dict. Defaults to {}.

More information about TaskManager can be found here.

Task Training

After generating and storing those tasks, it is time to run the tasks that are in the WAITING status. Qlib provides a method called run_task to run the tasks in the task pool; however, users can also customize how tasks are executed. An easy way to get the task_func is to use qlib.model.trainer.task_train directly. It runs the whole workflow defined by the task, which includes Model, Dataset and Record.

qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)

While the task pool is not empty (has WAITING tasks), use task_func to fetch and run tasks in task_pool.

After running this method, there are 4 situations (before_status -> after_status):

STATUS_WAITING -> STATUS_DONE: use task["def"] as task_func param, it means that the task has not been started

STATUS_WAITING -> STATUS_PART_DONE: use task["def"] as task_func param

STATUS_PART_DONE -> STATUS_PART_DONE: use task["res"] as task_func param, it means that the task has been started but not completed

STATUS_PART_DONE -> STATUS_DONE: use task["res"] as task_func param

Parameters:
  • task_func (Callable) –

    def (task_def, **kwargs) -> <res which will be committed>

    the function to run the task

  • task_pool (str) – the name of the task pool (Collection in MongoDB)
  • query (dict) – will use this dict to query task_pool when fetching task
  • force_release (bool) – whether the program forcibly releases the resource
  • before_status (str) – the tasks in before_status will be fetched and trained. Can be STATUS_WAITING, STATUS_PART_DONE.
  • after_status (str) – the status the tasks will have after being trained. Can be STATUS_PART_DONE, STATUS_DONE.
  • kwargs – the params for task_func
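
The four situations listed above can be sketched with a small in-memory loop. run_task_sketch and the list-based pool are hypothetical; the sketch only shows which field is passed to task_func and how the status moves, and it processes a snapshot of the matching tasks rather than polling a database.

```python
from typing import Callable, List


def run_task_sketch(
    task_func: Callable,
    pool: List[dict],
    before_status: str = "waiting",
    after_status: str = "done",
    **kwargs,
) -> int:
    """Hypothetical loop: run task_func on every task in before_status; return how many ran."""
    todo = [t for t in pool if t["status"] == before_status]  # snapshot of tasks to run
    for task in todo:
        task["status"] = "running"
        # Tasks that have not started use the definition; partly done tasks use the last result.
        param = task["def"] if before_status == "waiting" else task["res"]
        try:
            task["res"] = task_func(param, **kwargs)
        except Exception:
            task["status"] = before_status  # give the task back on failure
            raise
        task["status"] = after_status
    return len(todo)


run_pool = [{"def": {"x": 2}, "res": None, "status": "waiting"}]
# Stage 1: STATUS_WAITING -> STATUS_PART_DONE, task_func receives task["def"].
n_stage1 = run_task_sketch(lambda d: d["x"] * 2, run_pool, after_status="part_done")
# Stage 2: STATUS_PART_DONE -> STATUS_DONE, task_func receives task["res"].
n_stage2 = run_task_sketch(lambda r: r + 1, run_pool, before_status="part_done", after_status="done")
```

Splitting the work into two calls is what allows a cheap preparation stage and an expensive finishing stage to run at different times or on different machines.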

Meanwhile, Qlib provides a module called Trainer.

class qlib.model.trainer.Trainer

The trainer can train a list of models. There are Trainer and DelayTrainer, which are distinguished by when they finish the real training.

__init__()

Initialize self. See help(type(self)) for accurate signature.

train(tasks: list, *args, **kwargs) → list

Given a list of task definitions, begin training, and return the models.

For Trainer, it finishes real training in this method. For DelayTrainer, it only does some preparation in this method.

Parameters:tasks – a list of tasks
Returns:a list of models
Return type:list
end_train(models: list, *args, **kwargs) → list

Given a list of models, finish something at the end of training if you need to. The models may be Recorder, txt file, database, and so on.

For Trainer, it does some finishing touches in this method. For DelayTrainer, it finishes real training in this method.

Parameters:models – a list of models
Returns:a list of models
Return type:list
is_delay() → bool

Whether the Trainer delays finishing the training until end_train.

Returns:if DelayTrainer
Return type:bool
has_worker() → bool

Some trainers have a backend worker to support parallel training. This method tells whether the worker is enabled.

Returns:if the worker is enabled
Return type:bool
worker()

start the worker

Raises:NotImplementedError: – If the worker is not supported
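
The difference between the two kinds of trainer can be sketched with two tiny classes. EagerTrainerSketch and DelayTrainerSketch are hypothetical and use plain dicts in place of real models; they only show in which method the real training happens.

```python
from typing import List


class EagerTrainerSketch:
    """Hypothetical Trainer-like class: real training happens in train()."""

    def train(self, tasks: List[dict]) -> List[dict]:
        return [{"task": t, "fitted": True} for t in tasks]

    def end_train(self, models: List[dict]) -> List[dict]:
        return models  # only finishing touches would go here

    def is_delay(self) -> bool:
        return False


class DelayTrainerSketch:
    """Hypothetical DelayTrainer-like class: real training happens in end_train()."""

    def train(self, tasks: List[dict]) -> List[dict]:
        return [{"task": t, "fitted": False} for t in tasks]  # preparation only

    def end_train(self, models: List[dict]) -> List[dict]:
        for model in models:
            model["fitted"] = True  # the real training is finished here
        return models

    def is_delay(self) -> bool:
        return True


demo_task_list = [{"name": "t1"}, {"name": "t2"}]
eager_models = EagerTrainerSketch().train(demo_task_list)
delayed_models = DelayTrainerSketch().train(demo_task_list)
fitted_before_end = [m["fitted"] for m in delayed_models]
delayed_models = DelayTrainerSketch().end_train(delayed_models)
```
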

Trainer trains a list of tasks and returns a list of model recorders. Qlib offers two kinds of Trainer: TrainerR is the simplest, and TrainerRM is based on TaskManager to help manage the task lifecycle automatically. If you do not want to use TaskManager to manage tasks, using TrainerR to train a list of tasks generated by TaskGen is enough. Here are the details about the different Trainers.

Task Collecting

Before collecting model training results, you need to use qlib.init to specify the path of mlruns.

To collect the results of tasks after training, Qlib provides Collector, Group and Ensemble to collect the results in a readable, expandable and loosely coupled way.

Collector can collect objects from anywhere and process them, for example by merging, grouping or averaging. It works in two steps: collect (collect anything into a dict) and process_collect (process the collected dict).

Group also has two steps: group (groups a set of objects based on group_func and turns them into a dict) and reduce (turns a dict into an ensemble based on some rule). For example: {(A,B,C1): object, (A,B,C2): object} --group--> {(A,B): {C1: object, C2: object}} --reduce--> {(A,B): object}
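
The group and reduce steps can be sketched on plain dicts. group_sketch, reduce_sketch and the averaging rule are hypothetical choices for this illustration and do not reproduce Qlib's Group class.

```python
from collections import defaultdict
from typing import Callable, Dict


def group_sketch(collected: Dict[tuple, float], group_func: Callable) -> Dict[tuple, dict]:
    """Split each key into (outer, inner) with group_func and nest the values."""
    grouped = defaultdict(dict)
    for key, value in collected.items():
        outer, inner = group_func(key)
        grouped[outer][inner] = value
    return dict(grouped)


def reduce_sketch(grouped: Dict[tuple, dict], ensemble_func: Callable) -> Dict[tuple, float]:
    """Collapse every inner dict into one object using ensemble_func."""
    return {outer: ensemble_func(inner) for outer, inner in grouped.items()}


collected_objs = {("A", "B", "C1"): 1.0, ("A", "B", "C2"): 3.0}
grouped_objs = group_sketch(collected_objs, lambda key: (key[:2], key[2]))
reduced_objs = reduce_sketch(grouped_objs, lambda d: sum(d.values()) / len(d))
```

Here the last key element is treated as the part to be merged away, which matches the (A,B,C1) and (A,B,C2) example above.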

Ensemble can merge the objects in an ensemble. For example: {C1: object, C2: object} --Ensemble--> object. You can set the ensembles you want in the Collector's process_list. Common ensembles include AverageEnsemble and RollingEnsemble. AverageEnsemble is used to ensemble the results of different models in the same time period. RollingEnsemble is used to ensemble the results from different time periods, such as the outputs of rolling tasks.

So the hierarchy is as follows: the Collector's second step corresponds to Group, and Group's second step corresponds to Ensemble.
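
Two ways of merging {C1: object, C2: object} into one object can be sketched on plain dicts that map a date string to a score. Both functions are hypothetical simplifications: the first averages several models over their shared dates, and the second assumes that a rolling-style ensemble stitches together results from consecutive periods, letting later periods override overlapping dates.

```python
from typing import Dict

Scores = Dict[str, float]  # date string -> score


def average_ensemble_sketch(preds: Dict[str, Scores]) -> Scores:
    """Average the scores of several models on the dates they all share."""
    shared = sorted(set.intersection(*(set(p) for p in preds.values())))
    return {d: sum(p[d] for p in preds.values()) / len(preds) for d in shared}


def rolling_ensemble_sketch(preds: Dict[str, Scores]) -> Scores:
    """Stitch period results together; later periods win on overlapping dates."""
    merged: Scores = {}
    for period_scores in preds.values():  # dicts keep insertion order
        merged.update(period_scores)
    return dict(sorted(merged.items()))


averaged = average_ensemble_sketch(
    {"model_1": {"2020-01-01": 1.0}, "model_2": {"2020-01-01": 3.0}}
)
stitched = rolling_ensemble_sketch(
    {
        "period_1": {"2020-01-01": 0.1, "2020-01-02": 0.2},
        "period_2": {"2020-01-02": 0.5, "2020-01-03": 0.3},
    }
)
```
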

For more information, please see Collector, Group and Ensemble, or the example.