Task Management¶
Introduction¶
The Workflow part introduces how to run a research workflow in a loosely-coupled way, but it can only execute one task when you use qrun.
To automatically generate and execute different tasks, Task Management
provides a whole process including Task Generating, Task Storing, Task Training and Task Collecting.
With this module, users can run their tasks
automatically at different periods, with different losses, or even with different models. The processes of task generation, model training, and the combining and collecting of data are shown in the following figure.
This whole process can be used in Online Serving.
An example of the entire process is shown here.
Task Generating¶
A task
consists of Model, Dataset, Record, or anything added by users.
The specific task template can be viewed in
Task Section.
Even though the task template is fixed, users can customize their TaskGen
to generate different tasks
from a task template.
Here is the base class of TaskGen
:
-
class
qlib.workflow.task.gen.
TaskGen
¶ The base class for generating different tasks
Example 1:
input: a specific task template and rolling steps
output: rolling version of the tasks
Example 2:
input: a specific task template and losses list
output: a set of tasks with different losses
-
generate
(task: dict) → List[dict]¶ Generate different tasks based on a task template
Parameters: task (dict) – a task template Returns: A list of tasks Return type: typing.List[dict]
-
Qlib
provides a class RollingGen to generate a list of tasks
whose datasets cover different date segments.
This class allows users to verify the effect of data from different periods on the model in one experiment. More information is here.
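As an illustration of Example 2 above (one template, several losses), a custom generator can expand a single task template into multiple tasks. The following is a self-contained sketch: the LossTaskGen class and the "model"/"kwargs"/"loss" keys are illustrative assumptions, not part of qlib's actual API.

```python
import copy
from typing import List

class LossTaskGen:
    """Illustrative generator in the spirit of TaskGen's Example 2."""

    def __init__(self, losses: List[str]):
        self.losses = losses

    def generate(self, task: dict) -> List[dict]:
        tasks = []
        for loss in self.losses:
            t = copy.deepcopy(task)  # never mutate the shared template
            t["model"]["kwargs"]["loss"] = loss
            tasks.append(t)
        return tasks

# A toy task template; real qlib templates also hold dataset and record configs.
template = {"model": {"class": "LGBModel", "kwargs": {"loss": "mse"}}}
gen = LossTaskGen(["mse", "binary"])
tasks = gen.generate(template)
print([t["model"]["kwargs"]["loss"] for t in tasks])  # ['mse', 'binary']
```

Because each task is a deep copy, the original template is left untouched and can be reused by other generators.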
Task Storing¶
To achieve higher efficiency and enable cluster operation, Task Manager
stores all tasks in MongoDB.
TaskManager
can fetch undone tasks automatically and manage the lifecycle of a set of tasks with error handling.
Users MUST finish the configuration of MongoDB when using this module.
Users need to provide the MongoDB URL and database name when initializing TaskManager
, or make a statement like this:
from qlib.config import C

C["mongo"] = {
    "task_url": "mongodb://localhost:27017/",  # your MongoDB url
    "task_db_name": "rolling_db",  # database name
}
-
class
qlib.workflow.task.manage.
TaskManager
(task_pool: str)¶ Here is what a task looks like when it is created by TaskManager
{
    'def': pickle serialized task definition. using pickle will make it easier
    'filter': json-like data. This is for filtering the tasks.
    'status': 'waiting' | 'running' | 'done'
    'res': pickle serialized task result,
}
The task manager assumes that you will only update the tasks you fetched. MongoDB's fetch-one-and-update operation makes data updating secure.
This class can be used as a tool from the command line. Here are several examples. You can view the help of the manage module with the following commands:

python -m qlib.workflow.task.manage -h # show manual of manage module CLI
python -m qlib.workflow.task.manage wait -h # show manual of the wait command of manage

python -m qlib.workflow.task.manage -t <pool_name> wait
python -m qlib.workflow.task.manage -t <pool_name> task_stat
Note
Assumption: the data in MongoDB is encoded and the data out of MongoDB is decoded
There are four statuses:
STATUS_WAITING: waiting for training
STATUS_RUNNING: training
STATUS_PART_DONE: finished some step and waiting for next step
STATUS_DONE: all work done
-
__init__
(task_pool: str)¶ Init Task Manager; remember to set the MongoDB URL and database name first. A TaskManager instance serves a specific task pool. The static methods of this module serve the whole MongoDB.
Parameters: task_pool (str) – the name of Collection in MongoDB
-
static
list
() → list¶ List all collections (task pools) of the database.
Returns: list
-
replace_task
(task, new_task)¶ Use a new task to replace an old one
Parameters: - task – old task
- new_task – new task
-
insert_task
(task)¶ Insert a task.
Parameters: task – the task waiting to be inserted Returns: pymongo.results.InsertOneResult
-
insert_task_def
(task_def)¶ Insert a task to task_pool
Parameters: task_def (dict) – the task definition Returns: Return type: pymongo.results.InsertOneResult
-
create_task
(task_def_l, dry_run=False, print_nt=False) → List[str]¶ If the tasks in task_def_l are new, then insert the new tasks into the task_pool and record the inserted_id. If a task is not new, then just query its _id.
Parameters: - task_def_l (list) – a list of task
- dry_run (bool) – whether to insert those new tasks into the task pool
- print_nt (bool) – whether to print the new tasks
Returns: a list of the _id of task_def_l
Return type: List[str]
-
fetch_task
(query={}, status='waiting') → dict¶ Use query to fetch tasks.
Parameters: - query (dict, optional) – query dict. Defaults to {}.
- status (str, optional) – the status of the tasks to fetch. Defaults to STATUS_WAITING.
Returns: a task(document in collection) after decoding
Return type: dict
-
safe_fetch_task
(query={}, status='waiting')¶ Fetch a task from the task_pool using query, with a contextmanager
Parameters: query (dict) – the dict of query Returns: a task(document in collection) after decoding Return type: dict
-
query
(query={}, decode=True)¶ Query tasks in the collection. This function may raise the exception pymongo.errors.CursorNotFound: cursor id not found if it takes too long to iterate the generator
python -m qlib.workflow.task.manage -t <your task pool> query '{"_id": "615498be837d0053acbc5d58"}'
Parameters: - query (dict) – the dict of query
- decode (bool) – whether to decode the queried task
Returns: a task(document in collection) after decoding
Return type: dict
-
re_query
(_id) → dict¶ Use _id to query task.
Parameters: _id (str) – _id of a document Returns: a task(document in collection) after decoding Return type: dict
-
commit_task_res
(task, res, status='done')¶ Commit the result to task[‘res’].
Parameters: - task (dict) – the task document to update
- res (object) – the result you want to save
- status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE. Defaults to STATUS_DONE.
-
return_task
(task, status='waiting')¶ Return a task to the given status. Usually used in error handling.
Parameters: - task (dict) – the task to return
- status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE. Defaults to STATUS_WAITING.
-
remove
(query={})¶ Remove the tasks using query
Parameters: query (dict) – the dict of query
-
task_stat
(query={}) → dict¶ Count the tasks in every status.
Parameters: query (dict, optional) – the query dict. Defaults to {}. Returns: dict
-
reset_waiting
(query={})¶ Reset all running tasks to waiting status. Can be used when some running tasks exit unexpectedly.
Parameters: query (dict, optional) – the query dict. Defaults to {}.
-
prioritize
(task, priority: int)¶ Set the priority of a task
Parameters: - task (dict) – The task query from the database
- priority (int) – the target priority
-
wait
(query={})¶ When multiprocessing, the main process may fetch nothing from TaskManager because there are still running tasks. So the main process should wait until all tasks are trained well by other processes or machines.
Parameters: query (dict, optional) – the query dict. Defaults to {}.
-
More information about Task Manager
can be found here.
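To make the lifecycle above concrete, here is an in-memory sketch of the status transitions TaskManager implements on top of MongoDB (waiting -> running -> done, with return_task for error handling). The ToyTaskPool class is purely illustrative; the real TaskManager stores pickled documents in a MongoDB collection.

```python
STATUS_WAITING, STATUS_RUNNING, STATUS_DONE = "waiting", "running", "done"

class ToyTaskPool:
    """Illustrative in-memory stand-in for TaskManager's MongoDB collection."""

    def __init__(self):
        self.docs = []

    def insert_task_def(self, task_def: dict) -> int:
        self.docs.append({"def": task_def, "status": STATUS_WAITING, "res": None})
        return len(self.docs) - 1

    def fetch_task(self, status=STATUS_WAITING):
        # Mimics "fetch one and update": claiming a task marks it running.
        for doc in self.docs:
            if doc["status"] == status:
                doc["status"] = STATUS_RUNNING
                return doc
        return None

    def commit_task_res(self, doc, res, status=STATUS_DONE):
        doc["res"], doc["status"] = res, status

    def return_task(self, doc, status=STATUS_WAITING):
        doc["status"] = status  # error handling: put the task back

pool = ToyTaskPool()
pool.insert_task_def({"model": "LGBModel"})
doc = pool.fetch_task()
pool.commit_task_res(doc, res="trained-model")
print(doc["status"])  # done
```

Because fetching a task atomically moves it out of the waiting state, two workers sharing the same pool never claim the same document, which is the property the real fetch-and-update on MongoDB provides.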
Task Training¶
After generating and storing those tasks, it's time to run the tasks in the WAITING status.
Qlib
provides a method called run_task
to run those tasks
in the task pool; however, users can also customize how tasks are executed.
An easy way to get a task_func
is to use qlib.model.trainer.task_train
directly.
It will run the whole workflow defined by task
, which includes Model, Dataset and Record.
-
qlib.workflow.task.manage.
run_task
(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)¶ While the task pool is not empty (has WAITING tasks), use task_func to fetch and run tasks in task_pool
After running this method, here are 4 situations (before_status -> after_status):
STATUS_WAITING -> STATUS_DONE: use task[“def”] as task_func param, it means that the task has not been started
STATUS_WAITING -> STATUS_PART_DONE: use task[“def”] as task_func param
STATUS_PART_DONE -> STATUS_PART_DONE: use task[“res”] as task_func param, it means that the task has been started but not completed
STATUS_PART_DONE -> STATUS_DONE: use task[“res”] as task_func param
Parameters: - task_func (Callable) – the function to run the task, with signature def (task_def, **kwargs) -> <res which will be committed>
- task_pool (str) – the name of the task pool (Collection in MongoDB)
- query (dict) – will use this dict to query task_pool when fetching task
- force_release (bool) – whether the program will forcibly release the resource
- before_status (str) – the tasks in before_status will be fetched and trained. Can be STATUS_WAITING, STATUS_PART_DONE.
- after_status (str) – the tasks after being trained will become after_status. Can be STATUS_WAITING, STATUS_PART_DONE.
- kwargs – the params for task_func
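The fetch-run-commit loop that run_task performs can be sketched with plain Python objects standing in for MongoDB. The run_task_sketch function and its dict layout are illustrative assumptions that mirror the documented parameters and status transitions, not qlib's implementation.

```python
def run_task_sketch(task_func, pool, before_status="waiting", after_status="done"):
    """Repeatedly fetch a task in before_status, run it, commit with after_status."""
    while True:
        doc = next((d for d in pool if d["status"] == before_status), None)
        if doc is None:
            break  # pool has no more tasks in before_status
        doc["status"] = "running"
        # STATUS_WAITING -> pass task["def"]; STATUS_PART_DONE -> pass task["res"]
        param = doc["def"] if before_status == "waiting" else doc["res"]
        doc["res"] = task_func(param)
        doc["status"] = after_status

pool = [
    {"def": {"n": 2}, "status": "waiting", "res": None},
    {"def": {"n": 3}, "status": "waiting", "res": None},
]
run_task_sketch(lambda task_def: task_def["n"] ** 2, pool)
print([d["res"] for d in pool])  # [4, 9]
```

The same loop run twice with before_status="part_done" would re-feed each task's partial result, which is how the STATUS_PART_DONE -> STATUS_DONE transition described above works.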
Meanwhile, Qlib
provides a module called Trainer
.
-
class
qlib.model.trainer.
Trainer
¶ The trainer can train a list of models. There are Trainer and DelayTrainer, which are distinguished by when the real training is finished.
-
__init__
()¶ Initialize self. See help(type(self)) for accurate signature.
-
train
(tasks: list, *args, **kwargs) → list¶ Given a list of task definitions, begin training, and return the models.
For Trainer, it finishes real training in this method. For DelayTrainer, it only does some preparation in this method.
Parameters: tasks – a list of tasks Returns: a list of models Return type: list
-
end_train
(models: list, *args, **kwargs) → list¶ Given a list of models, finish any remaining work at the end of training if you need. The models may be Recorder, txt file, database, and so on.
For Trainer, it does some finishing touches in this method. For DelayTrainer, it finishes real training in this method.
Parameters: models – a list of models Returns: a list of models Return type: list
-
is_delay
() → bool¶ Whether the Trainer will delay finishing end_train.
Returns: whether it is a DelayTrainer Return type: bool
-
has_worker
() → bool¶ Some trainers have a backend worker to support parallel training. This method tells whether the worker is enabled.
Returns: if the worker is enabled Return type: bool
-
worker
()¶ Start the worker
Raises: NotImplementedError: – If the worker is not supported
-
Trainer
will train a list of tasks and return a list of model recorders.
Qlib
offers two kinds of Trainer: TrainerR is the simplest way, while TrainerRM is based on TaskManager and helps manage the lifecycle of tasks automatically.
If you do not want to use Task Manager
to manage tasks, then using TrainerR to train a list of tasks generated by TaskGen
is enough.
Here are the details about the different Trainers
.
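The split between Trainer and DelayTrainer can be illustrated with two toy classes: the first finishes real training in train, the second defers it to end_train. These classes, and the strings they return in place of models, are illustrative assumptions; the real qlib trainers return model objects or Recorders.

```python
class ToyTrainer:
    """Finishes real training in train(); end_train() has nothing left to do."""

    def train(self, tasks):
        return [f"model({t})" for t in tasks]  # real training happens here

    def end_train(self, models):
        return models  # only finishing touches

    def is_delay(self):
        return False

class ToyDelayTrainer(ToyTrainer):
    """Only prepares in train(); real training is delayed to end_train()."""

    def train(self, tasks):
        return list(tasks)  # preparation only

    def end_train(self, prepared):
        return [f"model({t})" for t in prepared]  # real training happens here

    def is_delay(self):
        return True

tasks = ["task_a", "task_b"]
for trainer in (ToyTrainer(), ToyDelayTrainer()):
    models = trainer.end_train(trainer.train(tasks))
    print(trainer.is_delay(), models)
```

Both paths yield the same final models; the delayed variant is useful when many train calls should be batched before the expensive work runs once in end_train.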
Task Collecting¶
Before collecting model training results, you need to use qlib.init
to specify the path of mlruns.
To collect the results of tasks
after training, Qlib
provides Collector, Group and Ensemble to collect the results in a readable, expandable and loosely-coupled way.
Collector can collect objects from everywhere and process them, such as merging, grouping, averaging and so on. It works in two steps: collect
(collect anything into a dict) and process_collect
(process the collected dict).
Group also has two steps: group
(group a set of objects based on group_func and turn them into a dict) and reduce
(turn a dict into an ensemble based on some rule).
For example: {(A,B,C1): object, (A,B,C2): object} —group
—> {(A,B): {C1: object, C2: object}} —reduce
—> {(A,B): object}
Ensemble can merge the objects in an ensemble.
For example: {C1: object, C2: object} —Ensemble
—> object.
You can set the ensembles you want in the Collector
's process_list.
Common ensembles include AverageEnsemble
and RollingEnsemble
. AverageEnsemble is used to ensemble the results of different models in the same time period. RollingEnsemble is used to ensemble the results of different models in different time periods.
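The group -> reduce -> ensemble flow above can be sketched with float predictions standing in for real model results. The helper names below are illustrative; qlib's Group and Ensemble classes perform the same shape transformations on recorder outputs.

```python
def group(flat: dict) -> dict:
    # {(A, B, C): v} -> {(A, B): {C: v}}, grouping by all but the last key part
    out = {}
    for key, value in flat.items():
        out.setdefault(key[:-1], {})[key[-1]] = value
    return out

def average_ensemble(members: dict) -> float:
    # {C1: v1, C2: v2} -> mean(v1, v2), like averaging same-period model results
    return sum(members.values()) / len(members)

def reduce_(grouped: dict) -> dict:
    # apply the ensemble rule to every group
    return {key: average_ensemble(members) for key, members in grouped.items()}

flat = {("A", "B", "C1"): 1.0, ("A", "B", "C2"): 3.0}
grouped = group(flat)    # {("A", "B"): {"C1": 1.0, "C2": 3.0}}
print(reduce_(grouped))  # {("A", "B"): 2.0}
```

This mirrors the hierarchy stated below: grouping reshapes the dict, and the ensemble collapses each group to a single object.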
So in the hierarchy, Collector
's second step corresponds to Group
, and Group
's second step corresponds to Ensemble
.
For more information, please see Collector, Group and Ensemble, or the example.