API Reference

Here you can find all Qlib interfaces.

Data

Provider

class qlib.data.data.ProviderBackendMixin

This helper class makes it more convenient to implement a provider based on a storage backend. It is not necessary to inherit from this class if the provider does not rely on a backend storage.

class qlib.data.data.CalendarProvider

Calendar provider base class

Provide calendar data.

calendar(start_time=None, end_time=None, freq='day', future=False)

Get calendar of certain market in given time range.

Parameters:
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
  • freq (str) – time frequency, available: year/quarter/month/week/day.
  • future (bool) – whether including future trading day.
Returns:

calendar list

Return type:

list
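
For illustration, the calendar is typically queried through the D wrapper once qlib has been initialized; a minimal sketch (the provider_uri path is only an example):

    import qlib
    from qlib.data import D

    # provider_uri points at a prepared local data directory (example path)
    qlib.init(provider_uri="~/.qlib/qlib_data/cn_data")

    # list of pd.Timestamp trading days between the two dates
    cal = D.calendar(start_time="2010-01-01", end_time="2017-12-31", freq="day")
    print(cal[:3])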

locate_index(start_time: Union[pandas._libs.tslibs.timestamps.Timestamp, str], end_time: Union[pandas._libs.tslibs.timestamps.Timestamp, str], freq: str, future: bool = False)

Locate the start time index and end time index in a calendar under certain frequency.

Parameters:
  • start_time (pd.Timestamp) – start of the time range.
  • end_time (pd.Timestamp) – end of the time range.
  • freq (str) – time frequency, available: year/quarter/month/week/day.
  • future (bool) – whether including future trading day.
Returns:

  • pd.Timestamp – the real start time.
  • pd.Timestamp – the real end time.
  • int – the index of start time.
  • int – the index of end time.

load_calendar(freq, future)

Load original calendar timestamp from file.

Parameters:
  • freq (str) – frequency of read calendar file.
  • future (bool) –
Returns:

list of timestamps

Return type:

list

class qlib.data.data.InstrumentProvider

Instrument provider base class

Provide instrument data.

static instruments(market: Union[List[T], str] = 'all', filter_pipe: Optional[List[T]] = None)

Get the general config dictionary for a base market adding several dynamic filters.

Parameters:
  • market (Union[List, str]) –
    str:
    market/industry/index shortname, e.g. all/sse/szse/sse50/csi300/csi500.
    list:
    [“ID1”, “ID2”]. A list of stocks
  • filter_pipe (list) – the list of dynamic filters.
Returns:

  • dict (if isinstance(market, str)) – dict of stockpool config.

    {market => base market name, filter_pipe => list of filters}

    example: {'market': 'csi300', 'filter_pipe': []}

  • list (if isinstance(market, list)) – just return the original list directly. NOTE: this will make the instruments compatible with more cases. The user code will be simpler.
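
An illustrative sketch of both forms of market, assuming qlib.init has already been called:

    from qlib.data import D

    # str market: returns a stockpool config dict {market: ..., filter_pipe: [...]}
    pool_cfg = D.instruments(market="csi300")

    # list market: returned as-is
    explicit_pool = D.instruments(market=["SH600000", "SZ000001"])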

list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)

List the instruments based on a certain stockpool config.

Parameters:
  • instruments (dict) – stockpool config.
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
  • as_list (bool) – return instruments as list or dict.
Returns:

instruments list or dictionary with time spans

Return type:

dict or list
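
A minimal sketch of resolving a stockpool config into concrete instrument codes (assuming qlib.init has been called):

    from qlib.data import D

    pool_cfg = D.instruments(market="csi300")

    # as_list=True returns a plain list of codes;
    # as_list=False returns a dict mapping each code to its valid time spans
    codes = D.list_instruments(instruments=pool_cfg, start_time="2015-01-01",
                               end_time="2016-12-31", as_list=True)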

class qlib.data.data.FeatureProvider

Feature provider class

Provide feature data.

feature(instrument, field, start_time, end_time, freq)

Get feature data.

Parameters:
  • instrument (str) – a certain instrument.
  • field (str) – a certain field of feature.
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
  • freq (str) – time frequency, available: year/quarter/month/week/day.
Returns:

data of a certain feature

Return type:

pd.Series

class qlib.data.data.PITProvider
period_feature(instrument, field, start_index: int, end_index: int, cur_time: pandas._libs.tslibs.timestamps.Timestamp, period: Optional[int] = None) → pandas.core.series.Series

Get the historical periods data series between start_index and end_index.

Parameters:
  • start_index (int) – a relative index to the latest period as of cur_time.
  • end_index (int) – a relative index to the latest period as of cur_time. In most cases, start_index and end_index will be non-positive values. For example, if start_index == -3, end_index == 0 and the current period index is cur_idx, then the data between [start_index + cur_idx, end_index + cur_idx] will be retrieved.
  • period (int) – used to query a specific period. A period is represented as an int in Qlib (e.g. 202001 may represent the first quarter of 2020). NOTE: period will override start_index and end_index.
Returns:

The index will be integers indicating the periods of the data. A typical example is TODO.

Return type:

pd.Series

Raises:

FileNotFoundError – This exception will be raised if the queried data do not exist.
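
As a rough sketch of how PIT data is usually queried through expressions (the P(...) syntax and the $$roewa_q field name are assumptions, and PIT data must be prepared beforehand):

    from qlib.data import D

    # hypothetical point-in-time field expression
    df = D.features(["sh600519"], ["P($$roewa_q)"],
                    start_time="2019-01-01", end_time="2019-07-19", freq="day")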

class qlib.data.data.ExpressionProvider

Expression provider class

Provide Expression data.

__init__()

Initialize self. See help(type(self)) for accurate signature.

expression(instrument, field, start_time=None, end_time=None, freq='day') → pandas.core.series.Series

Get Expression data.

The responsibility of expression is to parse the field and load the corresponding data. When loading the data, it should handle the time dependency of the data. get_expression_instance is commonly used in this method.

Parameters:
  • instrument (str) – a certain instrument.
  • field (str) – a certain field of feature.
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
  • freq (str) – time frequency, available: year/quarter/month/week/day.
Returns:

data of a certain expression

The data comes in two formats:

  1. expression with datetime index

  2. expression with integer index

    • because the datetime is not as good as

Return type:

pd.Series

class qlib.data.data.DatasetProvider

Dataset provider class

Provide Dataset data.

dataset(instruments, fields, start_time=None, end_time=None, freq='day', inst_processors=[])

Get dataset data.

Parameters:
  • instruments (list or dict) – list/dict of instruments or dict of stockpool config.
  • fields (list) – list of feature instances.
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
  • freq (str) – time frequency.
  • inst_processors (Iterable[Union[dict, InstProcessor]]) – the operations performed on each instrument
Returns:

a pandas dataframe with <instrument, datetime> index.

Return type:

pd.DataFrame

static get_instruments_d(instruments, freq)

Parse different types of input instruments into instruments_d. A wrongly formatted instruments input will raise an exception.

static get_column_names(fields)

Get column names from input fields

static dataset_processor(instruments_d, column_names, start_time, end_time, freq, inst_processors=[])

Load and process the data, and return the dataset (by default using the multi-kernel method).

static inst_calculator(inst, start_time, end_time, freq, column_names, spans=None, g_config=None, inst_processors=[])

Calculate the expressions for one instrument and return a DataFrame result. If the expressions have been calculated before, load them from the cache.

Return value: a DataFrame with a 'datetime' index and the other data columns.

class qlib.data.data.LocalCalendarProvider(remote=False, backend={})

Local calendar data provider class

Provide calendar data from local data source.

__init__(remote=False, backend={})

Initialize self. See help(type(self)) for accurate signature.

load_calendar(freq, future)

Load original calendar timestamp from file.

Parameters:
  • freq (str) – frequency of read calendar file.
  • future (bool) –
Returns:

list of timestamps

Return type:

list

class qlib.data.data.LocalInstrumentProvider(backend={})

Local instrument data provider class

Provide instrument data from local data source.

__init__(backend={}) → None

Initialize self. See help(type(self)) for accurate signature.

list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)

List the instruments based on a certain stockpool config.

Parameters:
  • instruments (dict) – stockpool config.
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
  • as_list (bool) – return instruments as list or dict.
Returns:

instruments list or dictionary with time spans

Return type:

dict or list

class qlib.data.data.LocalFeatureProvider(remote=False, backend={})

Local feature data provider class

Provide feature data from local data source.

__init__(remote=False, backend={})

Initialize self. See help(type(self)) for accurate signature.

feature(instrument, field, start_index, end_index, freq)

Get feature data.

Parameters:
  • instrument (str) – a certain instrument.
  • field (str) – a certain field of feature.
  • start_index – start index of the feature data.
  • end_index – end index of the feature data.
  • freq (str) – time frequency, available: year/quarter/month/week/day.
Returns:

data of a certain feature

Return type:

pd.Series

class qlib.data.data.LocalPITProvider
period_feature(instrument, field, start_index, end_index, cur_time, period=None)

Get the historical periods data series between start_index and end_index.

Parameters:
  • start_index (int) – a relative index to the latest period as of cur_time.
  • end_index (int) – a relative index to the latest period as of cur_time. In most cases, start_index and end_index will be non-positive values. For example, if start_index == -3, end_index == 0 and the current period index is cur_idx, then the data between [start_index + cur_idx, end_index + cur_idx] will be retrieved.
  • period (int) – used to query a specific period. A period is represented as an int in Qlib (e.g. 202001 may represent the first quarter of 2020). NOTE: period will override start_index and end_index.
Returns:

The index will be integers indicating the periods of the data. A typical example is TODO.

Return type:

pd.Series

Raises:

FileNotFoundError – This exception will be raised if the queried data do not exist.

class qlib.data.data.LocalExpressionProvider(time2idx=True)

Local expression data provider class

Provide expression data from local data source.

__init__(time2idx=True)

Initialize self. See help(type(self)) for accurate signature.

expression(instrument, field, start_time=None, end_time=None, freq='day')

Get Expression data.

The responsibility of expression is to parse the field and load the corresponding data. When loading the data, it should handle the time dependency of the data. get_expression_instance is commonly used in this method.

Parameters:
  • instrument (str) – a certain instrument.
  • field (str) – a certain field of feature.
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
  • freq (str) – time frequency, available: year/quarter/month/week/day.
Returns:

data of a certain expression

The data comes in two formats:

  1. expression with datetime index

  2. expression with integer index

    • because the datetime is not as good as

Return type:

pd.Series

class qlib.data.data.LocalDatasetProvider(align_time: bool = True)

Local dataset data provider class

Provide dataset data from local data source.

__init__(align_time: bool = True)
Parameters:align_time (bool) –

Whether to align the time to the calendar. The frequency is flexible in some datasets and cannot be aligned. For data with a fixed frequency and a shared calendar, aligning the data to the calendar provides the following benefits:

  • Align queries to the same parameters, so the cache can be shared.
dataset(instruments, fields, start_time=None, end_time=None, freq='day', inst_processors=[])

Get dataset data.

Parameters:
  • instruments (list or dict) – list/dict of instruments or dict of stockpool config.
  • fields (list) – list of feature instances.
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
  • freq (str) – time frequency.
  • inst_processors (Iterable[Union[dict, InstProcessor]]) – the operations performed on each instrument
Returns:

a pandas dataframe with <instrument, datetime> index.

Return type:

pd.DataFrame

static multi_cache_walker(instruments, fields, start_time=None, end_time=None, freq='day')

This method is used to prepare the expression cache for the client. Then the client will load the data from expression cache by itself.

static cache_walker(inst, start_time, end_time, freq, column_names)

If the expressions of one instrument haven't been calculated before, calculate them and write them into the expression cache.

class qlib.data.data.ClientCalendarProvider

Client calendar data provider class

Provide calendar data by requesting data from server as a client.

__init__()

Initialize self. See help(type(self)) for accurate signature.

calendar(start_time=None, end_time=None, freq='day', future=False)

Get calendar of certain market in given time range.

Parameters:
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
  • freq (str) – time frequency, available: year/quarter/month/week/day.
  • future (bool) – whether including future trading day.
Returns:

calendar list

Return type:

list

class qlib.data.data.ClientInstrumentProvider

Client instrument data provider class

Provide instrument data by requesting data from server as a client.

__init__()

Initialize self. See help(type(self)) for accurate signature.

list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)

List the instruments based on a certain stockpool config.

Parameters:
  • instruments (dict) – stockpool config.
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
  • as_list (bool) – return instruments as list or dict.
Returns:

instruments list or dictionary with time spans

Return type:

dict or list

class qlib.data.data.ClientDatasetProvider

Client dataset data provider class

Provide dataset data by requesting data from server as a client.

__init__()

Initialize self. See help(type(self)) for accurate signature.

dataset(instruments, fields, start_time=None, end_time=None, freq='day', disk_cache=0, return_uri=False, inst_processors=[])

Get dataset data.

Parameters:
  • instruments (list or dict) – list/dict of instruments or dict of stockpool config.
  • fields (list) – list of feature instances.
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
  • freq (str) – time frequency.
  • inst_processors (Iterable[Union[dict, InstProcessor]]) – the operations performed on each instrument
Returns:

a pandas dataframe with <instrument, datetime> index.

Return type:

pd.DataFrame

class qlib.data.data.BaseProvider

Local provider class. It is a set of interfaces that allow users to access data. Because PITD is not exposed publicly to users, it is not included in the interface.

To keep compatibility with the old qlib provider.

features(instruments, fields, start_time=None, end_time=None, freq='day', disk_cache=None, inst_processors=[])
Parameters:disk_cache (int) – whether to skip(0)/use(1)/replace(2) disk_cache

This function will try to use the cache method, which has a disk_cache keyword, and will fall back to the provider method if a TypeError is raised because the DatasetD instance is a provider class.
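
A minimal sketch of the user-facing features interface (assuming qlib.init has been called with a local data source):

    from qlib.data import D

    df = D.features(
        instruments=["SH600000"],
        fields=["$close", "$volume", "Ref($close, 1)"],
        start_time="2010-01-01",
        end_time="2017-12-31",
        freq="day",
    )
    # df is indexed by <instrument, datetime> with one column per field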

class qlib.data.data.LocalProvider
features_uri(instruments, fields, start_time, end_time, freq, disk_cache=1)

Return the uri of the generated cache of features/dataset

Parameters:
  • disk_cache
  • instruments
  • fields
  • start_time
  • end_time
  • freq
class qlib.data.data.ClientProvider

Client Provider

Requesting data from server as a client. Can propose requests:

  • Calendar : Directly respond a list of calendars
  • Instruments (without filter): Directly respond a list/dict of instruments
  • Instruments (with filters): Respond a list/dict of instruments
  • Features : Respond a cache uri

The general workflow is as follows: when the user uses the client provider to propose a request, the client provider connects to the server and sends the request, then starts to wait for the response. The response is made instantly, indicating whether the cache is available. The waiting procedure terminates only when the client gets a response saying feature_available is true. BUG: every time we make a request for certain data we need to connect to the server, wait for the response and disconnect from it. We can't make a sequence of requests within one connection. You can refer to https://python-socketio.readthedocs.io/en/latest/client.html for documentation of the python-socketIO client.

__init__()

Initialize self. See help(type(self)) for accurate signature.

qlib.data.data.CalendarProviderWrapper

alias of qlib.data.data.CalendarProvider

qlib.data.data.InstrumentProviderWrapper

alias of qlib.data.data.InstrumentProvider

qlib.data.data.FeatureProviderWrapper

alias of qlib.data.data.FeatureProvider

qlib.data.data.PITProviderWrapper

alias of qlib.data.data.PITProvider

qlib.data.data.ExpressionProviderWrapper

alias of qlib.data.data.ExpressionProvider

qlib.data.data.DatasetProviderWrapper

alias of qlib.data.data.DatasetProvider

qlib.data.data.BaseProviderWrapper

alias of qlib.data.data.BaseProvider

qlib.data.data.register_all_wrappers(C)

Filter

class qlib.data.filter.BaseDFilter

Dynamic Instruments Filter Abstract class

Users can override this class to construct their own filter

Override __init__ to input filter regulations

Override filter_main to use the regulations to filter instruments

__init__()

Initialize self. See help(type(self)) for accurate signature.

static from_config(config)

Construct an instance from config dict.

Parameters:config (dict) – dict of config parameters.
to_config()

Construct the config dict from the instance.

Returns:return the dict of config parameters.
Return type:dict
class qlib.data.filter.SeriesDFilter(fstart_time=None, fend_time=None, keep=False)

Dynamic Instruments Filter Abstract class to filter a series of certain features

Filters should provide parameters:

  • filter start time
  • filter end time
  • filter rule

Override __init__ to assign a certain rule to filter the series.

Override _getFilterSeries to use the rule to filter the series and get a dict of {inst => series}, or override filter_main for more advanced series filter rule

__init__(fstart_time=None, fend_time=None, keep=False)
Init function for filter base class.
Filter a set of instruments based on a certain rule within a certain period assigned by fstart_time and fend_time.
Parameters:
  • fstart_time (str) – the time for the filter rule to start filtering the instruments.
  • fend_time (str) – the time for the filter rule to stop filtering the instruments.
  • keep (bool) – whether to keep the instruments of which features don’t exist in the filter time span.
filter_main(instruments, start_time=None, end_time=None)

Implement this method to filter the instruments.

Parameters:
  • instruments (dict) – input instruments to be filtered.
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
Returns:

filtered instruments, same structure as input instruments.

Return type:

dict

class qlib.data.filter.NameDFilter(name_rule_re, fstart_time=None, fend_time=None)

Name dynamic instrument filter

Filter the instruments based on a regulated name format.

A name rule regular expression is required.

__init__(name_rule_re, fstart_time=None, fend_time=None)

Init function for name filter class

Parameters:name_rule_re (str) – regular expression for the name rule.
static from_config(config)

Construct an instance from config dict.

Parameters:config (dict) – dict of config parameters.
to_config()

Construct the config dict from the instance.

Returns:return the dict of config parameters.
Return type:dict
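
An illustrative sketch of plugging a name filter into a stockpool config (the regular expression is only an example, assuming qlib.init has been called):

    from qlib.data import D
    from qlib.data.filter import NameDFilter

    name_filter = NameDFilter(name_rule_re="SH[0-9]{4}55")
    pool_cfg = D.instruments(market="csi300", filter_pipe=[name_filter])
    codes = D.list_instruments(instruments=pool_cfg, as_list=True)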
class qlib.data.filter.ExpressionDFilter(rule_expression, fstart_time=None, fend_time=None, keep=False)

Expression dynamic instrument filter

Filter the instruments based on a certain expression.

An expression rule indicating a certain feature field is required.

Examples

  • basic features filter : rule_expression = ‘$close/$open>5’
  • cross-sectional features filter : rule_expression = ‘$rank($close)<10’
  • time-sequence features filter : rule_expression = ‘$Ref($close, 3)>100’
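
An illustrative sketch combining an expression filter with D.instruments (the rule expression is only an example, assuming qlib.init has been called):

    from qlib.data import D
    from qlib.data.filter import ExpressionDFilter

    expr_filter = ExpressionDFilter(rule_expression="$close > 2000")
    pool_cfg = D.instruments(market="csi300", filter_pipe=[expr_filter])
    codes = D.list_instruments(instruments=pool_cfg, as_list=True)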
__init__(rule_expression, fstart_time=None, fend_time=None, keep=False)

Init function for expression filter class

Parameters:
  • fstart_time (str) – filter the feature starting from this time.
  • fend_time (str) – filter the feature ending by this time.
  • rule_expression (str) – an input expression for the rule.
static from_config(config)

Construct an instance from config dict.

Parameters:config (dict) – dict of config parameters.
to_config()

Construct the config dict from the instance.

Returns:return the dict of config parameters.
Return type:dict

Class

class qlib.data.base.Expression

Expression base class

Expression is designed to handle the calculation of data in the following format: data with two dimensions for each instrument,

  • feature

  • time: it could be observation time or period time.

    • period time is designed for a point-in-time database. For example, the period time may be 2014Q4; its value can be observed multiple times (different values may be observed at different times due to amendments).
load(instrument, start_index, end_index, *args)

Load feature. This function is responsible for loading the feature/expression based on the expression engine.

The concrete implementation will be separated into two parts:

  1. caching data and handling errors.

    • This part is shared by all expressions and implemented in Expression.
  2. processing and calculating data based on the specific expression.

    • This part is different in each expression and implemented in each expression.

Expression Engine is shared by different data. Different data will have different extra information for args.

Parameters:
  • instrument (str) – instrument code.
  • start_index (str) – feature start index [in calendar].
  • end_index (str) – feature end index [in calendar].
  • *args – may contain the following information:
    1. if it is used in a basic expression engine, it contains the following argument:
       freq (str) – feature frequency.
    2. if it is used in PIT data, it contains the following arguments:
       cur_pit – designed for the point-in-time data.
       period (int) – used to query a specific period. A period is represented as an int in Qlib (e.g. 202001 may represent the first quarter of 2020).
Returns:

feature series: The index of the series is the calendar index

Return type:

pd.Series

get_longest_back_rolling()

Get the longest length of historical data the feature has accessed.

This is designed for getting the range of data needed to calculate the features in a specific range at first. However, situations like Ref(Ref($close, -1), 1) cannot be handled correctly.

So this is only used for detecting the length of historical data needed.

get_extended_window_size()

get_extend_window_size

To calculate this Operator in the range [start_index, end_index], we have to get the leaf feature in the range [start_index - lft_etd, end_index + rght_etd].

Returns:lft_etd, rght_etd
Return type:(int, int)
class qlib.data.base.Feature(name=None)

Static Expression

This kind of feature will load data from provider

__init__(name=None)

Initialize self. See help(type(self)) for accurate signature.

get_longest_back_rolling()

Get the longest length of historical data the feature has accessed.

This is designed for getting the range of data needed to calculate the features in a specific range at first. However, situations like Ref(Ref($close, -1), 1) cannot be handled correctly.

So this is only used for detecting the length of historical data needed.

get_extended_window_size()

get_extend_window_size

To calculate this Operator in the range [start_index, end_index], we have to get the leaf feature in the range [start_index - lft_etd, end_index + rght_etd].

Returns:lft_etd, rght_etd
Return type:(int, int)
class qlib.data.base.PFeature(name=None)
class qlib.data.base.ExpressionOps

Operator Expression

This kind of feature will use operator for feature construction on the fly.
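
In practice, operators are composed inside field expression strings rather than instantiated directly; a minimal sketch (assuming qlib.init has been called; the instrument and dates are only examples):

    from qlib.data import D

    fields = [
        "Abs($close - $open)",       # element-wise operator
        "Log($volume + 1)",          # element-wise operator
        "($close - $open) / $open",  # arithmetic pair operators
    ]
    df = D.features(["SH600000"], fields, start_time="2015-01-01", end_time="2015-12-31")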

Operator

class qlib.data.ops.ElemOperator(feature)

Element-wise Operator

Parameters:feature (Expression) – feature instance
Returns:feature operation output
Return type:Expression
__init__(feature)

Initialize self. See help(type(self)) for accurate signature.

get_longest_back_rolling()

Get the longest length of historical data the feature has accessed.

This is designed for getting the range of data needed to calculate the features in a specific range at first. However, situations like Ref(Ref($close, -1), 1) cannot be handled correctly.

So this is only used for detecting the length of historical data needed.

get_extended_window_size()

get_extend_window_size

To calculate this Operator in the range [start_index, end_index], we have to get the leaf feature in the range [start_index - lft_etd, end_index + rght_etd].

Returns:lft_etd, rght_etd
Return type:(int, int)
class qlib.data.ops.ChangeInstrument(instrument, feature)

Change Instrument Operator. In some cases, one may want to change to another instrument when calculating, for example, to calculate the beta of a stock with respect to a market index. This requires changing the calculation of features from the stock (the original instrument) to the index (the reference instrument).

Parameters:
  • instrument – the new instrument for which the downstream operations should be performed, e.g. SH000300 (CSI300 index) or ^GPSC (SP500 index).
  • feature – the feature to be calculated for the new instrument.

Returns:feature operation output
Return type:Expression
__init__(instrument, feature)

Initialize self. See help(type(self)) for accurate signature.

load(instrument, start_index, end_index, *args)

Load feature. This function is responsible for loading the feature/expression based on the expression engine.

The concrete implementation will be separated into two parts:

  1. caching data and handling errors.

    • This part is shared by all expressions and implemented in Expression.
  2. processing and calculating data based on the specific expression.

    • This part is different in each expression and implemented in each expression.

Expression Engine is shared by different data. Different data will have different extra information for args.

Parameters:
  • instrument (str) – instrument code.
  • start_index (str) – feature start index [in calendar].
  • end_index (str) – feature end index [in calendar].
  • *args – may contain the following information:
    1. if it is used in a basic expression engine, it contains the following argument:
       freq (str) – feature frequency.
    2. if it is used in PIT data, it contains the following arguments:
       cur_pit – designed for the point-in-time data.
       period (int) – used to query a specific period. A period is represented as an int in Qlib (e.g. 202001 may represent the first quarter of 2020).
Returns:

feature series: The index of the series is the calendar index

Return type:

pd.Series

class qlib.data.ops.NpElemOperator(feature, func)

Numpy Element-wise Operator

Parameters:
  • feature (Expression) – feature instance
  • func (str) – numpy feature operation method
Returns:

feature operation output

Return type:

Expression

__init__(feature, func)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Abs(feature)

Feature Absolute Value

Parameters:feature (Expression) – feature instance
Returns:a feature instance with absolute output
Return type:Expression
__init__(feature)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Sign(feature)

Feature Sign

Parameters:feature (Expression) – feature instance
Returns:a feature instance with sign
Return type:Expression
__init__(feature)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Log(feature)

Feature Log

Parameters:feature (Expression) – feature instance
Returns:a feature instance with log
Return type:Expression
__init__(feature)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Mask(feature, instrument)

Feature Mask

Parameters:
  • feature (Expression) – feature instance
  • instrument (str) – instrument mask
Returns:

a feature instance with masked instrument

Return type:

Expression

__init__(feature, instrument)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Not(feature)

Not Operator

Parameters:feature (Expression) – feature instance
Returns:feature elementwise not output
Return type:Feature
__init__(feature)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.PairOperator(feature_left, feature_right)

Pair-wise operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

two features’ operation output

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

get_longest_back_rolling()

Get the longest length of historical data the feature has accessed.

This is designed for getting the range of data needed to calculate the features in a specific range at first. However, situations like Ref(Ref($close, -1), 1) cannot be handled correctly.

So this is only used for detecting the length of historical data needed.

get_extended_window_size()

get_extend_window_size

To calculate this Operator in the range [start_index, end_index], we have to get the leaf feature in the range [start_index - lft_etd, end_index + rght_etd].

Returns:lft_etd, rght_etd
Return type:(int, int)
class qlib.data.ops.NpPairOperator(feature_left, feature_right, func)

Numpy Pair-wise operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
  • func (str) – operator function
Returns:

two features’ operation output

Return type:

Feature

__init__(feature_left, feature_right, func)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Power(feature_left, feature_right)

Power Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

The bases in feature_left raised to the exponents in feature_right

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Add(feature_left, feature_right)

Add Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

two features’ sum

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Sub(feature_left, feature_right)

Subtract Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

two features’ subtraction

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Mul(feature_left, feature_right)

Multiply Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

two features’ product

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Div(feature_left, feature_right)

Division Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

two features’ division

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Greater(feature_left, feature_right)

Greater Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

greater elements taken from the input two features

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Less(feature_left, feature_right)

Less Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

smaller elements taken from the input two features

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Gt(feature_left, feature_right)

Greater Than Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

bool series indicate left > right

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Ge(feature_left, feature_right)

Greater Equal Than Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

bool series indicate left >= right

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Lt(feature_left, feature_right)

Less Than Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

bool series indicate left < right

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Le(feature_left, feature_right)

Less Equal Than Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

bool series indicate left <= right

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Eq(feature_left, feature_right)

Equal Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

bool series indicate left == right

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Ne(feature_left, feature_right)

Not Equal Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

bool series indicate left != right

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.And(feature_left, feature_right)

And Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

two features’ row by row & output

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Or(feature_left, feature_right)

Or Operator

Parameters:
  • feature_left (Expression) – feature instance or numeric value
  • feature_right (Expression) – feature instance or numeric value
Returns:

two features’ row by row | outputs

Return type:

Feature

__init__(feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.If(condition, feature_left, feature_right)

If Operator

Parameters:
  • condition (Expression) – feature instance with bool values as condition
  • feature_left (Expression) – feature instance
  • feature_right (Expression) – feature instance
__init__(condition, feature_left, feature_right)

Initialize self. See help(type(self)) for accurate signature.

get_longest_back_rolling()

Get the longest length of historical data the feature has accessed.

This is designed for getting the range of data needed to calculate the features in a specific range at first. However, situations like Ref(Ref($close, -1), 1) cannot be handled correctly.

So this is only used for detecting the length of historical data needed.

get_extended_window_size()

get_extend_window_size

To calculate this Operator in the range [start_index, end_index], we have to get the leaf feature in the range [start_index - lft_etd, end_index + rght_etd].

Returns:lft_etd, rght_etd
Return type:(int, int)
class qlib.data.ops.Rolling(feature, N, func)

Rolling Operator. The meaning of rolling and expanding is the same as in pandas. When the window is set to 0, the behaviour of the operator follows expanding; otherwise, it follows rolling.

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
  • func (str) – rolling method
Returns:

rolling outputs

Return type:

Expression

__init__(feature, N, func)

Initialize self. See help(type(self)) for accurate signature.

get_longest_back_rolling()

Get the longest length of historical data the feature has accessed.

This is designed for getting the range of data needed to calculate the features in a specific range at first. However, situations like Ref(Ref($close, -1), 1) cannot be handled correctly.

So this is only used for detecting the length of historical data needed.

get_extended_window_size()

get_extend_window_size

To calculate this Operator in the range [start_index, end_index], we have to get the leaf feature in the range [start_index - lft_etd, end_index + rght_etd].

Returns:lft_etd, rght_etd
Return type:(int, int)
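
A minimal sketch of rolling operators used inside field expressions (assuming qlib.init has been called; the instrument and dates are only examples):

    from qlib.data import D

    fields = [
        "Mean($close, 5)",            # 5-period moving average
        "Std($close, 20)",            # 20-period rolling standard deviation
        "Ref($close, 1)",             # value one period ago
        "Corr($close, $volume, 10)",  # 10-period rolling correlation
    ]
    df = D.features(["SH600000"], fields, start_time="2015-01-01", end_time="2015-12-31")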
class qlib.data.ops.Ref(feature, N)

Feature Reference

Parameters:
  • feature (Expression) – feature instance
  • N (int) – N = 0, retrieve the first data; N > 0, retrieve data of N periods ago; N < 0, future data
Returns:

a feature instance with target reference

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

get_longest_back_rolling()

Get the longest length of historical data the feature has accessed.

This is designed for getting the range of data needed to calculate the features in a specific range at first. However, situations like Ref(Ref($close, -1), 1) cannot be handled correctly.

So this is only used for detecting the length of historical data needed.

get_extended_window_size()

get_extend_window_size

To calculate this Operator in the range [start_index, end_index], we have to get the leaf feature in the range [start_index - lft_etd, end_index + rght_etd].

Returns:lft_etd, rght_etd
Return type:(int, int)
class qlib.data.ops.Mean(feature, N)

Rolling Mean (MA)

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling average

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Sum(feature, N)

Rolling Sum

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling sum

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Std(feature, N)

Rolling Std

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling std

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Var(feature, N)

Rolling Variance

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling variance

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Skew(feature, N)

Rolling Skewness

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling skewness

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Kurt(feature, N)

Rolling Kurtosis

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling kurtosis

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Max(feature, N)

Rolling Max

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling max

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.IdxMax(feature, N)

Rolling Max Index

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling max index

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Min(feature, N)

Rolling Min

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling min

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.IdxMin(feature, N)

Rolling Min Index

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling min index

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Quantile(feature, N, qscore)

Rolling Quantile

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
  • qscore (float) – the quantile to compute
Returns:

a feature instance with rolling quantile

Return type:

Expression

__init__(feature, N, qscore)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Med(feature, N)

Rolling Median

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling median

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Mad(feature, N)

Rolling Mean Absolute Deviation

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling mean absolute deviation

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Rank(feature, N)

Rolling Rank (Percentile)

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling rank

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Count(feature, N)

Rolling Count

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling count of number of non-NaN elements

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Delta(feature, N)

Rolling Delta

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with end minus start in rolling window

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Slope(feature, N)

Rolling Slope. This operator calculates the slope between idx and feature (e.g. [<feature_t1>, <feature_t2>, <feature_t3>] and [1, 2, 3]).

Usage Example: "Slope($close, %d)/$close"

# TODO: # Some users may want pair-wise rolling like Slope(A, B, N)

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with linear regression slope of given window

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Rsquare(feature, N)

Rolling R-value Square

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with linear regression r-value square of given window

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Resi(feature, N)

Rolling Regression Residuals

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with regression residuals of given window

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.WMA(feature, N)

Rolling WMA

Parameters:
  • feature (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with weighted moving average output

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.EMA(feature, N)

Rolling Exponential Mean (EMA)

Parameters:
  • feature (Expression) – feature instance
  • N (int, float) – rolling window size
Returns:

a feature instance with exponential moving average output of the given window

Return type:

Expression

__init__(feature, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.PairRolling(feature_left, feature_right, N, func)

Pair Rolling Operator

Parameters:
  • feature_left (Expression) – feature instance
  • feature_right (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling output of two input features

Return type:

Expression

__init__(feature_left, feature_right, N, func)

Initialize self. See help(type(self)) for accurate signature.

get_longest_back_rolling()

Get the longest length of historical data the feature has accessed.

This is designed for getting the range of data needed to calculate the features in a specific range at first. However, situations like Ref(Ref($close, -1), 1) cannot be handled correctly.

So this is only used for detecting the length of historical data needed.

get_extended_window_size()

get_extend_window_size

To calculate this Operator in the range [start_index, end_index], we have to get the leaf feature in the range [start_index - lft_etd, end_index + rght_etd].

Returns:lft_etd, rght_etd
Return type:(int, int)
class qlib.data.ops.Corr(feature_left, feature_right, N)

Rolling Correlation

Parameters:
  • feature_left (Expression) – feature instance
  • feature_right (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling correlation of two input features

Return type:

Expression

__init__(feature_left, feature_right, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.Cov(feature_left, feature_right, N)

Rolling Covariance

Parameters:
  • feature_left (Expression) – feature instance
  • feature_right (Expression) – feature instance
  • N (int) – rolling window size
Returns:

a feature instance with rolling covariance of two input features

Return type:

Expression

__init__(feature_left, feature_right, N)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.ops.TResample(feature, freq, func)
__init__(feature, freq, func)

Resampling the data to target frequency. The resample function of pandas is used.

  • the timestamp will be at the start of the time span after resample.
Parameters:
  • feature (Expression) – An expression for calculating the feature
  • freq (str) – it will be passed into the resample method for resampling based on the given frequency.
  • func (method) – the method to get the resampled values; some expressions are frequently used at high frequency.
class qlib.data.ops.OpsWrapper

Ops Wrapper

__init__()

Initialize self. See help(type(self)) for accurate signature.

register(ops_list: List[Union[Type[qlib.data.base.ExpressionOps], dict]])

Register operators.

Parameters:ops_list (List[Union[Type[ExpressionOps], dict]]) –
  • if type(ops_list) is List[Type[ExpressionOps]], each element of ops_list represents the operator class, which should be the subclass of ExpressionOps.
  • if type(ops_list) is List[dict], each element of ops_list represents the config of operator, which has the following format:
    {
        "class": class_name,
        "module_path": path,
    }
    

    Note: class should be the class name of operator, module_path should be a python module or path of file.
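
A rough sketch of registering a custom element-wise operator (the Sqrt operator is hypothetical, and it assumes the module-level OpsWrapper instance is exposed as qlib.data.ops.Operators):

    from qlib.data.ops import NpElemOperator, Operators


    class Sqrt(NpElemOperator):
        """Hypothetical element-wise square root built on the numpy element operator."""

        def __init__(self, feature):
            # "sqrt" is resolved to the corresponding numpy function by NpElemOperator
            super().__init__(feature, "sqrt")


    # after registration the operator can be used in field expressions, e.g. "Sqrt($volume)"
    Operators.register([Sqrt])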

qlib.data.ops.register_all_ops(C)

Register all operators.

Cache

class qlib.data.cache.MemCacheUnit(*args, **kwargs)

Memory Cache Unit.

__init__(*args, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

limited

whether memory cache is limited

class qlib.data.cache.MemCache(mem_cache_size_limit=None, limit_type='length')

Memory cache.

__init__(mem_cache_size_limit=None, limit_type='length')
Parameters:
  • mem_cache_size_limit – cache max size.
  • limit_type – 'length' or 'sizeof'; 'length' limits by the number of items (len), 'sizeof' limits by memory size (sys.getsizeof).
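
A minimal sketch of constructing memory caches with the two limit types described above:

    from qlib.data.cache import MemCache

    # limit by the number of cached items
    length_limited = MemCache(mem_cache_size_limit=500, limit_type="length")

    # limit by the total size reported by sys.getsizeof
    size_limited = MemCache(mem_cache_size_limit=512 * 1024 * 1024, limit_type="sizeof")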
class qlib.data.cache.ExpressionCache(provider)

Expression cache mechanism base class.

This class is used to wrap expression provider with self-defined expression cache mechanism.

Note

Override the _uri and _expression method to create your own expression cache mechanism.

expression(instrument, field, start_time, end_time, freq)

Get expression data.

Note

Same interface as expression method in expression provider

update(cache_uri: Union[str, pathlib.Path], freq: str = 'day')

Update expression cache to latest calendar.

Override this method to define how to update expression cache corresponding to users’ own cache mechanism.

Parameters:
  • cache_uri (str or Path) – the complete uri of expression cache file (include dir path).
  • freq (str) –
Returns:

0(successful update)/ 1(no need to update)/ 2(update failure).

Return type:

int

class qlib.data.cache.DatasetCache(provider)

Dataset cache mechanism base class.

This class is used to wrap dataset provider with self-defined dataset cache mechanism.

Note

Override the _uri and _dataset method to create your own dataset cache mechanism.

dataset(instruments, fields, start_time=None, end_time=None, freq='day', disk_cache=1, inst_processors=[])

Get feature dataset.

Note

Same interface as dataset method in dataset provider

Note

The server uses redis_lock to make sure read-write conflicts will not be triggered, but client readers are not considered.

update(cache_uri: Union[str, pathlib.Path], freq: str = 'day')

Update dataset cache to latest calendar.

Override this method to define how to update dataset cache corresponding to users’ own cache mechanism.

Parameters:
  • cache_uri (str or Path) – the complete uri of dataset cache file (include dir path).
  • freq (str) –
Returns:

0(successful update)/ 1(no need to update)/ 2(update failure)

Return type:

int

static cache_to_origin_data(data, fields)

Convert cached data to the original data format.

Parameters:
  • data – pd.DataFrame, cache data.
  • fields – feature fields.
Returns:

pd.DataFrame.

static normalize_uri_args(instruments, fields, freq)

normalize uri args

class qlib.data.cache.DiskExpressionCache(provider, **kwargs)

Prepared cache mechanism for server.

__init__(provider, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

gen_expression_cache(expression_data, cache_path, instrument, field, freq, last_update)

Use a bin file to save the data, in the same way as feature data.

update(sid, cache_uri, freq: str = 'day')

Update expression cache to latest calendar.

Override this method to define how to update expression cache corresponding to users’ own cache mechanism.

Parameters:
  • cache_uri (str or Path) – the complete uri of expression cache file (include dir path).
  • freq (str) –
Returns:

0(successful update)/ 1(no need to update)/ 2(update failure).

Return type:

int

class qlib.data.cache.DiskDatasetCache(provider, **kwargs)

Prepared cache mechanism for server.

__init__(provider, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

classmethod read_data_from_cache(cache_path: Union[str, pathlib.Path], start_time, end_time, fields)

read_cache_from

This function can read data from the disk cache dataset

Parameters:
  • cache_path
  • start_time
  • end_time
  • fields – The fields order of the dataset cache is sorted. So rearrange the columns to make it consistent.
Returns:

class IndexManager(cache_path: Union[str, pathlib.Path])

Locking is not considered in this class. Please handle the lock outside this code. This class is the proxy of the on-disk data.

__init__(cache_path: Union[str, pathlib.Path])

Initialize self. See help(type(self)) for accurate signature.

gen_dataset_cache(cache_path: Union[str, pathlib.Path], instruments, fields, freq, inst_processors=[])

Note

This function does not consider the cache read-write lock. Please acquire the lock outside this function.

The cache format contains 3 parts (each followed by a typical filename):

  • index : cache/d41366901e25de3ec47297f12e2ba11d.index

    • The content of the file may be in the following format (pandas.Series):

                          start end
      1999-11-10 00:00:00     0   1
      1999-11-11 00:00:00     1   2
      1999-11-12 00:00:00     2   3
      ...
      

      Note

      The start is closed. The end is open!!!!!

    • Each line contains two element <start_index, end_index> with a timestamp as its index.

    • It indicates the start_index (included) and end_index (excluded) of the data for timestamp

  • meta data: cache/d41366901e25de3ec47297f12e2ba11d.meta

  • data : cache/d41366901e25de3ec47297f12e2ba11d

    • This is an HDF file sorted by datetime
Parameters:
  • cache_path – The path to store the cache.
  • instruments – The instruments to store the cache.
  • fields – The fields to store the cache.
  • freq – The freq to store the cache.
  • inst_processors – Instrument processors.

Return type: pd.DataFrame; the fields of the returned DataFrame are consistent with the parameters of the function.

update(cache_uri, freq: str = 'day')

Update dataset cache to latest calendar.

Override this method to define how to update dataset cache corresponding to users’ own cache mechanism.

Parameters:
  • cache_uri (str or Path) – the complete uri of dataset cache file (include dir path).
  • freq (str) –
Returns:

0(successful update)/ 1(no need to update)/ 2(update failure)

Return type:

int

Storage

class qlib.data.storage.storage.BaseStorage
class qlib.data.storage.storage.CalendarStorage(freq: str, future: bool, **kwargs)

The behavior of CalendarStorage’s methods and List’s methods of the same name remain consistent

__init__(freq: str, future: bool, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

data

get all data

Raises:ValueError – If the data(storage) does not exist, raise ValueError
index(value: str) → int
Raises:ValueError – If the data(storage) does not exist, raise ValueError
class qlib.data.storage.storage.InstrumentStorage(market: str, freq: str, **kwargs)
__init__(market: str, freq: str, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

data

get all data

Raises:ValueError – If the data(storage) does not exist, raise ValueError
update([E, ]**F) → None. Update D from mapping/iterable E and F.

Notes

If E present and has a .keys() method, does: for k in E: D[k] = E[k]

If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v

In either case, this is followed by: for k, v in F.items(): D[k] = v

class qlib.data.storage.storage.FeatureStorage(instrument: str, field: str, freq: str, **kwargs)
__init__(instrument: str, field: str, freq: str, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

data

get all data

Notes

if data(storage) does not exist, return empty pd.Series: return pd.Series(dtype=np.float32)

start_index

get FeatureStorage start index

Notes

If the data(storage) does not exist, return None

end_index

get FeatureStorage end index

Notes

The right index of the data range (both sides are closed)

The next data appending point will be end_index + 1

If the data(storage) does not exist, return None

write(data_array: Union[List[T], numpy.ndarray, Tuple], index: int = None)

Write data_array to FeatureStorage starting from index.

Notes

If index is None, append data_array to feature.

If len(data_array) == 0; return

If (index - self.end_index) >= 1, self[end_index+1: index] will be filled with np.nan

Examples

rebase(start_index: int = None, end_index: int = None)

Rebase the start_index and end_index of the FeatureStorage.

start_index and end_index are closed intervals: [start_index, end_index]

Examples

rewrite(data: Union[List[T], numpy.ndarray, Tuple], index: int)

overwrite all data in FeatureStorage with data

Parameters:
  • data (Union[List, np.ndarray, Tuple]) – data
  • index (int) – data start index
class qlib.data.storage.file_storage.FileStorageMixin

FileStorageMixin, applicable to FileXXXStorage. Subclasses need to have provider_uri, freq, storage_name and file_name attributes.

check()

check self.uri

Raises:ValueError
class qlib.data.storage.file_storage.FileCalendarStorage(freq: str, future: bool, provider_uri: dict = None, **kwargs)
__init__(freq: str, future: bool, provider_uri: dict = None, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

data

get all data

Raises:ValueError – If the data(storage) does not exist, raise ValueError
index(value: str) → int
Raises:ValueError – If the data(storage) does not exist, raise ValueError
class qlib.data.storage.file_storage.FileInstrumentStorage(market: str, freq: str, provider_uri: dict = None, **kwargs)
__init__(market: str, freq: str, provider_uri: dict = None, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

data

get all data

Raises:ValueError – If the data(storage) does not exist, raise ValueError
update([E, ]**F) → None. Update D from mapping/iterable E and F.

Notes

If E present and has a .keys() method, does: for k in E: D[k] = E[k]

If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v

In either case, this is followed by: for k, v in F.items(): D[k] = v

class qlib.data.storage.file_storage.FileFeatureStorage(instrument: str, field: str, freq: str, provider_uri: dict = None, **kwargs)
__init__(instrument: str, field: str, freq: str, provider_uri: dict = None, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

data

get all data

Notes

if data(storage) does not exist, return empty pd.Series: return pd.Series(dtype=np.float32)

write(data_array: Union[List[T], numpy.ndarray], index: int = None) → None

Write data_array to FeatureStorage starting from index.

Notes

If index is None, append data_array to feature.

If len(data_array) == 0, return

If (index - self.end_index) >= 1, self[end_index+1: index] will be filled with np.nan

Examples

start_index

get FeatureStorage start index

Notes

If the data(storage) does not exist, return None

end_index

get FeatureStorage end index

Notes

The right index of the data range (both sides are closed)

The next data appending point will be end_index + 1

If the data(storage) does not exist, return None

Dataset

Dataset Class

class qlib.data.dataset.__init__.Dataset(**kwargs)

Preparing data for model training and inferencing.

__init__(**kwargs)

__init__ is designed to finish the following steps:

  • init the sub instance and the state of the dataset (info to prepare the data)
    • The names of essential state for preparing data should not start with ‘_’ so that they can be serialized to disk when serializing.
  • setup data
    • The data-related attributes’ names should start with ‘_’ so that they will not be saved to disk when serializing.

The data could specify the info to calculate the essential data for preparation

config(**kwargs)

config is designed to configure parameters that cannot be learned from the data

setup_data(**kwargs)

Setup the data.

We split out the setup_data function for the following situation:

  • User have a Dataset object with learned status on disk.
  • User load the Dataset object from the disk.
  • User call setup_data to load new data.
  • User prepare data for model based on previous status.
prepare(**kwargs) → object

The type of dataset depends on the model. (It could be pd.DataFrame, pytorch.DataLoader, etc.) The parameters should specify the scope of the prepared data. The method should:

  • process the data
  • return the processed data
Returns:return the object
Return type:object
class qlib.data.dataset.__init__.DatasetH(handler: Union[Dict[KT, VT], qlib.data.dataset.handler.DataHandler], segments: Dict[str, Tuple], fetch_kwargs: Dict[KT, VT] = {}, **kwargs)

Dataset with Data(H)andler

Users should try to put the data preprocessing functions into the handler. Only the following data processing functions should be placed in Dataset:

  • The processing is related to specific model.
  • The processing is related to data split.
__init__(handler: Union[Dict[KT, VT], qlib.data.dataset.handler.DataHandler], segments: Dict[str, Tuple], fetch_kwargs: Dict[KT, VT] = {}, **kwargs)

Setup the underlying data.

Parameters:
  • handler (Union[dict, DataHandler]) –

    handler could be:

    • instance of DataHandler
    • config of DataHandler. Please refer to DataHandler
  • segments (dict) – Describe the options to segment the data. Here are some examples:
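
    For reference, a segments config is commonly written as time-range tuples keyed by segment name; the concrete dates below are illustrative only:

    segments = {
        "train": ("2008-01-01", "2014-12-31"),
        "valid": ("2015-01-01", "2016-12-31"),
        "test": ("2017-01-01", "2020-08-01"),
    }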
config(handler_kwargs: dict = None, **kwargs)

Initialize the DatasetH

Parameters:
  • handler_kwargs (dict) –

    Config of DataHandler, which could include the following arguments:

    • arguments of DataHandler.conf_data, such as ‘instruments’, ‘start_time’ and ‘end_time’.
  • kwargs (dict) –

    Config of DatasetH, such as

    • segments : dict
      Config of segments, which is the same as ‘segments’ in self.__init__
setup_data(handler_kwargs: dict = None, **kwargs)

Setup the Data

Parameters:handler_kwargs (dict) –

init arguments of DataHandler, which could include the following arguments:

  • init_type : Init Type of Handler
  • enable_cache : whether to enable cache
prepare(segments: Union[List[str], Tuple[str], str, slice, pandas.core.indexes.base.Index], col_set='__all', data_key='infer', **kwargs) → Union[List[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame]

Prepare the data for learning and inference.

Parameters:
  • segments (Union[List[Text], Tuple[Text], Text, slice]) –

    Describe the scope of the data to be prepared. Here are some examples:

    • ’train’
    • [‘train’, ‘valid’]
  • col_set (str) –

    The col_set will be passed to self.handler when fetching data. TODO: make it automatic:

    • select DK_I for test data
    • select DK_L for training data.
  • data_key (str) – The data to fetch: DK_*. Default is DK_I, which indicates fetching data for inference.
  • kwargs
    The parameters that kwargs may contain:
    flt_col : str
    It only exists in TSDatasetH and can be used to add a column of data (True or False) to filter data. This parameter is only supported when the dataset is an instance of TSDatasetH.
Returns:

Return type:

Union[List[pd.DataFrame], pd.DataFrame]

Raises:

NotImplementedError:
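
A hedged usage sketch; dataset is assumed to be an initialized DatasetH whose handler provides ‘feature’ and ‘label’ column sets:

from qlib.data.dataset.handler import DataHandlerLP

# single segment, learning data
df_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
# several segments at once
df_train, df_valid = dataset.prepare(["train", "valid"], col_set=["feature", "label"],
                                     data_key=DataHandlerLP.DK_L)
# inference data for the test segment
df_test = dataset.prepare("test", col_set="feature", data_key=DataHandlerLP.DK_I)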

Data Loader

class qlib.data.dataset.loader.DataLoader

DataLoader is designed for loading raw data from original data source.

load(instruments, start_time=None, end_time=None) → pandas.core.frame.DataFrame

load the data as pd.DataFrame.

Example of the data (The multi-index of the columns is optional.):

                        feature                                                             label
                        $close     $volume     Ref($close, 1)  Mean($close, 3)  $high-$low  LABEL0
datetime    instrument
2010-01-04  SH600000    81.807068  17145150.0       83.737389        83.016739    2.741058  0.0032
            SH600004    13.313329  11800983.0       13.313329        13.317701    0.183632  0.0042
            SH600005    37.796539  12231662.0       38.258602        37.919757    0.970325  0.0289
Parameters:
  • instruments (str or dict) – it can either be the market name or the config file of instruments generated by InstrumentProvider.
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
Returns:

data loaded from the underlying source

Return type:

pd.DataFrame

class qlib.data.dataset.loader.DLWParser(config: Union[list, tuple, dict])

(D)ata(L)oader (W)ith (P)arser for features and names

Extracting this class so that QlibDataLoader and other data loaders (such as QdbDataLoader) can share the fields.

__init__(config: Union[list, tuple, dict])
Parameters:config (Union[list, tuple, dict]) – Config will be used to describe the fields and column names
load_group_df(instruments, exprs: list, names: list, start_time: Union[str, pandas._libs.tslibs.timestamps.Timestamp] = None, end_time: Union[str, pandas._libs.tslibs.timestamps.Timestamp] = None, gp_name: str = None) → pandas.core.frame.DataFrame

load the dataframe for specific group

Parameters:
  • instruments – the instruments.
  • exprs (list) – the expressions to describe the content of the data.
  • names (list) – the name of the data.
Returns:

the queried dataframe.

Return type:

pd.DataFrame

load(instruments=None, start_time=None, end_time=None) → pandas.core.frame.DataFrame

load the data as pd.DataFrame.

Example of the data (The multi-index of the columns is optional.):

                        feature                                                             label
                        $close     $volume     Ref($close, 1)  Mean($close, 3)  $high-$low  LABEL0
datetime    instrument
2010-01-04  SH600000    81.807068  17145150.0       83.737389        83.016739    2.741058  0.0032
            SH600004    13.313329  11800983.0       13.313329        13.317701    0.183632  0.0042
            SH600005    37.796539  12231662.0       38.258602        37.919757    0.970325  0.0289
Parameters:
  • instruments (str or dict) – it can either be the market name or the config file of instruments generated by InstrumentProvider.
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
Returns:

data loaded from the underlying source

Return type:

pd.DataFrame

class qlib.data.dataset.loader.QlibDataLoader(config: Tuple[list, tuple, dict], filter_pipe: List[T] = None, swap_level: bool = True, freq: Union[str, dict] = 'day', inst_processors: Union[dict, list] = None)

The fields can be defined by config

__init__(config: Tuple[list, tuple, dict], filter_pipe: List[T] = None, swap_level: bool = True, freq: Union[str, dict] = 'day', inst_processors: Union[dict, list] = None)
Parameters:
  • config (Tuple[list, tuple, dict]) – Please refer to the doc of DLWParser
  • filter_pipe – Filter pipe for the instruments
  • swap_level – Whether to swap level of MultiIndex
  • freq (dict or str) – If type(config) == dict and type(freq) == str, load config data using freq. If type(config) == dict and type(freq) == dict, load config[<group_name>] data using freq[<group_name>]
  • inst_processors (dict | list) – If inst_processors is not None and type(config) == dict; load config[<group_name>] data using inst_processors[<group_name>] If inst_processors is a list, then it will be applied to all groups.
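
A hedged configuration sketch (the field expressions, names, and time range are illustrative; qlib.init is assumed to have been called with a prepared data directory):

from qlib.data.dataset.loader import QlibDataLoader

config = {
    "feature": (["$close", "Ref($close, 1)", "Mean($close, 3)"], ["CLOSE", "CLOSE1", "MEAN3"]),
    "label": (["Ref($close, -2) / Ref($close, -1) - 1"], ["LABEL0"]),
}
qdl = QlibDataLoader(config=config, freq="day")
df = qdl.load(instruments="csi300", start_time="2010-01-01", end_time="2010-12-31")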
load_group_df(instruments, exprs: list, names: list, start_time: Union[str, pandas._libs.tslibs.timestamps.Timestamp] = None, end_time: Union[str, pandas._libs.tslibs.timestamps.Timestamp] = None, gp_name: str = None) → pandas.core.frame.DataFrame

load the dataframe for specific group

Parameters:
  • instruments – the instruments.
  • exprs (list) – the expressions to describe the content of the data.
  • names (list) – the name of the data.
Returns:

the queried dataframe.

Return type:

pd.DataFrame

class qlib.data.dataset.loader.StaticDataLoader(config: Union[dict, str, pandas.core.frame.DataFrame], join='outer')

DataLoader that supports loading data from file or as provided.

__init__(config: Union[dict, str, pandas.core.frame.DataFrame], join='outer')
Parameters:
  • config (dict) – {fields_group: <path or object>}
  • join (str) – How to align different dataframes
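
A hedged sketch; the in-memory frame and its values are assumptions for illustration:

import pandas as pd
from qlib.data.dataset.loader import StaticDataLoader

# directly from an in-memory DataFrame indexed by (datetime, instrument)
idx = pd.MultiIndex.from_product(
    [pd.to_datetime(["2010-01-04", "2010-01-05"]), ["SH600000", "SH600004"]],
    names=["datetime", "instrument"],
)
my_df = pd.DataFrame({"$close": [81.8, 13.3, 82.1, 13.4]}, index=idx)

sdl = StaticDataLoader(config=my_df, join="outer")
df = sdl.load()                            # instruments / time-range filters are optional
# the config can also map a fields_group to a path, e.g. {"feature": "features.pkl"}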
load(instruments=None, start_time=None, end_time=None) → pandas.core.frame.DataFrame

load the data as pd.DataFrame.

Example of the data (The multi-index of the columns is optional.):

                        feature                                                             label
                        $close     $volume     Ref($close, 1)  Mean($close, 3)  $high-$low  LABEL0
datetime    instrument
2010-01-04  SH600000    81.807068  17145150.0       83.737389        83.016739    2.741058  0.0032
            SH600004    13.313329  11800983.0       13.313329        13.317701    0.183632  0.0042
            SH600005    37.796539  12231662.0       38.258602        37.919757    0.970325  0.0289
Parameters:
  • instruments (str or dict) – it can either be the market name or the config file of instruments generated by InstrumentProvider.
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
Returns:

data loaded from the underlying source

Return type:

pd.DataFrame

class qlib.data.dataset.loader.DataLoaderDH(handler_config: dict, fetch_kwargs: dict = {}, is_group=False)

DataLoader based on (D)ata (H)andler. It is designed to load multiple data from data handlers - if you just want to load data from a single data handler, you can write it in a single data handler.

TODO: What makes this module not that easy to use:

  • For online scenarios

    • The underlying data handler should be configured, but the data loader doesn’t provide such an interface & hook.
__init__(handler_config: dict, fetch_kwargs: dict = {}, is_group=False)
Parameters:
  • handler_config (dict) – handler_config will be used to describe the handlers
  • fetch_kwargs (dict) – fetch_kwargs will be used to describe the different arguments of fetch method, such as col_set, squeeze, data_key, etc.
  • is_group (bool) – is_group will be used to describe whether the key of handler_config is group
load(instruments=None, start_time=None, end_time=None) → pandas.core.frame.DataFrame

load the data as pd.DataFrame.

Example of the data (The multi-index of the columns is optional.):

                        feature                                                             label
                        $close     $volume     Ref($close, 1)  Mean($close, 3)  $high-$low  LABEL0
datetime    instrument
2010-01-04  SH600000    81.807068  17145150.0       83.737389        83.016739    2.741058  0.0032
            SH600004    13.313329  11800983.0       13.313329        13.317701    0.183632  0.0042
            SH600005    37.796539  12231662.0       38.258602        37.919757    0.970325  0.0289
Parameters:
  • instruments (str or dict) – it can either be the market name or the config file of instruments generated by InstrumentProvider.
  • start_time (str) – start of the time range.
  • end_time (str) – end of the time range.
Returns:

data loaded from the underlying source

Return type:

pd.DataFrame

Data Handler

class qlib.data.dataset.handler.DataHandler(instruments=None, start_time=None, end_time=None, data_loader: Union[dict, str, qlib.data.dataset.loader.DataLoader] = None, init_data=True, fetch_orig=True)

The steps to use a handler: 1. initialize the data handler (called by init). 2. use the data.

The data handler tries to maintain a handler with 2 levels: datetime & instruments.

Any order of the index levels can be supported (the order will be implied in the data). The order <datetime, instruments> will be used when the dataframe index name is missing.

Example of the data: The multi-index of the columns is optional.

                        feature                                                            label
                        $close     $volume  Ref($close, 1)  Mean($close, 3)  $high-$low  LABEL0
datetime   instrument
2010-01-04 SH600000    81.807068  17145150.0       83.737389        83.016739    2.741058  0.0032
           SH600004    13.313329  11800983.0       13.313329        13.317701    0.183632  0.0042
           SH600005    37.796539  12231662.0       38.258602        37.919757    0.970325  0.0289

Tips for improving the performance of the data handler - Fetching data with col_set=CS_RAW will return the raw data and may prevent pandas from copying the data when calling loc

__init__(instruments=None, start_time=None, end_time=None, data_loader: Union[dict, str, qlib.data.dataset.loader.DataLoader] = None, init_data=True, fetch_orig=True)
Parameters:
  • instruments – The stock list to retrieve.
  • start_time – start_time of the original data.
  • end_time – end_time of the original data.
  • data_loader (Union[dict, str, DataLoader]) – data loader to load the data.
  • init_data – initialize the original data in the constructor.
  • fetch_orig (bool) – Return the original data instead of copy if possible.
config(**kwargs)

configuration of data: what data is to be loaded from the data source.

This method will be used when loading a pickled handler from a dataset. The data will be initialized with a different time range.

setup_data(enable_cache: bool = False)

Set up the data in case initialization is run multiple times.

It is responsible for maintaining the following variable: 1) self._data

Parameters:enable_cache (bool) –

default value is false:

  • if enable_cache == True:
    the processed data will be saved on disk, and handler will load the cached data from the disk directly when we call init next time
fetch(selector: Union[pandas._libs.tslibs.timestamps.Timestamp, slice, str, pandas.core.indexes.base.Index] = slice(None, None, None), level: Union[str, int] = 'datetime', col_set: Union[str, List[str]] = '__all', squeeze: bool = False, proc_func: Callable = None) → pandas.core.frame.DataFrame

fetch data from underlying data source

Design motivation: - providing a unified interface for underlying data. - potential to make the interface more friendly. - users can improve performance when fetching data in this extra layer

Parameters:
  • selector (Union[pd.Timestamp, slice, str]) –

    describe how to select data by index. It can be categorized as follows:

    • fetch single index
    • fetch a range of index
      • a slice range
      • pd.Index for specific indexes

    Following conflicts may occur

    • Does [“20200101”, “20210101”] mean selecting this slice or these two days?
      • slices have higher priority
  • level (Union[str, int]) – which index level to select the data
  • col_set (Union[str, List[str]]) –
    • if isinstance(col_set, str):
      select a set of meaningful, pd.Index columns (e.g. features, columns)
      • if col_set == CS_RAW:
        the raw dataset will be returned.
    • if isinstance(col_set, List[str]):
      select several sets of meaningful columns, the returned data has multiple levels
  • proc_func (Callable) –
    • Give a hook for processing data before fetching
    • An example to explain the necessity of the hook:
      • A Dataset learned some processors to process data which is related to data segmentation
      • It will apply them every time when preparing data.
      • The learned processors require the dataframe to remain in the same format when fitting and applying
      • However, the data format will change according to the parameters.
      • So the processors should be applied to the underlying data.
  • squeeze (bool) – whether squeeze columns and index
Returns:

Return type:

pd.DataFrame.
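
A hedged fetch sketch; handler is assumed to be an initialized DataHandler with ‘feature’ and ‘label’ column sets:

import pandas as pd

# single datetime index value
one_day = handler.fetch(selector=pd.Timestamp("2010-01-04"), level="datetime", col_set="feature")

# a closed date range, several column sets at once
a_range = handler.fetch(selector=slice("2010-01-01", "2010-06-30"), level="datetime",
                        col_set=["feature", "label"])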

get_cols(col_set='__all') → list

get the column names

Parameters:col_set (str) – select a set of meaningful columns (e.g. features, columns)
Returns:list of column names
Return type:list
get_range_selector(cur_date: Union[pandas._libs.tslibs.timestamps.Timestamp, str], periods: int) → slice

get range selector by number of periods

Parameters:
  • cur_date (pd.Timestamp or str) – current date
  • periods (int) – number of periods
get_range_iterator(periods: int, min_periods: Optional[int] = None, **kwargs) → Iterator[Tuple[pandas._libs.tslibs.timestamps.Timestamp, pandas.core.frame.DataFrame]]

get an iterator of sliced data with given periods

Parameters:
  • periods (int) – number of periods.
  • min_periods (int) – minimum periods for sliced dataframe.
  • kwargs (dict) – will be passed to self.fetch.
class qlib.data.dataset.handler.DataHandlerLP(instruments=None, start_time=None, end_time=None, data_loader: Union[dict, str, qlib.data.dataset.loader.DataLoader] = None, infer_processors: List[T] = [], learn_processors: List[T] = [], shared_processors: List[T] = [], process_type='append', drop_raw=False, **kwargs)

DataHandler with (L)earnable (P)rocessor

This handler will produce three pieces of data in pd.DataFrame format.

  • DK_R / self._data: the raw data loaded from the loader
  • DK_I / self._infer: the data processed for inference
  • DK_L / self._learn: the data processed for learning model.

The motivation for using different processor workflows for learning and inference. Here are some examples.

  • The instrument universe for learning and inference may be different.

  • The processing of some samples may rely on label (for example, some samples hit the limit may need extra processing or be dropped).

    • These processors only apply to the learning phase.

Tips for data handler

  • To reduce the memory cost

    • drop_raw=True: this will modify the raw data in place;
  • Please note processed data like self._infer or self._learn are concepts different from segments in Qlib’s Dataset like “train” and “test”

    • Processed data like self._infer or self._learn are underlying data processed with different processors
    • segments in Qlib’s Dataset like “train” and “test” are simply the time segmentations used when querying data (“train” is often before “test” in time series).
    • For example, you can query data._infer processed by infer_processors in the “train” time segmentation.
__init__(instruments=None, start_time=None, end_time=None, data_loader: Union[dict, str, qlib.data.dataset.loader.DataLoader] = None, infer_processors: List[T] = [], learn_processors: List[T] = [], shared_processors: List[T] = [], process_type='append', drop_raw=False, **kwargs)
Parameters:
  • infer_processors (list) –
    • list of <description info> of processors to generate data for inference
    • an example of <description info> is shown in the sketch after this parameter list
  • learn_processors (list) – similar to infer_processors, but for generating data for learning models
  • process_type (str) –

    PTYPE_I = ‘independent’

    • self._infer will be processed by infer_processors
    • self._learn will be processed by learn_processors

    PTYPE_A = ‘append’

    • self._infer will be processed by infer_processors
    • self._learn will be processed by infer_processors + learn_processors
      • (i.e. self._infer is further processed by learn_processors)
  • drop_raw (bool) – Whether to drop the raw data
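
A hedged sketch of the <description info> format, using processor classes documented below; the fit time range is illustrative, and the ‘class’/‘kwargs’ dict form follows Qlib’s init_instance_by_config convention:

infer_processors = [
    {"class": "RobustZScoreNorm",
     "kwargs": {"fields_group": "feature", "fit_start_time": "2008-01-01", "fit_end_time": "2014-12-31"}},
    {"class": "Fillna", "kwargs": {"fields_group": "feature"}},
]
learn_processors = [
    {"class": "DropnaLabel"},
    {"class": "CSRankNorm", "kwargs": {"fields_group": "label"}},
]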
fit()

fit data without processing the data

fit_process_data()

fit and process data

The input of the fit will be the output of the previous processor

process_data(with_fit: bool = False)

Process data. Run processor.fit if necessary.

Notation: (data) [processor]

# data processing flow of self.process_type == DataHandlerLP.PTYPE_I

(self._data)-[shared_processors]-(_shared_df)-[learn_processors]-(_learn_df)
                                       \
                                        -[infer_processors]-(_infer_df)

# data processing flow of self.process_type == DataHandlerLP.PTYPE_A

(self._data)-[shared_processors]-(_shared_df)-[infer_processors]-(_infer_df)-[learn_processors]-(_learn_df)
Parameters:with_fit (bool) – The input of the fit will be the output of the previous processor
config(processor_kwargs: dict = None, **kwargs)

configuration of data: what data is to be loaded from the data source.

This method will be used when loading a pickled handler from a dataset. The data will be initialized with a different time range.

setup_data(init_type: str = 'fit_seq', **kwargs)

Set up the data in case initialization is run multiple times

Parameters:
  • init_type (str) – The type IT_* listed above.
  • enable_cache (bool) –

    default value is false:

    • if enable_cache == True:
      the processed data will be saved on disk, and handler will load the cached data from the disk directly when we call init next time
fetch(selector: Union[pandas._libs.tslibs.timestamps.Timestamp, slice, str] = slice(None, None, None), level: Union[str, int] = 'datetime', col_set='__all', data_key: typing_extensions.Literal['raw', 'infer', 'learn'] = 'infer', squeeze: bool = False, proc_func: Callable = None) → pandas.core.frame.DataFrame

fetch data from underlying data source

Parameters:
  • selector (Union[pd.Timestamp, slice, str]) – describe how to select data by index.
  • level (Union[str, int]) – which index level to select the data.
  • col_set (str) – select a set of meaningful columns.(e.g. features, columns).
  • data_key (str) – the data to fetch: DK_*.
  • proc_func (Callable) – please refer to the doc of DataHandler.fetch
Returns:

Return type:

pd.DataFrame

get_cols(col_set='__all', data_key: typing_extensions.Literal['raw', 'infer', 'learn'] = 'infer') → list

get the column names

Parameters:
  • col_set (str) – select a set of meaningful columns.(e.g. features, columns).
  • data_key (DATA_KEY_TYPE) – the data to fetch: DK_*.
Returns:

list of column names

Return type:

list

classmethod cast(handler: qlib.data.dataset.handler.DataHandlerLP) → qlib.data.dataset.handler.DataHandlerLP

Motivation

  • A user creates a data handler in his customized package. Then he wants to share the processed handler with other users without introducing the package dependency and complicated data processing logic.
  • This classmethod makes it possible by casting the class to DataHandlerLP and only keeping the processed data
Parameters:handler (DataHandlerLP) – A subclass of DataHandlerLP
Returns:the converted processed data
Return type:DataHandlerLP
classmethod from_df(df: pandas.core.frame.DataFrame) → qlib.data.dataset.handler.DataHandlerLP

Motivation: - When users want to get a quick data handler.

The created data handler will have only one shared DataFrame without processors. After creating the handler, users may often want to dump the handler for reuse. Here is a typical use case:

from qlib.data.dataset import DataHandlerLP
dh = DataHandlerLP.from_df(df)
dh.to_pickle(fname, dump_all=True)

TODO: - The StaticDataLoader is quite slow. It doesn’t have to copy the data again…

Processor

qlib.data.dataset.processor.get_group_columns(df: pandas.core.frame.DataFrame, group: Optional[str])

get a group of columns from multi-index columns DataFrame

Parameters:
  • df (pd.DataFrame) – DataFrame with multi-index columns.
  • group (str) – the name of the feature group, i.e. the first level value of the group index.
class qlib.data.dataset.processor.Processor
fit(df: pandas.core.frame.DataFrame = None)

learn data processing parameters

Parameters:df (pd.DataFrame) – When we fit and process data with processors one by one, the fit function relies on the output of the previous processor, i.e. df.
is_for_infer() → bool

Is this processor usable for inference? Some processors are not usable for inference.

Returns:whether it is usable for inference.
Return type:bool
readonly() → bool

Does the processor treat the input data as read-only (i.e. it does not write to the input data) when processing?

Knowing the readonly information is helpful for the Handler to avoid unnecessary copies

config(**kwargs)

configure the serializable object

Parameters:
  • kwargs – may include the following keys:
    dump_all : bool
    whether the object will dump all attributes
    exclude : list
    what attributes will not be dumped
    include : list
    what attributes will be dumped
  • recursive (bool) – will the configuration be recursive
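
A hedged sketch of a minimal custom processor built on this interface; the de-meaning logic and class name are illustrative, and the ‘datetime’ index level name follows the handler convention above:

import pandas as pd
from qlib.data.dataset.processor import Processor, get_group_columns

class DemeanProcessor(Processor):
    """Hypothetical processor: subtract the daily cross-sectional mean of a fields group."""

    def __init__(self, fields_group=None):
        super().__init__()
        self.fields_group = fields_group

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        cols = get_group_columns(df, self.fields_group)
        df[cols] = df[cols] - df[cols].groupby(level="datetime").transform("mean")
        return df

    def readonly(self) -> bool:
        return False      # this processor writes into the input frame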
class qlib.data.dataset.processor.DropnaProcessor(fields_group=None)
__init__(fields_group=None)

Initialize self. See help(type(self)) for accurate signature.

readonly()

Does the processor treat the input data as read-only (i.e. it does not write to the input data) when processing?

Knowing the readonly information is helpful for the Handler to avoid unnecessary copies

class qlib.data.dataset.processor.DropnaLabel(fields_group='label')
__init__(fields_group='label')

Initialize self. See help(type(self)) for accurate signature.

is_for_infer() → bool

The samples are dropped according to label. So it is not usable for inference

class qlib.data.dataset.processor.DropCol(col_list=[])
__init__(col_list=[])

Initialize self. See help(type(self)) for accurate signature.

readonly()

Does the processor treat the input data as read-only (i.e. it does not write to the input data) when processing?

Knowing the readonly information is helpful for the Handler to avoid unnecessary copies

class qlib.data.dataset.processor.FilterCol(fields_group='feature', col_list=[])
__init__(fields_group='feature', col_list=[])

Initialize self. See help(type(self)) for accurate signature.

readonly()

Does the processor treat the input data as read-only (i.e. it does not write to the input data) when processing?

Knowing the readonly information is helpful for the Handler to avoid unnecessary copies

class qlib.data.dataset.processor.TanhProcess

Use tanh to process noisy data

class qlib.data.dataset.processor.ProcessInf

Process infinity

class qlib.data.dataset.processor.Fillna(fields_group=None, fill_value=0)

Process NaN

__init__(fields_group=None, fill_value=0)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.dataset.processor.MinMaxNorm(fit_start_time, fit_end_time, fields_group=None)
__init__(fit_start_time, fit_end_time, fields_group=None)

Initialize self. See help(type(self)) for accurate signature.

fit(df: pandas.core.frame.DataFrame = None)

learn data processing parameters

Parameters:df (pd.DataFrame) – When we fit and process data with processors one by one, the fit function relies on the output of the previous processor, i.e. df.
class qlib.data.dataset.processor.ZScoreNorm(fit_start_time, fit_end_time, fields_group=None)

ZScore Normalization

__init__(fit_start_time, fit_end_time, fields_group=None)

Initialize self. See help(type(self)) for accurate signature.

fit(df: pandas.core.frame.DataFrame = None)

learn data processing parameters

Parameters:df (pd.DataFrame) – When we fit and process data with processors one by one, the fit function relies on the output of the previous processor, i.e. df.
class qlib.data.dataset.processor.RobustZScoreNorm(fit_start_time, fit_end_time, fields_group=None, clip_outlier=True)

Robust ZScore Normalization

Use robust statistics for Z-Score normalization:
mean(x) = median(x)
std(x) = MAD(x) * 1.4826
Reference:
https://en.wikipedia.org/wiki/Median_absolute_deviation.
__init__(fit_start_time, fit_end_time, fields_group=None, clip_outlier=True)

Initialize self. See help(type(self)) for accurate signature.

fit(df: pandas.core.frame.DataFrame = None)

learn data processing parameters

Parameters:df (pd.DataFrame) – When we fit and process data with processors one by one, the fit function relies on the output of the previous processor, i.e. df.
class qlib.data.dataset.processor.CSZScoreNorm(fields_group=None, method='zscore')

Cross Sectional ZScore Normalization

__init__(fields_group=None, method='zscore')

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.dataset.processor.CSRankNorm(fields_group=None)

Cross Sectional Rank Normalization. “Cross Sectional” is often used to describe data operations. The operations across different stocks are often called Cross Sectional Operation.

For example, CSRankNorm is an operation that groups the data by each day and ranks across all the stocks in each day.

Explanation about 3.46 & 0.5

import numpy as np
import pandas as pd
x = np.random.random(10000)  # for any variable
x_rank = pd.Series(x).rank(pct=True)  # if it is converted to rank, it will be uniformly distributed
x_rank_norm = (x_rank - x_rank.mean()) / x_rank.std()  # normally, we normalize it so that it looks like a normal distribution

x_rank.mean()   # accounts for 0.5
1 / x_rank.std()  # accounts for 3.46
__init__(fields_group=None)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.dataset.processor.CSZFillna(fields_group=None)

Cross Sectional Fill Nan

__init__(fields_group=None)

Initialize self. See help(type(self)) for accurate signature.

class qlib.data.dataset.processor.HashStockFormat

Process the storage from a df into hashed stock format

class qlib.data.dataset.processor.TimeRangeFlt(start_time: Union[pandas._libs.tslibs.timestamps.Timestamp, str, None] = None, end_time: Union[pandas._libs.tslibs.timestamps.Timestamp, str, None] = None, freq: str = 'day')

This is a filter to filter stocks. It only keeps the data that exists from start_time to end_time (existence in the middle is not checked). WARNING: it may induce leakage!!!

__init__(start_time: Union[pandas._libs.tslibs.timestamps.Timestamp, str, None] = None, end_time: Union[pandas._libs.tslibs.timestamps.Timestamp, str, None] = None, freq: str = 'day')
Parameters:
  • start_time (Optional[Union[pd.Timestamp, str]]) – The data must start earlier than (or equal to) start_time. None indicates the data will not be filtered based on start_time
  • end_time (Optional[Union[pd.Timestamp, str]]) – similar to start_time
  • freq (str) – The frequency of the calendar

Contrib

Model

class qlib.model.base.BaseModel

Modeling things

predict(*args, **kwargs) → object

Make predictions after modeling things

class qlib.model.base.Model

Learnable Models

fit(dataset: qlib.data.dataset.Dataset, reweighter: qlib.data.dataset.weight.Reweighter)

Learn model from the base model

Note

The attribute names of the learned model should not start with ‘_’, so that the model can be dumped to disk.

The following code example shows how to retrieve x_train, y_train and w_train from the dataset:

# assumed imports for this snippet
import numpy as np
import pandas as pd
from qlib.data.dataset.handler import DataHandlerLP

# get features and labels
df_train, df_valid = dataset.prepare(
    ["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L
)
x_train, y_train = df_train["feature"], df_train["label"]
x_valid, y_valid = df_valid["feature"], df_valid["label"]

# get weights
try:
    wdf_train, wdf_valid = dataset.prepare(["train", "valid"], col_set=["weight"],
                                           data_key=DataHandlerLP.DK_L)
    w_train, w_valid = wdf_train["weight"], wdf_valid["weight"]
except KeyError:
    # fall back to unit weights when the dataset has no "weight" column set
    w_train = pd.DataFrame(np.ones_like(y_train.values), index=y_train.index)
    w_valid = pd.DataFrame(np.ones_like(y_valid.values), index=y_valid.index)
Parameters:dataset (Dataset) – dataset will generate the processed data from model training.
predict(dataset: qlib.data.dataset.Dataset, segment: Union[str, slice] = 'test') → object

give prediction given Dataset

Parameters:
  • dataset (Dataset) – dataset will generate the processed dataset from model training.
  • segment (Text or slice) – dataset will use this segment to prepare data. (default=test)
Returns:

Return type:

Prediction results with certain type such as pandas.Series.

class qlib.model.base.ModelFT

Model (F)ine(t)unable

finetune(dataset: qlib.data.dataset.Dataset)

finetune the model based on the given dataset

A typical use case of finetuning model with qlib.workflow.R

# start exp to train init model
with R.start(experiment_name="init models"):
    model.fit(dataset)
    R.save_objects(init_model=model)
    rid = R.get_recorder().id

# Finetune model based on previous trained model
with R.start(experiment_name="finetune model"):
    recorder = R.get_recorder(recorder_id=rid, experiment_name="init models")
    model = recorder.load_object("init_model")
    model.finetune(dataset, num_boost_round=10)
Parameters:dataset (Dataset) – dataset will generate the processed dataset from model training.

Strategy

class qlib.contrib.strategy.TopkDropoutStrategy(*, topk, n_drop, method_sell='bottom', method_buy='top', hold_thresh=1, only_tradable=False, forbid_all_trade_at_limit=True, **kwargs)
__init__(*, topk, n_drop, method_sell='bottom', method_buy='top', hold_thresh=1, only_tradable=False, forbid_all_trade_at_limit=True, **kwargs)
Parameters:
  • topk (int) – the number of stocks in the portfolio.
  • n_drop (int) – number of stocks to be replaced in each trading date.
  • method_sell (str) – dropout method_sell, random/bottom.
  • method_buy (str) – dropout method_buy, random/top.
  • hold_thresh (int) – minimum holding days before selling a stock; will check current.get_stock_count(order.stock_id) >= self.hold_thresh.
  • only_tradable (bool) –

    will the strategy only consider tradable stocks when buying and selling.

    if only_tradable:

    strategy will make decisions based on the tradable state of the stock info and avoid buying and selling non-tradable stocks.

    else:

    strategy will make buy/sell decisions without checking the tradable state of the stock.
  • forbid_all_trade_at_limit (bool) –

    whether to forbid all trades when limit_up or limit_down is reached.

    if forbid_all_trade_at_limit:

    strategy will not make any trade when the price reaches limit up/down; it will not even sell at limit up or buy at limit down, although this is allowed in reality.

    else:

    strategy will sell at limit up and buy at limit down.
generate_trade_decision(execute_result=None)

Generate trade decision in each trading bar

Parameters:execute_result (List[object], optional) –

the executed result for trade decision, by default None

  • When generate_trade_decision is called for the first time, execute_result could be None
class qlib.contrib.strategy.WeightStrategyBase(*, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWOInteract'>, **kwargs)
__init__(*, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWOInteract'>, **kwargs)
signal :
the information to describe a signal. Please refer to the docs of qlib.backtest.signal.create_signal_from. The decision of the strategy will be based on the given signal
trade_exchange : Exchange

exchange that provides market info, used to deal order and generate report

  • If trade_exchange is None, self.trade_exchange will be set with common_infra

  • It allows different trade_exchanges to be used in different executions.

  • For example:

    • In daily execution, both the daily exchange and the minutely exchange are usable, but the daily exchange is recommended because it runs faster.
    • In minutely execution, the daily exchange is not usable, only the minutely exchange is recommended.
generate_target_weight_position(score, current, trade_start_time, trade_end_time)

Generate target position from score for this date and the current position. The cash is not considered in the position

Parameters:
  • score (pd.Series) – pred score for this trade date; index is stock_id, containing a ‘score’ column.
  • current (Position()) – current position.
  • trade_start_time (pd.Timestamp) –
  • trade_end_time (pd.Timestamp) –
generate_trade_decision(execute_result=None)

Generate trade decision in each trading bar

Parameters:execute_result (List[object], optional) –

the executed result for trade decision, by default None

  • When generate_trade_decision is called for the first time, execute_result could be None
class qlib.contrib.strategy.EnhancedIndexingStrategy(*, riskmodel_root, market='csi500', turn_limit=None, name_mapping={}, optimizer_kwargs={}, verbose=False, **kwargs)

Enhanced Indexing Strategy

Enhanced indexing combines the arts of active management and passive management, with the aim of outperforming a benchmark index (e.g., S&P 500) in terms of portfolio return while controlling the risk exposure (a.k.a. tracking error).

Users need to prepare their risk model data like below:

├── /path/to/riskmodel
├──── 20210101
├────── factor_exp.{csv|pkl|h5}
├────── factor_cov.{csv|pkl|h5}
├────── specific_risk.{csv|pkl|h5}
├────── blacklist.{csv|pkl|h5}  # optional

The risk model data can be obtained from risk data provider. You can also use qlib.model.riskmodel.structured.StructuredCovEstimator to prepare these data.

Parameters:
  • riskmodel_path (str) – risk model path
  • name_mapping (dict) – alternative file names
__init__(*, riskmodel_root, market='csi500', turn_limit=None, name_mapping={}, optimizer_kwargs={}, verbose=False, **kwargs)
signal :
the information to describe a signal. Please refer to the docs of qlib.backtest.signal.create_signal_from. The decision of the strategy will be based on the given signal
trade_exchange : Exchange

exchange that provides market info, used to deal order and generate report

  • If trade_exchange is None, self.trade_exchange will be set with common_infra

  • It allows different trade_exchanges to be used in different executions.

  • For example:

    • In daily execution, both the daily exchange and the minutely exchange are usable, but the daily exchange is recommended because it runs faster.
    • In minutely execution, the daily exchange is not usable, only the minutely exchange is recommended.
generate_target_weight_position(score, current, trade_start_time, trade_end_time)

Generate target position from score for this date and the current position. The cash is not considered in the position

Parameters:
  • score (pd.Series) – pred score for this trade date; index is stock_id, containing a ‘score’ column.
  • current (Position()) – current position.
  • trade_start_time (pd.Timestamp) –
  • trade_end_time (pd.Timestamp) –
class qlib.contrib.strategy.TWAPStrategy(outer_trade_decision: BaseTradeDecision = None, level_infra: LevelInfrastructure = None, common_infra: CommonInfrastructure = None, trade_exchange: Exchange = None)

TWAP Strategy for trading

Note

  • This TWAP strategy will round up (ceiling) when trading. This makes the TWAP trading strategy place orders earlier when the total amount in trade units is less than the number of trading steps
reset(outer_trade_decision: qlib.backtest.decision.BaseTradeDecision = None, **kwargs)
Parameters:outer_trade_decision (BaseTradeDecision, optional) –
generate_trade_decision(execute_result=None)

Generate trade decision in each trading bar

Parameters:execute_result (List[object], optional) –

the executed result for trade decision, by default None

  • When generate_trade_decision is called for the first time, execute_result could be None
class qlib.contrib.strategy.SBBStrategyBase(outer_trade_decision: BaseTradeDecision = None, level_infra: LevelInfrastructure = None, common_infra: CommonInfrastructure = None, trade_exchange: Exchange = None)

(S)elect the (B)etter one among every two adjacent trading (B)ars to sell or buy.

reset(outer_trade_decision: qlib.backtest.decision.BaseTradeDecision = None, **kwargs)
Parameters:outer_trade_decision (BaseTradeDecision, optional) –
generate_trade_decision(execute_result=None)

Generate trade decision in each trading bar

Parameters:execute_result (List[object], optional) –

the executed result for trade decision, by default None

  • When generate_trade_decision is called for the first time, execute_result could be None
class qlib.contrib.strategy.SBBStrategyEMA(outer_trade_decision: qlib.backtest.decision.BaseTradeDecision = None, instruments: Union[List[T], str] = 'csi300', freq: str = 'day', trade_exchange: qlib.backtest.exchange.Exchange = None, level_infra: qlib.backtest.utils.LevelInfrastructure = None, common_infra: qlib.backtest.utils.CommonInfrastructure = None, **kwargs)

(S)elect the (B)etter one among every two adjacent trading (B)ars to sell or buy with (EMA) signal.

__init__(outer_trade_decision: qlib.backtest.decision.BaseTradeDecision = None, instruments: Union[List[T], str] = 'csi300', freq: str = 'day', trade_exchange: qlib.backtest.exchange.Exchange = None, level_infra: qlib.backtest.utils.LevelInfrastructure = None, common_infra: qlib.backtest.utils.CommonInfrastructure = None, **kwargs)
Parameters:
  • instruments (Union[List, str], optional) – instruments of EMA signal, by default “csi300”
  • freq (str, optional) – freq of EMA signal, by default “day” Note: freq may be different from time_per_step
reset_level_infra(level_infra)

reset level-shared infra - after resetting the trade calendar, the signal will be changed

class qlib.contrib.strategy.SoftTopkStrategy(model, dataset, topk, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWInteract'>, max_sold_weight=1.0, risk_degree=0.95, buy_method='first_fill', trade_exchange=None, level_infra=None, common_infra=None, **kwargs)
__init__(model, dataset, topk, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWInteract'>, max_sold_weight=1.0, risk_degree=0.95, buy_method='first_fill', trade_exchange=None, level_infra=None, common_infra=None, **kwargs)
Parameters:
  • topk (int) – top-N stocks to buy
  • risk_degree (float) – position percentage of total value.
  • buy_method (str) –

    rank_fill: assign the weight to the stocks that rank high first (1/topk max)
    average_fill: assign the weight to the high-ranking stocks evenly.
get_risk_degree(trade_step=None)

Return the proportion of your total value you will use in investment. A dynamic risk_degree will result in market timing.

generate_target_weight_position(score, current, trade_start_time, trade_end_time)
Parameters:
  • score – pred score for this trade date, pd.Series, index is stock_id, containing a ‘score’ column
  • current – current position, use Position() class
  • trade_date –

    trade date

    generate target position from score for this date and the current position

    The cash is not considered in the position

Evaluate

qlib.contrib.evaluate.risk_analysis(r, N: int = None, freq: str = 'day')

Risk Analysis. NOTE: the calculation of annualized return is different from the standard definition of annualized return. It is implemented by design: Qlib accumulates returns by summation instead of by product to avoid the cumulative curve being skewed exponentially. All calculations of annualized returns follow this principle in Qlib.

TODO: add a parameter to enable calculating metrics with product (compounded) accumulation of return.

Parameters:
  • r (pandas.Series) – daily return series.
  • N (int) – scaler for annualizing information_ratio (day: 252, week: 50, month: 12), at least one of N and freq should exist
  • freq (str) – analysis frequency used for calculating the scaler, at least one of N and freq should exist
qlib.contrib.evaluate.indicator_analysis(df, method='mean')

analyze statistical time-series indicators of trading

Parameters:
  • df (pandas.DataFrame) –
    columns: like [‘pa’, ‘pos’, ‘ffr’, ‘deal_amount’, ‘value’].
    Necessary fields:
    • ’pa’ is the price advantage in trade indicators
    • ’pos’ is the positive rate in trade indicators
    • ’ffr’ is the fulfill rate in trade indicators
    Optional fields:
    • ’deal_amount’ is the total deal amount, only necessary when method is ‘amount_weighted’
    • ’value’ is the total trade value, only necessary when method is ‘value_weighted’

    index: Index(datetime)

  • method (str, optional) –

    statistics method of pa/ffr, by default “mean”

    • if method is ‘mean’, count the mean statistical value of each trade indicator
    • if method is ‘amount_weighted’, count the deal_amount weighted mean statistical value of each trade indicator
    • if method is ‘value_weighted’, count the value weighted mean statistical value of each trade indicator

    Note: statistics method of pos is always “mean”

Returns:

statistical value of each trade indicators

Return type:

pd.DataFrame

qlib.contrib.evaluate.backtest_daily(start_time: Union[str, pandas._libs.tslibs.timestamps.Timestamp], end_time: Union[str, pandas._libs.tslibs.timestamps.Timestamp], strategy: Union[str, dict, qlib.strategy.base.BaseStrategy], executor: Union[str, dict, qlib.backtest.executor.BaseExecutor] = None, account: Union[float, int, qlib.backtest.position.Position] = 100000000.0, benchmark: str = 'SH000300', exchange_kwargs: dict = None, pos_type: str = 'Position')

initialize the strategy and executor, then execute the backtest at daily frequency

Parameters:
  • start_time (Union[str, pd.Timestamp]) – closed start time for backtest. NOTE: this will be applied to the outermost executor’s calendar.
  • end_time (Union[str, pd.Timestamp]) – closed end time for backtest. NOTE: this will be applied to the outermost executor’s calendar. E.g. Executor[day](Executor[1min]): setting end_time == 20XX0301 will include all the minutes on 20XX0301
  • strategy (Union[str, dict, BaseStrategy]) –

    for initializing outermost portfolio strategy. Please refer to the docs of init_instance_by_config for more information.

    E.g.

    # dict
    strategy = {
        "class": "TopkDropoutStrategy",
        "module_path": "qlib.contrib.strategy.signal_strategy",
        "kwargs": {
            "signal": (model, dataset),
            "topk": 50,
            "n_drop": 5,
        },
    }
    # BaseStrategy
    pred_score = pd.read_pickle("score.pkl")["score"]
    STRATEGY_CONFIG = {
        "topk": 50,
        "n_drop": 5,
        "signal": pred_score,
    }
    strategy = TopkDropoutStrategy(**STRATEGY_CONFIG)
    # str example.
    # 1) specify a pickle object
    #     - path like 'file:///<path to pickle file>/obj.pkl'
    # 2) specify a class name
    #     - "ClassName":  getattr(module, "ClassName")() will be used.
    # 3) specify module path with class name
    #     - "a.b.c.ClassName" getattr(<a.b.c.module>, "ClassName")() will be used.
    
  • executor (Union[str, dict, BaseExecutor]) – for initializing the outermost executor.
  • benchmark (str) – the benchmark for reporting.
  • account (Union[float, int, Position]) –

    information describing how to create the account

    For float or int:

    Using Account with only initial cash

    For Position:

    Using Account with a Position
  • exchange_kwargs (dict) –

    the kwargs for initializing Exchange E.g.

    exchange_kwargs = {
        "freq": freq,
        "limit_threshold": None, # limit_threshold is None, using C.limit_threshold
        "deal_price": None, # deal_price is None, using C.deal_price
        "open_cost": 0.0005,
        "close_cost": 0.0015,
        "min_cost": 5,
    }
    
  • pos_type (str) – the type of Position.
Returns:

  • report_normal (pd.DataFrame) – backtest report
  • positions_normal (pd.DataFrame) – backtest positions

qlib.contrib.evaluate.long_short_backtest(pred, topk=50, deal_price=None, shift=1, open_cost=0, close_cost=0, trade_unit=None, limit_threshold=None, min_cost=5, subscribe_fields=[], extract_codes=False)

A backtest for long-short strategy

Parameters:
  • pred – The trading signal produced on day T.
  • topk – The short topk securities and long topk securities.
  • deal_price – The price to deal the trading.
  • shift – Whether to shift prediction by one day. The trading day will be T+1 if shift==1.
  • open_cost – open transaction cost.
  • close_cost – close transaction cost.
  • trade_unit – 100 for China A.
  • limit_threshold – limit move 0.1 (10%) for example, long and short with same limit.
  • min_cost – min transaction cost.
  • subscribe_fields – subscribe fields.
  • extract_codes – bool. Whether to pass the codes extracted from the pred to the exchange. NOTE: this will be faster with offline qlib.
Returns:

The result of backtest, it is represented by a dict. { “long”: long_returns(excess), “short”: short_returns(excess), “long_short”: long_short_returns}

Report

qlib.contrib.report.analysis_position.report.report_graph(report_df: pandas.core.frame.DataFrame, show_notebook: bool = True) → [<class 'list'>, <class 'tuple'>]

display backtest report

Example:

import qlib
import pandas as pd
from qlib.utils.time import Freq
from qlib.utils import flatten_dict
from qlib.backtest import backtest, executor
from qlib.contrib.evaluate import risk_analysis
from qlib.contrib.strategy import TopkDropoutStrategy

# init qlib
qlib.init(provider_uri=<qlib data dir>)

CSI300_BENCH = "SH000300"
FREQ = "day"
STRATEGY_CONFIG = {
    "topk": 50,
    "n_drop": 5,
    # pred_score, pd.Series
    "signal": pred_score,
}

EXECUTOR_CONFIG = {
    "time_per_step": "day",
    "generate_portfolio_metrics": True,
}

backtest_config = {
    "start_time": "2017-01-01",
    "end_time": "2020-08-01",
    "account": 100000000,
    "benchmark": CSI300_BENCH,
    "exchange_kwargs": {
        "freq": FREQ,
        "limit_threshold": 0.095,
        "deal_price": "close",
        "open_cost": 0.0005,
        "close_cost": 0.0015,
        "min_cost": 5,
    },
}

# strategy object
strategy_obj = TopkDropoutStrategy(**STRATEGY_CONFIG)
# executor object
executor_obj = executor.SimulatorExecutor(**EXECUTOR_CONFIG)
# backtest
portfolio_metric_dict, indicator_dict = backtest(executor=executor_obj, strategy=strategy_obj, **backtest_config)
analysis_freq = "{0}{1}".format(*Freq.parse(FREQ))
# backtest info
report_normal_df, positions_normal = portfolio_metric_dict.get(analysis_freq)

qcr.analysis_position.report_graph(report_normal_df)
Parameters:
  • report_df

    df.index.name must be date, df.columns must contain return, turnover, cost, bench.

                return      cost        bench       turnover
    date
    2017-01-04  0.003421    0.000864    0.011693    0.576325
    2017-01-05  0.000508    0.000447    0.000721    0.227882
    2017-01-06  -0.003321   0.000212    -0.004322   0.102765
    2017-01-09  0.006753    0.000212    0.006874    0.105864
    2017-01-10  -0.000416   0.000440    -0.003350   0.208396
    
  • show_notebook – whether to display graphics in notebook, the default is True.
Returns:

if show_notebook is True, display in notebook; else return plotly.graph_objs.Figure list.

qlib.contrib.report.analysis_position.score_ic.score_ic_graph(pred_label: pandas.core.frame.DataFrame, show_notebook: bool = True, **kwargs) → [<class 'list'>, <class 'tuple'>]

score IC

Example:

from qlib.data import D
from qlib.contrib.report import analysis_position
pred_df_dates = pred_df.index.get_level_values(level='datetime')
features_df = D.features(D.instruments('csi500'), ['Ref($close, -2)/Ref($close, -1)-1'], pred_df_dates.min(), pred_df_dates.max())
features_df.columns = ['label']
pred_label = pd.concat([features_df, pred], axis=1, sort=True).reindex(features_df.index)
analysis_position.score_ic_graph(pred_label)
Parameters:
  • pred_label

    index is pd.MultiIndex, index name is [instrument, datetime]; columns names is [score, label].

    instrument  datetime        score         label
    SH600004  2017-12-11     -0.013502       -0.013502
                2017-12-12   -0.072367       -0.072367
                2017-12-13   -0.068605       -0.068605
                2017-12-14    0.012440        0.012440
                2017-12-15   -0.102778       -0.102778
    
  • show_notebook – whether to display graphics in notebook, the default is True.
Returns:

if show_notebook is True, display in notebook; else return plotly.graph_objs.Figure list.

qlib.contrib.report.analysis_position.cumulative_return.cumulative_return_graph(position: dict, report_normal: pandas.core.frame.DataFrame, label_data: pandas.core.frame.DataFrame, show_notebook=True, start_date=None, end_date=None) → Iterable[plotly.graph_objs._figure.Figure]

Backtest buy, sell, and holding cumulative return graph

Example:

from qlib.data import D
from qlib.contrib.evaluate import risk_analysis, backtest, long_short_backtest
from qlib.contrib.strategy import TopkDropoutStrategy

# backtest parameters
bparas = {}
bparas['limit_threshold'] = 0.095
bparas['account'] = 1000000000

sparas = {}
sparas['topk'] = 50
sparas['n_drop'] = 5
strategy = TopkDropoutStrategy(**sparas)

report_normal_df, positions = backtest(pred_df, strategy, **bparas)

pred_df_dates = pred_df.index.get_level_values(level='datetime')
features_df = D.features(D.instruments('csi500'), ['Ref($close, -1)/$close - 1'], pred_df_dates.min(), pred_df_dates.max())
features_df.columns = ['label']

qcr.analysis_position.cumulative_return_graph(positions, report_normal_df, features_df)

Graph desc:

  • Axis X: Trading day.
  • Axis Y:
  • Above axis Y: (((Ref($close, -1)/$close - 1) * weight).sum() / weight.sum()).cumsum().
  • Below axis Y: Daily weight sum.
  • In the sell graph, y < 0 stands for profit; in other cases, y > 0 stands for profit.
  • In the buy_minus_sell graph, the y value of the weight graph at the bottom is buy_weight + sell_weight.
  • In each graph, the red line in the histogram on the right represents the average.
Parameters:
  • position – position data
  • report_normal
                    return      cost        bench       turnover
    date
    2017-01-04  0.003421    0.000864    0.011693    0.576325
    2017-01-05  0.000508    0.000447    0.000721    0.227882
    2017-01-06  -0.003321   0.000212    -0.004322   0.102765
    2017-01-09  0.006753    0.000212    0.006874    0.105864
    2017-01-10  -0.000416   0.000440    -0.003350   0.208396
    
  • label_data

    D.features result; index is pd.MultiIndex, index name is [instrument, datetime]; columns names is [label].

    The label at T is the change from T to T+1; it is recommended to use close. Example: D.features(D.instruments(‘csi500’), [‘Ref($close, -1)/$close-1’])

                                    label
    instrument  datetime
    SH600004        2017-12-11  -0.013502
                    2017-12-12  -0.072367
                    2017-12-13  -0.068605
                    2017-12-14  0.012440
                    2017-12-15  -0.102778
    
  • show_notebook – True or False. If True, show graph in notebook, else return figures
  • start_date – start date
  • end_date – end date
Returns:

qlib.contrib.report.analysis_position.risk_analysis.risk_analysis_graph(analysis_df: pandas.core.frame.DataFrame = None, report_normal_df: pandas.core.frame.DataFrame = None, report_long_short_df: pandas.core.frame.DataFrame = None, show_notebook: bool = True) → Iterable[plotly.graph_objs._figure.Figure]

Generate analysis graph and monthly analysis

Example:

import qlib
import pandas as pd
from qlib.utils.time import Freq
from qlib.utils import flatten_dict
from qlib.backtest import backtest, executor
from qlib.contrib.evaluate import risk_analysis
from qlib.contrib.strategy import TopkDropoutStrategy

# init qlib
qlib.init(provider_uri=<qlib data dir>)

CSI300_BENCH = "SH000300"
FREQ = "day"
STRATEGY_CONFIG = {
    "topk": 50,
    "n_drop": 5,
    # pred_score, pd.Series
    "signal": pred_score,
}

EXECUTOR_CONFIG = {
    "time_per_step": "day",
    "generate_portfolio_metrics": True,
}

backtest_config = {
    "start_time": "2017-01-01",
    "end_time": "2020-08-01",
    "account": 100000000,
    "benchmark": CSI300_BENCH,
    "exchange_kwargs": {
        "freq": FREQ,
        "limit_threshold": 0.095,
        "deal_price": "close",
        "open_cost": 0.0005,
        "close_cost": 0.0015,
        "min_cost": 5,
    },
}

# strategy object
strategy_obj = TopkDropoutStrategy(**STRATEGY_CONFIG)
# executor object
executor_obj = executor.SimulatorExecutor(**EXECUTOR_CONFIG)
# backtest
portfolio_metric_dict, indicator_dict = backtest(executor=executor_obj, strategy=strategy_obj, **backtest_config)
analysis_freq = "{0}{1}".format(*Freq.parse(FREQ))
# backtest info
report_normal_df, positions_normal = portfolio_metric_dict.get(analysis_freq)
analysis = dict()
analysis["excess_return_without_cost"] = risk_analysis(
    report_normal_df["return"] - report_normal_df["bench"], freq=analysis_freq
)
analysis["excess_return_with_cost"] = risk_analysis(
    report_normal_df["return"] - report_normal_df["bench"] - report_normal_df["cost"], freq=analysis_freq
)

analysis_df = pd.concat(analysis)  # type: pd.DataFrame
analysis_position.risk_analysis_graph(analysis_df, report_normal_df)
Parameters:
  • analysis_df

    analysis data; index is pd.MultiIndex; the column name is [risk].

                                                      risk
    excess_return_without_cost mean               0.000692
                               std                0.005374
                               annualized_return  0.174495
                               information_ratio  2.045576
                               max_drawdown      -0.079103
    excess_return_with_cost    mean               0.000499
                               std                0.005372
                               annualized_return  0.125625
                               information_ratio  1.473152
                               max_drawdown      -0.088263
    
  • report_normal_df

    df.index.name must be date, df.columns must contain return, turnover, cost, bench.

                return      cost        bench       turnover
    date
    2017-01-04  0.003421    0.000864    0.011693    0.576325
    2017-01-05  0.000508    0.000447    0.000721    0.227882
    2017-01-06  -0.003321   0.000212    -0.004322   0.102765
    2017-01-09  0.006753    0.000212    0.006874    0.105864
    2017-01-10  -0.000416   0.000440    -0.003350   0.208396
    
  • report_long_short_df

    df.index.name must be date, df.columns contain long, short, long_short.

                long        short       long_short
    date
    2017-01-04  -0.001360   0.001394    0.000034
    2017-01-05  0.002456    0.000058    0.002514
    2017-01-06  0.000120    0.002739    0.002859
    2017-01-09  0.001436    0.001838    0.003273
    2017-01-10  0.000824    -0.001944   -0.001120
    
  • show_notebook – Whether to display graphics in a notebook, default True. If True, show the graph in the notebook; if False, return the graph figures.
Returns:

qlib.contrib.report.analysis_position.rank_label.rank_label_graph(position: dict, label_data: pandas.core.frame.DataFrame, start_date=None, end_date=None, show_notebook=True) → Iterable[plotly.graph_objs._figure.Figure]

Ranking percentage of stocks bought, sold, and held on each trading day. The average rank-ratio (similar to sell_df['label'].rank(ascending=False) / len(sell_df)) of daily trading.

Example:

import qlib.contrib.report as qcr
from qlib.data import D
from qlib.contrib.evaluate import backtest
from qlib.contrib.strategy import TopkDropoutStrategy

# backtest parameters
bparas = {}
bparas['limit_threshold'] = 0.095
bparas['account'] = 1000000000

sparas = {}
sparas['topk'] = 50
sparas['n_drop'] = 230
strategy = TopkDropoutStrategy(**sparas)

_, positions = backtest(pred_df, strategy, **bparas)

pred_df_dates = pred_df.index.get_level_values(level='datetime')
features_df = D.features(D.instruments('csi500'), ['Ref($close, -1)/$close-1'], pred_df_dates.min(), pred_df_dates.max())
features_df.columns = ['label']

qcr.analysis_position.rank_label_graph(positions, features_df, pred_df_dates.min(), pred_df_dates.max())
Parameters:
  • position – position data; qlib.backtest.backtest result.
  • label_data

    D.features result; index is pd.MultiIndex with names [instrument, datetime]; the column name is [label].

    The label at T is the change from T to T+1; it is recommended to use the close price, e.g. D.features(D.instruments('csi500'), ['Ref($close, -1)/$close-1']).

                                    label
    instrument  datetime
    SH600004        2017-12-11  -0.013502
                    2017-12-12  -0.072367
                    2017-12-13  -0.068605
                    2017-12-14  0.012440
                    2017-12-15  -0.102778
    
  • start_date – start date
  • end_date – end date
  • show_notebook – True or False. If True, show graph in notebook, else return figures.
Returns:

qlib.contrib.report.analysis_model.analysis_model_performance.ic_figure(ic_df: pandas.core.frame.DataFrame, show_nature_day=True, **kwargs) → plotly.graph_objs._figure.Figure

IC figure

Parameters:
  • ic_df (pd.DataFrame) – IC DataFrame.
  • show_nature_day – whether to display the abscissa of non-trading days.
Returns:

plotly.graph_objs.Figure

qlib.contrib.report.analysis_model.analysis_model_performance.model_performance_graph(pred_label: pandas.core.frame.DataFrame, lag: int = 1, N: int = 5, reverse=False, rank=False, graph_names: list = ['group_return', 'pred_ic', 'pred_autocorr'], show_notebook: bool = True, show_nature_day: bool = False, **kwargs) → [<class 'list'>, <class 'tuple'>]

Model performance

Parameters:
  • pred_label

    index is pd.MultiIndex with names [instrument, datetime]; column names are [score, label]. The label is usually the same as the label used for model training (e.g. “Ref($close, -2)/Ref($close, -1) - 1”).

                                    score       label
    instrument  datetime
    SH600004    2017-12-11      -0.013502   -0.013502
                2017-12-12      -0.072367   -0.072367
                2017-12-13      -0.068605   -0.068605
                2017-12-14       0.012440    0.012440
                2017-12-15      -0.102778   -0.102778
    
  • lag – pred.groupby(level='instrument')['score'].shift(lag). It will only be used in the auto-correlation computing.
  • N – group number, default 5.
  • reverse – if True, pred['score'] *= -1.
  • rank – if True, calculate rank ic.
  • graph_names – graph names; default ['group_return', 'pred_ic', 'pred_autocorr'].
  • show_notebook – whether to display graphics in notebook, the default is True.
  • show_nature_day – whether to display the abscissa of non-trading days.
  • **kwargs – contains some parameters to control the plot style in plotly. Currently supports rangebreaks: https://plotly.com/python/time-series/#Hiding-Weekends-and-Holidays
Returns:

if show_notebook is True, display in notebook; else return plotly.graph_objs.Figure list.
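
A minimal sketch of calling it (pred and label are assumed to be existing pd.Series sharing the MultiIndex [instrument, datetime] described above; the parameter values are illustrative):

import pandas as pd
from qlib.contrib.report.analysis_model.analysis_model_performance import model_performance_graph

# Build pred_label in the documented layout: MultiIndex [instrument, datetime],
# columns [score, label]; pred and label are assumed to already exist.
pred_label = pd.concat([pred.rename("score"), label.rename("label")], axis=1)

# Display the group return, IC and auto-correlation graphs in a notebook
model_performance_graph(pred_label, N=5, rank=True)

# Or collect the figures instead of displaying them
figures = model_performance_graph(pred_label, show_notebook=False)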

Workflow

Experiment Manager

class qlib.workflow.expm.ExpManager(uri: str, default_exp_name: Optional[str])

This is the ExpManager class for managing experiments. The API is designed similar to mlflow. (The link: https://mlflow.org/docs/latest/python_api/mlflow.html)

The ExpManager is expected to be a singleton (by the way, we can have multiple Experiments with different uri; users can get different experiments from different uri and then compare their records). The global config (i.e. C) is also a singleton.

So we try to align them together: they share the same variable, which is called the default uri. Please refer to ExpManager.default_uri for details of variable sharing.

When the user starts an experiment, the user may want to set the uri to a specific uri (it will override the default uri during this period), and then unset the specific uri and fall back to the default uri. ExpManager._active_exp_uri is that specific uri.
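
Direct use of ExpManager is uncommon; in user code the global QlibRecorder R (from qlib.workflow) drives it and handles the uri switching described above. A minimal sketch (assuming qlib data is available at the default provider_uri):

import qlib
from qlib.workflow import R

qlib.init()  # assumes qlib data at the default provider_uri

# Entering the context calls ExpManager.start_exp under the hood;
# leaving it calls end_exp and restores the default uri.
with R.start(experiment_name="my_experiment"):
    R.log_params(model="LGBModel", n_estimators=200)
    R.log_metrics(train_ic=0.05)
    recorder = R.get_recorder()  # the active Recorder of the active Experiment
    print(recorder.experiment_id, recorder.name)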

__init__(uri: str, default_exp_name: Optional[str])

Initialize self. See help(type(self)) for accurate signature.

start_exp(*, experiment_id: Optional[str] = None, experiment_name: Optional[str] = None, recorder_id: Optional[str] = None, recorder_name: Optional[str] = None, uri: Optional[str] = None, resume: bool = False, **kwargs) → qlib.workflow.exp.Experiment

Start an experiment. This method includes first get_or_create an experiment, and then set it to be active.

Maintaining _active_exp_uri is included in start_exp; the remaining implementation should be included in _start_exp in the subclass.

Parameters:
  • experiment_id (str) – id of the active experiment.
  • experiment_name (str) – name of the active experiment.
  • recorder_id (str) – id of the recorder to be started.
  • recorder_name (str) – name of the recorder to be started.
  • uri (str) – the current tracking URI.
  • resume (boolean) – whether to resume the experiment and recorder.
Returns:

Return type:

An active experiment.

end_exp(recorder_status: str = 'SCHEDULED', **kwargs)

End an active experiment.

Maintaining _active_exp_uri is included in end_exp; the remaining implementation should be included in _end_exp in the subclass.

Parameters:
  • experiment_name (str) – name of the active experiment.
  • recorder_status (str) – the status of the active recorder of the experiment.
create_exp(experiment_name: Optional[str] = None)

Create an experiment.

Parameters:experiment_name (str) – the experiment name, which must be unique.
Returns:An experiment object.
Raises:ExpAlreadyExistError
search_records(experiment_ids=None, **kwargs)

Get a pandas DataFrame of records that fit the search criteria of the experiment. Inputs are the search criteria user want to apply.

Returns:A pandas.DataFrame of records, where each metric, parameter, and tag are expanded into their own columns named metrics.*, params.*, and tags.* respectively. For records that don't have a particular metric, parameter, or tag, their value will be (NumPy) NaN, None, or None respectively.
get_exp(*, experiment_id=None, experiment_name=None, create: bool = True, start: bool = False)

Retrieve an experiment. This method includes getting an active experiment, and get_or_create a specific experiment.

When the user specifies the experiment id or name, the method will try to return the specific experiment. When the user does not provide the experiment id or name, the method will try to return the current active experiment. The create argument determines whether the method will automatically create a new experiment according to the user's specification if the experiment hasn't been created before.

  • If create is True:

    • If active experiment exists:

      • no id or name specified, return the active experiment.
      • if id or name is specified, return the specified experiment. If no such exp found, create a new experiment with given id or name. If start is set to be True, the experiment is set to be active.
    • If no active experiment exists:

      • no id or name specified, create a default experiment.
      • if id or name is specified, return the specified experiment. If no such exp found, create a new experiment with given id or name. If start is set to be True, the experiment is set to be active.
  • Else If create is False:

    • If active experiment exists:

      • no id or name specified, return the active experiment.
      • if id or name is specified, return the specified experiment. If no such exp found, raise Error.
    • If no active experiment exists:

      • no id or name specified. If the default experiment exists, return it, otherwise, raise Error.
      • if id or name is specified, return the specified experiment. If no such exp found, raise Error.
Parameters:
  • experiment_id (str) – id of the experiment to return.
  • experiment_name (str) – name of the experiment to return.
  • create (boolean) – create the experiment if it hasn't been created before.
  • start (boolean) – start the new experiment if one is created.
Returns:

Return type:

An experiment object.
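
For instance, the create=False branch above can be used to look up an existing experiment without side effects. A sketch through the global R interface (the experiment name is illustrative):

from qlib.workflow import R

# Raises an error if "my_experiment" has never been created,
# instead of silently creating it (the create=False branch above).
exp = R.get_exp(experiment_name="my_experiment", create=False)
print(exp.list_recorders())  # {recorder_id: Recorder, ...}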

delete_exp(experiment_id=None, experiment_name=None)

Delete an experiment.

Parameters:
  • experiment_id (str) – the experiment id.
  • experiment_name (str) – the experiment name.
default_uri

Get the default tracking URI from qlib.config.C

uri

Get the default tracking URI or current URI.

Returns:
Return type:The tracking URI string.
list_experiments()

List all the existing experiments.

Returns:
Return type:A dictionary (name -> experiment) of the experiment information that is being stored.

Experiment

class qlib.workflow.exp.Experiment(id, name)

This is the Experiment class for each experiment being run. The API is designed similar to mlflow. (The link: https://mlflow.org/docs/latest/python_api/mlflow.html)

__init__(id, name)

Initialize self. See help(type(self)) for accurate signature.

start(*, recorder_id=None, recorder_name=None, resume=False)

Start the experiment and set it to be active. This method will also start a new recorder.

Parameters:
  • recorder_id (str) – the id of the recorder to be created.
  • recorder_name (str) – the name of the recorder to be created.
  • resume (bool) – whether to resume the first recorder
Returns:

Return type:

An active recorder.

end(recorder_status='SCHEDULED')

End the experiment.

Parameters:recorder_status (str) – the status the recorder to be set with when ending (SCHEDULED, RUNNING, FINISHED, FAILED).
create_recorder(recorder_name=None)

Create a recorder for each experiment.

Parameters:recorder_name (str) – the name of the recorder to be created.
Returns:
Return type:A recorder object.
search_records(**kwargs)

Get a pandas DataFrame of records that fit the search criteria of the experiment. Inputs are the search criteria user want to apply.

Returns:A pandas.DataFrame of records, where each metric, parameter, and tag are expanded into their own columns named metrics.*, params.*, and tags.* respectively. For records that don't have a particular metric, parameter, or tag, their value will be (NumPy) NaN, None, or None respectively.
delete_recorder(recorder_id)

Delete a recorder from the experiment.

Parameters:recorder_id (str) – the id of the recorder to be deleted.
get_recorder(recorder_id=None, recorder_name=None, create: bool = True, start: bool = False) → qlib.workflow.recorder.Recorder

Retrieve a Recorder for the user. When the user specifies the recorder id or name, the method will try to return the specific recorder. When the user does not provide the recorder id or name, the method will try to return the current active recorder. The create argument determines whether the method will automatically create a new recorder according to the user's specification if the recorder hasn't been created before.

  • If create is True:

    • If active recorder exists:

      • no id or name specified, return the active recorder.
      • if id or name is specified, return the specified recorder. If no such recorder is found, create a new recorder with the given id or name. If start is set to be True, the recorder is set to be active.
    • If no active recorder exists:

      • no id or name specified, create a new recorder.
      • if id or name is specified, return the specified recorder. If no such recorder is found, create a new recorder with the given id or name. If start is set to be True, the recorder is set to be active.
  • Else if create is False:

    • If active recorder exists:

      • no id or name specified, return the active recorder.
      • if id or name is specified, return the specified recorder. If no such recorder is found, raise an Error.
    • If no active recorder exists:

      • no id or name specified, raise an Error.
      • if id or name is specified, return the specified recorder. If no such recorder is found, raise an Error.
Parameters:
  • recorder_id (str) – the id of the recorder to return.
  • recorder_name (str) – the name of the recorder to return.
  • create (boolean) – create the recorder if it hasn’t been created before.
  • start (boolean) – start the new recorder if one is created.
Returns:

Return type:

A recorder object.

list_recorders(rtype: Literal['dict', 'list'] = 'dict', **flt_kwargs) → Union[List[qlib.workflow.recorder.Recorder], Dict[str, qlib.workflow.recorder.Recorder]]

List all the existing recorders of this experiment. Please get the experiment instance first before calling this method. If users want to use the method R.list_recorders(), please refer to the related API document in QlibRecorder.

Parameters:flt_kwargs (dict) – filter recorders by conditions, e.g. list_recorders(status=Recorder.STATUS_FI)
Returns:
if rtype == “dict”:
A dictionary (id -> recorder) of the recorder information that is being stored.
elif rtype == “list”:
A list of Recorder.
Return type:The return type depends on rtype.

Recorder

class qlib.workflow.recorder.Recorder(experiment_id, name)

This is the Recorder class for logging the experiments. The API is designed similar to mlflow. (The link: https://mlflow.org/docs/latest/python_api/mlflow.html)

The status of the recorder can be SCHEDULED, RUNNING, FINISHED, FAILED.

__init__(experiment_id, name)

Initialize self. See help(type(self)) for accurate signature.

save_objects(local_path=None, artifact_path=None, **kwargs)

Save objects such as prediction file or model checkpoints to the artifact URI. User can save object through keywords arguments (name:value).

Please refer to the docs of qlib.workflow:R.save_objects

Parameters:
  • local_path (str) – if provided, then save the file or directory to the artifact URI.
  • artifact_path=None (str) – the relative path for the artifact to be stored in the URI.
load_object(name)

Load objects such as prediction file or model checkpoints.

Parameters:name (str) – name of the file to be loaded.
Returns:
Return type:The saved object.
start_run()

Start running or resuming the Recorder. The return value can be used as a context manager within a with block; otherwise, you must call end_run() to terminate the current run. (See ActiveRun class in mlflow)

Returns:
Return type:An active running object (e.g. mlflow.ActiveRun object)
end_run()

End an active Recorder.

log_params(**kwargs)

Log a batch of params for the current run.

Parameters:arguments (keyword) – key, value pair to be logged as parameters.
log_metrics(step=None, **kwargs)

Log multiple metrics for the current run.

Parameters:arguments (keyword) – key, value pair to be logged as metrics.
log_artifact(local_path: str, artifact_path: Optional[str] = None)

Log a local file or directory as an artifact of the currently active run.

Parameters:
  • local_path (str) – Path to the file to write.
  • artifact_path (Optional[str]) – If provided, the directory in artifact_uri to write to.
set_tags(**kwargs)

Log a batch of tags for the current run.

Parameters:arguments (keyword) – key, value pair to be logged as tags.
delete_tags(*keys)

Delete some tags from a run.

Parameters:keys (series of strs of the keys) – all the names of the tags to be deleted.
list_artifacts(artifact_path: str = None)

List all the artifacts of a recorder.

Parameters:artifact_path (str) – the relative path for the artifact to be stored in the URI.
Returns:
Return type:A list of information (name, path, etc.) about the artifacts that are being stored.
download_artifact(path: str, dst_path: Optional[str] = None) → str

Download an artifact file or directory from a run to a local directory if applicable, and return a local path for it.

Parameters:
  • path (str) – Relative source path to the desired artifact.
  • dst_path (Optional[str]) – Absolute path of the local filesystem destination directory to which to download the specified artifacts. This directory must already exist. If unspecified, the artifacts will be downloaded to a new uniquely-named directory on the local filesystem.
Returns:

Local path of desired artifact.

Return type:

str

list_metrics()

List all the metrics of a recorder.

Returns:
Return type:A dictionary of the metrics that are being stored.
list_params()

List all the params of a recorder.

Returns:
Return type:A dictionary of the params that are being stored.
list_tags()

List all the tags of a recorder.

Returns:
Return type:A dictionary of the tags that are being stored.
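
Putting the logging methods together, a minimal sketch of a run that logs parameters, metrics, tags, and artifacts through the active recorder (pred_df is assumed to be an existing DataFrame):

from qlib.workflow import R

with R.start(experiment_name="my_experiment"):
    recorder = R.get_recorder()                      # the active Recorder
    recorder.log_params(learning_rate=0.01, max_depth=8)
    recorder.log_metrics(train_loss=0.123)
    recorder.set_tags(stage="training")
    recorder.save_objects(**{"pred.pkl": pred_df})   # pred_df is assumed to exist

    print(recorder.list_params())
    print(recorder.list_metrics())
    pred_loaded = recorder.load_object("pred.pkl")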

Record Template

class qlib.workflow.record_temp.RecordTemp(recorder)

This is the Records Template class that enables user to generate experiment results such as IC and backtest in a certain format.

save(**kwargs)

It behaves the same as self.recorder.save_objects, but it is an easier interface because users don't have to care about get_path and artifact_path.

__init__(recorder)

Initialize self. See help(type(self)) for accurate signature.

generate(**kwargs)

Generate certain records such as IC, backtest etc., and save them.

Parameters:kwargs
load(name: str, parents: bool = True)

It behaves the same as self.recorder.load_object, but it is an easier interface because users don't have to care about get_path and artifact_path.

Parameters:
  • name (str) – the name of the file to be loaded.
  • parents (bool) – each record template has a different artifact_path, so the file is also searched recursively in the parents' paths; subclasses have higher priority.
Returns:

Return type:

The stored records.

list()

List the supported artifacts. Users don’t have to consider self.get_path

Returns:
Return type:A list of all the supported artifacts.
check(include_self: bool = False, parents: bool = True)

Check if the records are properly generated and saved. It is useful in the following cases:

  • checking if the dependent files are complete before generating new records.
  • checking if the final files are complete.
Parameters:
  • include_self (bool) – whether the files generated by self are included.
  • parents (bool) – whether to check the parents.
Raises:FileNotFoundError – if the records are not stored properly.
class qlib.workflow.record_temp.SignalRecord(model=None, dataset=None, recorder=None)

This is the Signal Record class that generates the signal prediction. This class inherits the RecordTemp class.

__init__(model=None, dataset=None, recorder=None)

Initialize self. See help(type(self)) for accurate signature.

generate(**kwargs)

Generate certain records such as IC, backtest etc., and save them.

Parameters:kwargs
list()

List the supported artifacts. Users don’t have to consider self.get_path

Returns:
Return type:A list of all the supported artifacts.
class qlib.workflow.record_temp.ACRecordTemp(recorder, skip_existing=False)

Automatically checking record template

__init__(recorder, skip_existing=False)

Initialize self. See help(type(self)) for accurate signature.

generate(*args, **kwargs)

Automatically check the files and then run the concrete generating task.

class qlib.workflow.record_temp.HFSignalRecord(recorder, **kwargs)

This is the Signal Analysis Record class that generates the analysis results such as IC and IR. This class inherits the RecordTemp class.

depend_cls

alias of SignalRecord

__init__(recorder, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

generate()

Generate certain records such as IC, backtest etc., and save them.

Parameters:kwargs
list()

List the supported artifacts. Users don’t have to consider self.get_path

Returns:
Return type:A list of all the supported artifacts.
class qlib.workflow.record_temp.SigAnaRecord(recorder, ana_long_short=False, ann_scaler=252, label_col=0, skip_existing=False)

This is the Signal Analysis Record class that generates the analysis results such as IC and IR. This class inherits the RecordTemp class.

depend_cls

alias of SignalRecord

__init__(recorder, ana_long_short=False, ann_scaler=252, label_col=0, skip_existing=False)

Initialize self. See help(type(self)) for accurate signature.

list()

List the supported artifacts. Users don’t have to consider self.get_path

Returns:
Return type:A list of all the supported artifacts.
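
A typical combination following the standard qlib workflow (a sketch; model and dataset are assumed to be already-initialized qlib model and dataset objects):

from qlib.workflow import R
from qlib.workflow.record_temp import SignalRecord, SigAnaRecord

with R.start(experiment_name="train_model"):
    model.fit(dataset)                  # model and dataset are assumed to exist
    recorder = R.get_recorder()

    # store the prediction signal (pred.pkl / label.pkl) in the recorder
    sr = SignalRecord(model, dataset, recorder)
    sr.generate()

    # signal analysis (IC / Rank IC) on top of the stored signal
    sar = SigAnaRecord(recorder)
    sar.generate()
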
class qlib.workflow.record_temp.PortAnaRecord(recorder, config=None, risk_analysis_freq: Union[List[T], str] = None, indicator_analysis_freq: Union[List[T], str] = None, indicator_analysis_method=None, skip_existing=False, **kwargs)

This is the Portfolio Analysis Record class that generates the analysis results such as those of backtest. This class inherits the RecordTemp class.

The following files will be stored in recorder

  • report_normal.pkl & positions_normal.pkl:

    • The return report and detailed positions of the backtest, returned by qlib/contrib/evaluate.py:backtest
  • port_analysis.pkl : The risk analysis of your portfolio, returned by qlib/contrib/evaluate.py:risk_analysis

depend_cls

alias of SignalRecord

__init__(recorder, config=None, risk_analysis_freq: Union[List[T], str] = None, indicator_analysis_freq: Union[List[T], str] = None, indicator_analysis_method=None, skip_existing=False, **kwargs)
Parameters:
  • config[“strategy”] (dict) – define the strategy class as well as the kwargs.
  • config[“executor”] (dict) – define the executor class as well as the kwargs.
  • config[“backtest”] (dict) – define the backtest kwargs.
  • risk_analysis_freq (str or List[str]) – risk analysis freq of the report.
  • indicator_analysis_freq (str or List[str]) – indicator analysis freq of the report.
  • indicator_analysis_method (str, optional, default None) – the candidate values include ‘mean’, ‘amount_weighted’, ‘value_weighted’.
list()

List the supported artifacts. Users don’t have to consider self.get_path

Returns:
Return type:A list of all the supported artifacts.
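
A sketch of the config expected by __init__, mirroring the strategy/executor/backtest sections of a qlib backtest (the concrete values are illustrative, and the recorder is assumed to already hold pred.pkl, e.g. written by SignalRecord above):

from qlib.workflow import R
from qlib.workflow.record_temp import PortAnaRecord

port_analysis_config = {
    "strategy": {
        "class": "TopkDropoutStrategy",
        "module_path": "qlib.contrib.strategy",
        "kwargs": {"signal": "<PRED>", "topk": 50, "n_drop": 5},
    },
    "executor": {
        "class": "SimulatorExecutor",
        "module_path": "qlib.backtest.executor",
        "kwargs": {"time_per_step": "day", "generate_portfolio_metrics": True},
    },
    "backtest": {
        "start_time": "2017-01-01",
        "end_time": "2020-08-01",
        "account": 100000000,
        "benchmark": "SH000300",
        "exchange_kwargs": {
            "freq": "day",
            "limit_threshold": 0.095,
            "deal_price": "close",
            "open_cost": 0.0005,
            "close_cost": 0.0015,
            "min_cost": 5,
        },
    },
}

# the recorder is assumed to already hold pred.pkl
with R.start(experiment_name="backtest_analysis"):
    recorder = R.get_recorder()
    par = PortAnaRecord(recorder, port_analysis_config, risk_analysis_freq="day")
    par.generate()
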
class qlib.workflow.record_temp.MultiPassPortAnaRecord(recorder, pass_num=10, shuffle_init_score=True, **kwargs)

This is the Multiple Pass Portfolio Analysis Record class that run backtest multiple times and generates the analysis results such as those of backtest. This class inherits the PortAnaRecord class.

If shuffle_init_score is enabled, the prediction scores of the first backtest date will be shuffled, so that the initial position will be random. shuffle_init_score only works when the signal is used as the <PRED> placeholder. The placeholder will be replaced by pred.pkl saved in the recorder.

Parameters:
  • recorder (Recorder) – The recorder used to save the backtest results.
  • pass_num (int) – The number of backtest passes.
  • shuffle_init_score (bool) – Whether to shuffle the prediction score of the first backtest date.
depend_cls

alias of SignalRecord

__init__(recorder, pass_num=10, shuffle_init_score=True, **kwargs)
Parameters:
  • recorder (Recorder) – The recorder used to save the backtest results.
  • pass_num (int) – The number of backtest passes.
  • shuffle_init_score (bool) – Whether to shuffle the prediction score of the first backtest date.
list()

List the supported artifacts. Users don’t have to consider self.get_path

Returns:
Return type:A list of all the supported artifacts.

Task Management

TaskGen

TaskGenerator module can generate many tasks based on TaskGen and some task templates.

qlib.workflow.task.gen.task_generator(tasks, generators) → list

Use a list of TaskGen and a list of task templates to generate different tasks.

For examples:

There are 3 task templates a, b, c and 2 TaskGen A, B. A will generate 2 tasks from a template and B will generate 3 tasks from a template. task_generator([a, b, c], [A, B]) will finally generate 3*2*3 = 18 tasks.
Parameters:
  • tasks (List[dict] or dict) – a list of task templates or a single task
  • generators (List[TaskGen] or TaskGen) – a list of TaskGen or a single TaskGen
Returns:

a list of tasks

Return type:

list

class qlib.workflow.task.gen.TaskGen

The base class for generating different tasks

Example 1:

input: a specific task template and rolling steps

output: rolling version of the tasks

Example 2:

input: a specific task template and losses list

output: a set of tasks with different losses

generate(task: dict) → List[dict]

Generate different tasks based on a task template

Parameters:task (dict) – a task template
Returns:A list of tasks
Return type:typing.List[dict]
qlib.workflow.task.gen.handler_mod(task: dict, rolling_gen)

Help to modify the handler end time when using RollingGen. It tries to handle the following case:

  • The handler's data end_time is earlier than the dataset's test_data's segments.

    • To handle this, the handler's data end_time is extended.

If the handler's end_time is None, then it is not necessary to change its end time.

Parameters:
  • task (dict) – a task template
  • rg (RollingGen) – an instance of RollingGen
qlib.workflow.task.gen.trunc_segments(ta: qlib.workflow.task.utils.TimeAdjuster, segments: Dict[str, pandas._libs.tslibs.timestamps.Timestamp], days, test_key='test')

To avoid the leakage of future information, the segments should be truncated according to the test start_time

Note

This function will change segments inplace

class qlib.workflow.task.gen.RollingGen(step: int = 40, rtype: str = 'expanding', ds_extra_mod_func: Union[None, Callable] = <function handler_mod>, test_key='test', train_key='train', trunc_days: int = None, task_copy_func: Callable = <function deepcopy>)
__init__(step: int = 40, rtype: str = 'expanding', ds_extra_mod_func: Union[None, Callable] = <function handler_mod>, test_key='test', train_key='train', trunc_days: int = None, task_copy_func: Callable = <function deepcopy>)

Generate tasks for rolling

Parameters:
  • step (int) – step to rolling
  • rtype (str) – rolling type (expanding, sliding)
  • ds_extra_mod_func (Callable) – A method like: handler_mod(task: dict, rg: RollingGen) Do some extra action after generating a task. For example, use handler_mod to modify the end time of the handler of a dataset.
  • trunc_days (int) – trunc some data to avoid future information leakage
  • task_copy_func (Callable) – the function to copy entire task. This is very useful when user want to share something between tasks
gen_following_tasks(task: dict, test_end: pandas._libs.tslibs.timestamps.Timestamp) → List[dict]

generating following rolling tasks for task until test_end

Parameters:
  • task (dict) – Qlib task format
  • test_end (pd.Timestamp) – the latest rolling task includes test_end
Returns:

the following tasks of task (task itself is excluded)

Return type:

List[dict]

generate(task: dict) → List[dict]

Converting the task into a rolling task.

Parameters:task (dict) –

A dict describing a task. For example.

DEFAULT_TASK = {
    "model": {
        "class": "LGBModel",
        "module_path": "qlib.contrib.model.gbdt",
    },
    "dataset": {
        "class": "DatasetH",
        "module_path": "qlib.data.dataset",
        "kwargs": {
            "handler": {
                "class": "Alpha158",
                "module_path": "qlib.contrib.data.handler",
                "kwargs": {
                    "start_time": "2008-01-01",
                    "end_time": "2020-08-01",
                    "fit_start_time": "2008-01-01",
                    "fit_end_time": "2014-12-31",
                    "instruments": "csi100",
                },
            },
            "segments": {
                "train": ("2008-01-01", "2014-12-31"),
                "valid": ("2015-01-01", "2016-12-20"),  # Please avoid leaking the future test data into validation
                "test": ("2017-01-01", "2020-08-01"),
            },
        },
    },
    "record": [
        {
            "class": "SignalRecord",
            "module_path": "qlib.workflow.record_temp",
        },
    ]
}
Returns:a list of tasks
Return type:List[dict]
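
For instance, combining RollingGen with task_generator on a template such as DEFAULT_TASK above yields one task per rolling window (a sketch; the step size is illustrative):

from qlib.workflow.task.gen import RollingGen, task_generator

# Roll the dataset segments of the template forward in 40-day steps,
# keeping the training start fixed (rtype="expanding").
rolling_gen = RollingGen(step=40, rtype="expanding")
tasks = task_generator(tasks=DEFAULT_TASK, generators=rolling_gen)
print(len(tasks))  # one task dict per rolling window
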
class qlib.workflow.task.gen.MultiHorizonGenBase(horizon: List[int] = [5], label_leak_n=2)
__init__(horizon: List[int] = [5], label_leak_n=2)

This task generator tries to generate tasks for different horizons based on an existing task

Parameters:
  • horizon (List[int]) – the possible horizons of the tasks
  • label_leak_n (int) – how many future days it will take to get the complete label after the day of making the prediction. For example: the user makes a prediction on day T (after getting the close price of T); the label is the return of buying the stock on T + 1 and selling it on T + 2; then label_leak_n will be 2 (i.e. two days of information is leaked to leverage this sample).
set_horizon(task: dict, hr: int)

This method is designed to change the task in place

Parameters:
  • task (dict) – Qlib’s task
  • hr (int) – the horizon of task
generate(task: dict)

Generate different tasks based on a task template

Parameters:task (dict) – a task template
Returns:A list of tasks
Return type:typing.List[dict]

TaskManager

TaskManager can fetch unused tasks automatically and manage the lifecycle of a set of tasks with error handling. These features can run tasks concurrently and ensure every task will be used only once. Task Manager will store all tasks in MongoDB. Users MUST finish the configuration of MongoDB when using this module.

A task in TaskManager consists of 3 parts:

  • task description: the desc defines the task.
  • task status: the status of the task.
  • task result: the user can get the task with the task description and the task result.

class qlib.workflow.task.manage.TaskManager(task_pool: str)

Here is what a task looks like when it is created by TaskManager:

{
    'def': pickle serialized task definition.  using pickle will make it easier
    'filter': json-like data. This is for filtering the tasks.
    'status': 'waiting' | 'running' | 'done'
    'res': pickle serialized task result,
}

The task manager assumes that you will only update the tasks you fetched. The MongoDB fetch-one-and-update operation makes the data updating secure.

This class can be used as a tool from the command line. Here are several examples. You can view the help of the manage module with the following commands:

python -m qlib.workflow.task.manage -h  # show manual of manage module CLI
python -m qlib.workflow.task.manage wait -h  # show manual of the wait command of manage

python -m qlib.workflow.task.manage -t <pool_name> wait
python -m qlib.workflow.task.manage -t <pool_name> task_stat

Note

Assumption: the data in MongoDB was encoded and the data out of MongoDB was decoded

There are four statuses:

STATUS_WAITING: waiting for training

STATUS_RUNNING: training

STATUS_PART_DONE: finished some step and waiting for next step

STATUS_DONE: all work done

__init__(task_pool: str)

Init Task Manager. Remember to configure the MongoDB URL and database name first. A TaskManager instance serves a specific task pool. The static methods of this module serve the whole MongoDB.

Parameters:task_pool (str) – the name of Collection in MongoDB
static list() → list

List all the collections (task_pools) of the db.

Returns:list
replace_task(task, new_task)

Use a new task to replace an old one

Parameters:
  • task – old task
  • new_task – new task
insert_task(task)

Insert a task.

Parameters:task – the task waiting for insert
Returns:pymongo.results.InsertOneResult
insert_task_def(task_def)

Insert a task to task_pool

Parameters:task_def (dict) – the task definition
Returns:
Return type:pymongo.results.InsertOneResult
create_task(task_def_l, dry_run=False, print_nt=False) → List[str]

If the tasks in task_def_l are new, then insert new tasks into the task_pool, and record inserted_id. If a task is not new, then just query its _id.

Parameters:
  • task_def_l (list) – a list of task
  • dry_run (bool) – if True, do not actually insert the new tasks into the task pool.
  • print_nt (bool) – if True, print the new tasks.
Returns:

a list of the _id of task_def_l

Return type:

List[str]

fetch_task(query={}, status='waiting') → dict

Use query to fetch tasks.

Parameters:
  • query (dict, optional) – query dict. Defaults to {}.
  • status (str, optional) – the status of the tasks to fetch. Defaults to STATUS_WAITING.
Returns:

a task(document in collection) after decoding

Return type:

dict

safe_fetch_task(query={}, status='waiting')

Fetch task from task_pool using query with contextmanager

Parameters:query (dict) – the dict of query
Returns:a task (document in collection) after decoding
Return type:dict
query(query={}, decode=True)

Query task in collection. This function may raise exception pymongo.errors.CursorNotFound: cursor id not found if it takes too long to iterate the generator

python -m qlib.workflow.task.manage -t <your task pool> query '{"_id": "615498be837d0053acbc5d58"}'

Parameters:
  • query (dict) – the dict of query
  • decode (bool) –
Returns:

a task (document in collection) after decoding

Return type:

dict

re_query(_id) → dict

Use _id to query task.

Parameters:_id (str) – _id of a document
Returns:a task(document in collection) after decoding
Return type:dict
commit_task_res(task, res, status='done')

Commit the result to task[‘res’].

Parameters:
  • task (dict) – the task to be committed.
  • res (object) – the result you want to save
  • status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE. Defaults to STATUS_DONE.
return_task(task, status='waiting')

Return a task to the given status. It is usually used in error handling.

Parameters:
  • task (dict) – the task to be returned.
  • status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE. Defaults to STATUS_WAITING.
remove(query={})

Remove the task using query

Parameters:query (dict) – the dict of query
task_stat(query={}) → dict

Count the tasks in every status.

Parameters:query (dict, optional) – the query dict. Defaults to {}.
Returns:dict
reset_waiting(query={})

Reset all running tasks to the waiting status. It can be used when some running tasks exit unexpectedly.

Parameters:query (dict, optional) – the query dict. Defaults to {}.
prioritize(task, priority: int)

Set priority for task

Parameters:
  • task (dict) – The task query from the database
  • priority (int) – the target priority
wait(query={})

When multiprocessing, the main process may fetch nothing from TaskManager because there are still some running tasks, so the main process should wait until all tasks are trained well by other processes or machines.

Parameters:query (dict, optional) – the query dict. Defaults to {}.
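
A minimal sketch of the typical lifecycle (it assumes qlib.init has been called with a mongo configuration as shown in the Utils section, and that train_one_task is a hypothetical training function):

from qlib.workflow.task.manage import TaskManager

tm = TaskManager(task_pool="my_rolling_tasks")

# insert task definitions; tasks that already exist are only queried
task_ids = tm.create_task(tasks)        # `tasks` is assumed to be a list of task dicts

# fetch one waiting task, train it, and commit the result
with tm.safe_fetch_task() as task:
    if task is not None:
        res = train_one_task(task["def"])   # hypothetical training function
        tm.commit_task_res(task, res)

print(tm.task_stat())                   # counts of tasks per status
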
qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)

While the task pool is not empty (has WAITING tasks), use task_func to fetch and run tasks in task_pool

After running this method, here are 4 situations (before_status -> after_status):

STATUS_WAITING -> STATUS_DONE: use task[“def”] as task_func param, it means that the task has not been started

STATUS_WAITING -> STATUS_PART_DONE: use task[“def”] as task_func param

STATUS_PART_DONE -> STATUS_PART_DONE: use task[“res”] as task_func param, it means that the task has been started but not completed

STATUS_PART_DONE -> STATUS_DONE: use task[“res”] as task_func param

Parameters:
  • task_func (Callable) –

    def (task_def, **kwargs) -> <res which will be committed>

    the function to run the task

  • task_pool (str) – the name of the task pool (Collection in MongoDB)
  • query (dict) – will use this dict to query task_pool when fetching task
  • force_release (bool) – will the program force to release the resource
  • before_status (str:) – the tasks in before_status will be fetched and trained. Can be STATUS_WAITING, STATUS_PART_DONE.
  • after_status (str:) – the tasks after trained will become after_status. Can be STATUS_WAITING, STATUS_PART_DONE.
  • kwargs – the params for task_func
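
Equivalently, run_task wraps the fetch/commit loop above. A sketch where my_train_func is a hypothetical function following the def (task_def, **kwargs) -> res contract:

from qlib.workflow.task.manage import run_task

def my_train_func(task_def, experiment_name="rolling_exp"):
    # hypothetical: train a model from the task definition and
    # return whatever should be committed as the task result
    return {"trained": True, "exp": experiment_name}

# keep fetching WAITING tasks from the pool until it is empty
run_task(my_train_func, task_pool="my_rolling_tasks", experiment_name="rolling_exp")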

Trainer

The Trainer will train a list of tasks and return a list of model recorders. There are two steps in each Trainer: train (make the model recorder) and end_train (modify the model recorder).

There is a concept called DelayTrainer, which can be used in online simulation for parallel training. In DelayTrainer, the first step only saves some necessary info to the model recorders, and the second step, which is finished at the end, can do some concurrent and time-consuming operations such as model fitting.

Qlib offers two kinds of Trainer: TrainerR is the simplest way, and TrainerRM is based on TaskManager to help manage the task lifecycle automatically.
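
A sketch of both trainers (`tasks` is assumed to be a list of qlib task dicts; TrainerRM additionally requires the MongoDB configuration used by TaskManager):

from qlib.model.trainer import TrainerR, TrainerRM

# simplest case: train the tasks sequentially, one Recorder per task
trainer = TrainerR(experiment_name="rolling_exp")
recorders = trainer.train(tasks)
recorders = trainer.end_train(recorders)   # tag the recorders as finished

# TaskManager-backed variant: tasks go through a MongoDB task pool,
# so several workers or machines can consume the same pool
trainer_rm = TrainerRM(experiment_name="rolling_exp", task_pool="rolling_pool")
recorders_rm = trainer_rm.train(tasks)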

qlib.model.trainer.begin_task_train(task_config: dict, experiment_name: str, recorder_name: str = None) → qlib.workflow.recorder.Recorder

Begin task training to start a recorder and save the task config.

Parameters:
  • task_config (dict) – the config of a task
  • experiment_name (str) – the name of experiment
  • recorder_name (str) – the given name will be the recorder name. None for using rid.
Returns:

the model recorder

Return type:

Recorder

qlib.model.trainer.end_task_train(rec: qlib.workflow.recorder.Recorder, experiment_name: str) → qlib.workflow.recorder.Recorder

Finish task training with real model fitting and saving.

Parameters:
  • rec (Recorder) – the recorder to be resumed
  • experiment_name (str) – the name of experiment
Returns:

the model recorder

Return type:

Recorder

qlib.model.trainer.task_train(task_config: dict, experiment_name: str, recorder_name: str = None) → qlib.workflow.recorder.Recorder

Task based training, will be divided into two steps.

Parameters:
  • task_config (dict) – The config of a task.
  • experiment_name (str) – The name of experiment
  • recorder_name (str) – The name of recorder
Returns:

Recorder

Return type:

The instance of the recorder

class qlib.model.trainer.Trainer

The trainer can train a list of models. There are Trainer and DelayTrainer, which can be distinguished by when they finish the real training.

__init__()

Initialize self. See help(type(self)) for accurate signature.

train(tasks: list, *args, **kwargs) → list

Given a list of task definitions, begin training, and return the models.

For Trainer, it finishes real training in this method. For DelayTrainer, it only does some preparation in this method.

Parameters:tasks – a list of tasks
Returns:a list of models
Return type:list
end_train(models: list, *args, **kwargs) → list

Given a list of models, finish something at the end of training if needed. The models may be Recorder, txt file, database, and so on.

For Trainer, it does some finishing touches in this method. For DelayTrainer, it finishes real training in this method.

Parameters:models – a list of models
Returns:a list of models
Return type:list
is_delay() → bool

If Trainer will delay finishing end_train.

Returns:if DelayTrainer
Return type:bool
has_worker() → bool

Some trainers have a backend worker to support parallel training. This method can tell if the worker is enabled.

Returns:if the worker is enabled
Return type:bool
worker()

start the worker

Raises:NotImplementedError: – If the worker is not supported
class qlib.model.trainer.TrainerR(experiment_name: Optional[str] = None, train_func: Callable = <function task_train>, call_in_subproc: bool = False, default_rec_name: Optional[str] = None)

Trainer based on (R)ecorder. It will train a list of tasks and return a list of model recorders in a linear way.

Assumption: models were defined by task and the results will be saved to Recorder.

__init__(experiment_name: Optional[str] = None, train_func: Callable = <function task_train>, call_in_subproc: bool = False, default_rec_name: Optional[str] = None)

Init TrainerR.

Parameters:
  • experiment_name (str, optional) – the default name of experiment.
  • train_func (Callable, optional) – default training method. Defaults to task_train.
  • call_in_subproc (bool) – call the process in subprocess to force memory release
train(tasks: list, train_func: Callable = None, experiment_name: str = None, **kwargs) → List[qlib.workflow.recorder.Recorder]

Given a list of tasks, return a list of trained Recorders. The order can be guaranteed.

Parameters:
  • tasks (list) – a list of definitions based on task dict
  • train_func (Callable) – the training method which needs at least tasks and experiment_name. None for the default training method.
  • experiment_name (str) – the experiment name, None for use default name.
  • kwargs – the params for train_func.
Returns:

a list of Recorders

Return type:

List[Recorder]

end_train(models: list, **kwargs) → List[qlib.workflow.recorder.Recorder]

Set STATUS_END tag to the recorders.

Parameters:models (list) – a list of trained recorders.
Returns:the same list as the param.
Return type:List[Recorder]
class qlib.model.trainer.DelayTrainerR(experiment_name: str = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, **kwargs)

A delayed implementation based on TrainerR, which means train method may only do some preparation and end_train method can do the real model fitting.

__init__(experiment_name: str = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, **kwargs)

Init DelayTrainerR.

Parameters:
  • experiment_name (str) – the default name of experiment.
  • train_func (Callable, optional) – default train method. Defaults to begin_task_train.
  • end_train_func (Callable, optional) – default end_train method. Defaults to end_task_train.
end_train(models, end_train_func=None, experiment_name: str = None, **kwargs) → List[qlib.workflow.recorder.Recorder]

Given a list of Recorder and return a list of trained Recorder. This class will finish real data loading and model fitting.

Parameters:
  • models (list) – a list of Recorder, the tasks have been saved to them
  • end_train_func (Callable, optional) – the end_train method which needs at least recorders and experiment_name. Defaults to None for using self.end_train_func.
  • experiment_name (str) – the experiment name, None for use default name.
  • kwargs – the params for end_train_func.
Returns:

a list of Recorders

Return type:

List[Recorder]

class qlib.model.trainer.TrainerRM(experiment_name: str = None, task_pool: str = None, train_func=<function task_train>, skip_run_task: bool = False, default_rec_name: Optional[str] = None)

Trainer based on (R)ecorder and Task(M)anager. It can train a list of tasks and return a list of model recorders in a multiprocessing way.

Assumption: task will be saved to TaskManager and task will be fetched and trained from TaskManager

__init__(experiment_name: str = None, task_pool: str = None, train_func=<function task_train>, skip_run_task: bool = False, default_rec_name: Optional[str] = None)

Init TrainerRM.

Parameters:
  • experiment_name (str) – the default name of experiment.
  • task_pool (str) – task pool name in TaskManager. None for use same name as experiment_name.
  • train_func (Callable, optional) – default training method. Defaults to task_train.
  • skip_run_task (bool) – If skip_run_task == True: Only run_task in the worker. Otherwise skip run_task.
train(tasks: list, train_func: Callable = None, experiment_name: str = None, before_status: str = 'waiting', after_status: str = 'done', default_rec_name: Optional[str] = None, **kwargs) → List[qlib.workflow.recorder.Recorder]

Given a list of tasks, return a list of trained Recorders. The order can be guaranteed.

This method defaults to a single process, but TaskManager offers a great way to parallelize training. Users can customize their train_func to realize multiple processes or even multiple machines.

Parameters:
  • tasks (list) – a list of definitions based on task dict
  • train_func (Callable) – the training method which needs at least tasks and experiment_name. None for the default training method.
  • experiment_name (str) – the experiment name, None for use default name.
  • before_status (str) – the tasks in before_status will be fetched and trained. Can be STATUS_WAITING, STATUS_PART_DONE.
  • after_status (str) – the tasks after trained will become after_status. Can be STATUS_WAITING, STATUS_PART_DONE.
  • kwargs – the params for train_func.
Returns:

a list of Recorders

Return type:

List[Recorder]

end_train(recs: list, **kwargs) → List[qlib.workflow.recorder.Recorder]

Set STATUS_END tag to the recorders.

Parameters:recs (list) – a list of trained recorders.
Returns:the same list as the param.
Return type:List[Recorder]
worker(train_func: Callable = None, experiment_name: str = None)

The multiprocessing method for train. It can share the same task_pool with train and can run in other processes or on other machines.

Parameters:
  • train_func (Callable) – the training method which needs at least tasks and experiment_name. None for the default training method.
  • experiment_name (str) – the experiment name, None for use default name.
has_worker() → bool

Some trainers have a backend worker to support parallel training. This method can tell if the worker is enabled.

Returns:if the worker is enabled
Return type:bool
class qlib.model.trainer.DelayTrainerRM(experiment_name: str = None, task_pool: str = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, skip_run_task: bool = False, **kwargs)

A delayed implementation based on TrainerRM, which means train method may only do some preparation and end_train method can do the real model fitting.

__init__(experiment_name: str = None, task_pool: str = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, skip_run_task: bool = False, **kwargs)

Init DelayTrainerRM.

Parameters:
  • experiment_name (str) – the default name of experiment.
  • task_pool (str) – task pool name in TaskManager. None for use same name as experiment_name.
  • train_func (Callable, optional) – default train method. Defaults to begin_task_train.
  • end_train_func (Callable, optional) – default end_train method. Defaults to end_task_train.
  • skip_run_task (bool) – If skip_run_task == True: Only run_task in the worker. Otherwise skip run_task. E.g. Starting trainer on a CPU VM and then waiting tasks to be finished on GPU VMs.
train(tasks: list, train_func=None, experiment_name: str = None, **kwargs) → List[qlib.workflow.recorder.Recorder]

Same as train of TrainerRM, after_status will be STATUS_PART_DONE.

Parameters:
  • tasks (list) – a list of definition based on task dict
  • train_func (Callable) – the train method which need at least tasks and experiment_name. Defaults to None for using self.train_func.
  • experiment_name (str) – the experiment name, None for use default name.
Returns:

a list of Recorders

Return type:

List[Recorder]

end_train(recs, end_train_func=None, experiment_name: str = None, **kwargs) → List[qlib.workflow.recorder.Recorder]

Given a list of Recorder and return a list of trained Recorder. This class will finish real data loading and model fitting.

Parameters:
  • recs (list) – a list of Recorder, the tasks have been saved to them.
  • end_train_func (Callable, optional) – the end_train method which need at least recorders and experiment_name. Defaults to None for using self.end_train_func.
  • experiment_name (str) – the experiment name, None for use default name.
  • kwargs – the params for end_train_func.
Returns:

a list of Recorders

Return type:

List[Recorder]

worker(end_train_func=None, experiment_name: str = None)

The multiprocessing method for end_train. It can share the same task_pool with end_train and can run in other processes or on other machines.

Parameters:
  • end_train_func (Callable, optional) – the end_train method which need at least recorders and experiment_name. Defaults to None for using self.end_train_func.
  • experiment_name (str) – the experiment name, None for use default name.
has_worker() → bool

Some trainers have a backend worker to support parallel training. This method can tell if the worker is enabled.

Returns:if the worker is enabled
Return type:bool

Collector

The Collector module can collect objects from everywhere and process them, such as merging, grouping, averaging and so on.

class qlib.workflow.task.collect.Collector(process_list=[])

The collector to collect different results

__init__(process_list=[])

Init Collector.

Parameters:process_list (list or Callable) – the list of processors or the instance of a processor to process dict.
collect() → dict

Collect the results and return a dict like {key: things}

Returns:the dict after collecting.

For example:

{“prediction”: pd.Series}

{“IC”: {“Xgboost”: pd.Series, “LSTM”: pd.Series}}

Return type:dict
static process_collect(collected_dict, process_list=[], *args, **kwargs) → dict

Do a series of processing to the dict returned by collect and return a dict like {key: things} For example, you can group and ensemble.

Parameters:
  • collected_dict (dict) – the dict return by collect
  • process_list (list or Callable) – the list of processors or the instance of a processor to process dict. The processor order is the same as the list order. For example: [Group1(…, Ensemble1()), Group2(…, Ensemble2())]
Returns:

the dict after processing.

Return type:

dict

class qlib.workflow.task.collect.MergeCollector(collector_dict: Dict[str, qlib.workflow.task.collect.Collector], process_list: List[Callable] = [], merge_func=None)

A collector to collect the results of other Collectors

For example:

We have 2 collectors named A and B. A can collect {“prediction”: pd.Series} and B can collect {“IC”: {“Xgboost”: pd.Series, “LSTM”: pd.Series}}. Then after this class's collect, we can collect {“A_prediction”: pd.Series, “B_IC”: {“Xgboost”: pd.Series, “LSTM”: pd.Series}}.

__init__(collector_dict: Dict[str, qlib.workflow.task.collect.Collector], process_list: List[Callable] = [], merge_func=None)

Init MergeCollector.

Parameters:
  • collector_dict (Dict[str,Collector]) – the dict like {collector_key, Collector}
  • process_list (List[Callable]) – the list of processors or the instance of processor to process dict.
  • merge_func (Callable) – a method to generate outermost key. The given params are collector_key from collector_dict and key from every collector after collecting. None for using tuple to connect them, such as “ABC”+(“a”,”b”) -> (“ABC”, (“a”,”b”)).
collect() → dict

Collect all results of collector_dict and change the outermost key to a recombination key.

Returns:the dict after collecting.
Return type:dict
class qlib.workflow.task.collect.RecorderCollector(experiment, process_list=[], rec_key_func=None, rec_filter_func=None, artifacts_path={'pred': 'pred.pkl'}, artifacts_key=None, list_kwargs={}, status: Iterable[T_co] = {'FINISHED'})
__init__(experiment, process_list=[], rec_key_func=None, rec_filter_func=None, artifacts_path={'pred': 'pred.pkl'}, artifacts_key=None, list_kwargs={}, status: Iterable[T_co] = {'FINISHED'})

Init RecorderCollector.

Parameters:
  • experiment (Experiment, str, or Callable) – an instance of an Experiment, the name of an Experiment, or a callable function which returns a list of experiments.
  • process_list (list or Callable) – the list of processors or the instance of a processor to process dict.
  • rec_key_func (Callable) – a function to get the key of a recorder. If None, use recorder id.
  • rec_filter_func (Callable, optional) – filter the recorder by return True or False. Defaults to None.
  • artifacts_path (dict, optional) – The artifacts name and its path in Recorder. Defaults to {“pred”: “pred.pkl”, “IC”: “sig_analysis/ic.pkl”}.
  • artifacts_key (str or List, optional) – the artifacts key you want to get. If None, get all artifacts.
  • list_kwargs (str) – arguments for list_recorders function.
  • status (Iterable) – only collect recorders with specific status. None indicating collecting all the recorders
collect(artifacts_key=None, rec_filter_func=None, only_exist=True) → dict

Collect different artifacts based on recorder after filtering.

Parameters:
  • artifacts_key (str or List, optional) – the artifacts key you want to get. If None, use the default.
  • rec_filter_func (Callable, optional) – filter the recorder by return True or False. If None, use the default.
  • only_exist (bool, optional) – whether to collect the artifacts only when a recorder really has them. If True, recorders that raise an exception when loading will not be collected; if False, the exception will be raised.
Returns:

the dict after collected like {artifact: {rec_key: object}}

Return type:

dict

get_exp_name() → str

Get experiment name

Returns:experiment name
Return type:str
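
A sketch collecting the stored predictions of all finished recorders of an experiment (the experiment name and key function are illustrative):

from qlib.workflow.task.collect import RecorderCollector

collector = RecorderCollector(
    experiment="rolling_exp",                 # experiment name or Experiment instance
    artifacts_path={"pred": "pred.pkl"},      # artifact key -> path inside each recorder
    rec_key_func=lambda rec: rec.id,          # illustrative: key the result by recorder id
)
collected = collector.collect()
preds = collected["pred"]                     # {recorder_id: object loaded from pred.pkl}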

Group

Group can group a set of objects based on group_func and change them to a dict. After group, we provide a method to reduce them.

For example:

group: {(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}}

reduce: {(A,B): {C1: object, C2: object}} -> {(A,B): object}

class qlib.model.ens.group.Group(group_func=None, ens: qlib.model.ens.ensemble.Ensemble = None)

Group the objects based on dict

__init__(group_func=None, ens: qlib.model.ens.ensemble.Ensemble = None)

Init Group.

Parameters:
  • group_func (Callable, optional) –

    Given a dict and return the group key and one of the group elements.

    For example: {(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}}

    Defaults to None.

  • ens (Ensemble, optional) – If not None, do ensemble for grouped value after grouping.
group(*args, **kwargs) → dict

Group a set of objects and change them to a dict.

For example: {(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}}

Returns:grouped dict
Return type:dict
reduce(*args, **kwargs) → dict

Reduce grouped dict.

For example: {(A,B): {C1: object, C2: object}} -> {(A,B): object}

Returns:reduced dict
Return type:dict
class qlib.model.ens.group.RollingGroup(ens=<qlib.model.ens.ensemble.RollingEnsemble object>)

Group the rolling dict

group(rolling_dict: dict) → dict

Given a rolling dict like {(A,B,R): things}, return the grouped dict like {(A,B): {R: things}}

NOTE: There is an assumption that the rolling key is at the end of the key tuple, because the rolling results always need to be ensembled first.

Parameters:rolling_dict (dict) – a rolling dict. If the key is not a tuple, then do nothing.
Returns:grouped dict
Return type:dict
__init__(ens=<qlib.model.ens.ensemble.RollingEnsemble object>)

Init Group.

Parameters:
  • group_func (Callable, optional) –

    Given a dict, return the group key and one of the group elements.

    For example: {(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}}

    Defaults to None.

  • ens (Ensemble, optional) – If not None, do ensemble for the grouped values after grouping.

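For example, a minimal sketch of RollingGroup on a toy dict (the keys and values are placeholders, not real qlib objects; reduce() would additionally require pd.DataFrame values so that RollingEnsemble can merge them):

from qlib.model.ens.group import RollingGroup

rolling_dict = {
    ("modelA", "exp1", "R1"): "obj1",  # the rolling key ("R1"/"R2") is last
    ("modelA", "exp1", "R2"): "obj2",
}
grouped = RollingGroup().group(rolling_dict)
# -> {("modelA", "exp1"): {"R1": "obj1", "R2": "obj2"}}
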
Ensemble

Ensemble module can merge the objects in an Ensemble. For example, if there are many sub-models' predictions, we may need to merge them into an ensemble prediction.

class qlib.model.ens.ensemble.Ensemble

Merge the ensemble_dict into an ensemble object.

For example: {Rollinga_b: object, Rollingb_c: object} -> object

When calling this class:

Args:
ensemble_dict (dict): the ensemble dict like {name: things} waiting for merging
Returns:
object: the ensemble object
class qlib.model.ens.ensemble.SingleKeyEnsemble

Extract the object if there is only one key and value in the dict, to make the result more readable. {Only key: Only value} -> Only value

If there is more than one key or less than one key, then do nothing. You can even run this recursively to make the dict more readable.

NOTE: It runs recursively by default.

When calling this class:

Args:
ensemble_dict (dict): the dict. The key of the dict will be ignored.
Returns:
dict: the readable dict.
class qlib.model.ens.ensemble.RollingEnsemble

Merge a dict of rolling dataframes, such as predictions or IC, into an ensemble.

NOTE: The values of the dict must be pd.DataFrame and must have the index “datetime”.

When calling this class:

Args:
ensemble_dict (dict): a dict like {“A”: pd.DataFrame, “B”: pd.DataFrame}. The key of the dict will be ignored.
Returns:
pd.DataFrame: the complete result of rolling.
class qlib.model.ens.ensemble.AverageEnsemble

Average and standardize a dict of same-shape dataframes, such as predictions or IC, into an ensemble.

NOTE: The values of the dict must be pd.DataFrame and must have the index “datetime”. If it is a nested dict, it will be flattened.

When calling this class:

Args:
ensemble_dict (dict): a dict like {“A”: pd.DataFrame, “B”: pd.DataFrame}. The key of the dict will be ignored.
Returns:
pd.DataFrame: the complete result of averaging and standardizing.
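
For example, a small sketch of AverageEnsemble on two toy prediction frames (the instruments and values are made up; the keys "A" and "B" are ignored):

import pandas as pd
from qlib.model.ens.ensemble import AverageEnsemble

idx = pd.MultiIndex.from_product(
    [pd.to_datetime(["2020-01-02", "2020-01-03"]), ["SH600000", "SH600519"]],
    names=["datetime", "instrument"],
)
pred_a = pd.DataFrame({"score": [0.1, 0.2, 0.3, 0.4]}, index=idx)
pred_b = pd.DataFrame({"score": [0.3, 0.1, 0.2, 0.5]}, index=idx)

# Average and standardize the two predictions (the dict keys are ignored).
ensembled = AverageEnsemble()({"A": pred_a, "B": pred_b})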

Utils

Some tools for task management.

qlib.workflow.task.utils.get_mongodb() → pymongo.database.Database

Get the database in MongoDB, which means you need to declare the address and the name of the database first.

For example:

Using qlib.init():

mongo_conf = {
    "task_url": task_url,  # your MongoDB url
    "task_db_name": task_db_name,  # database name
}
qlib.init(..., mongo=mongo_conf)

After qlib.init():

C["mongo"] = {
    "task_url" : "mongodb://localhost:27017/",
    "task_db_name" : "rolling_db"
}
Returns:the Database instance
Return type:Database
qlib.workflow.task.utils.list_recorders(experiment, rec_filter_func=None)

List all recorders which can pass the filter in an experiment.

Parameters:
  • experiment (str or Experiment) – the name of an Experiment or an instance
  • rec_filter_func (Callable, optional) – return True to retain the given recorder. Defaults to None.
Returns:

a dict {rid: recorder} after filtering.

Return type:

dict

class qlib.workflow.task.utils.TimeAdjuster(future=True, end_time=None)

Find appropriate date and adjust date.

__init__(future=True, end_time=None)

Initialize self. See help(type(self)) for accurate signature.

set_end_time(end_time=None)

Set the end time. None means using the calendar's end time.

Parameters:end_time
get(idx: int)

Get datetime by index.

Parameters:idx (int) – index of the calendar
max() → pandas._libs.tslibs.timestamps.Timestamp

Return the max calendar datetime

align_idx(time_point, tp_type='start') → int

Align the index of time_point in the calendar.

Parameters:
  • time_point
  • tp_type (str) –
Returns:

index

Return type:

int

cal_interval(time_point_A, time_point_B) → int

Calculate the trading day interval (time_point_A - time_point_B)

Parameters:
  • time_point_A – time_point_A
  • time_point_B – time_point_B (should be earlier than time_point_A)
Returns:

the interval between A and B

Return type:

int

align_time(time_point, tp_type='start') → pandas._libs.tslibs.timestamps.Timestamp

Align time_point to trade date of calendar

Parameters:
  • time_point – Time point
  • tp_type – str time point type (“start”, “end”)
Returns:

pd.Timestamp

align_seg(segment: Union[dict, tuple]) → Union[dict, tuple]

Align the given date to the trade date

for example:

input: {'train': ('2008-01-01', '2014-12-31'), 'valid': ('2015-01-01', '2016-12-31'), 'test': ('2017-01-01', '2020-08-01')}

output: {'train': (Timestamp('2008-01-02 00:00:00'), Timestamp('2014-12-31 00:00:00')),
        'valid': (Timestamp('2015-01-05 00:00:00'), Timestamp('2016-12-30 00:00:00')),
        'test': (Timestamp('2017-01-03 00:00:00'), Timestamp('2020-07-31 00:00:00'))}
Parameters:segment
Returns:the aligned start and end trade dates (pd.Timestamp) between the given start and end dates.
Return type:Union[dict, tuple]
truncate(segment: tuple, test_start, days: int) → tuple

Truncate the segment based on the test_start date

Parameters:
  • segment (tuple) – time segment
  • test_start
  • days (int) – the number of trading days to be truncated. The data in this segment may need ‘days’ days of data, counted based on the test_start. For example, if the label contains the information of 2 days in the near future and the prediction horizon is 1 day (e.g. the prediction target is Ref($close, -2)/Ref($close, -1) - 1), then days should be 2 + 1 == 3 days.
Returns:

new segment

Return type:

tuple

shift(seg: tuple, step: int, rtype='sliding') → tuple

Shift the datatime of segment

If there are None (which indicates unbounded index) in the segment, this method will return None.

Parameters:
  • seg – datetime segment
  • step (int) – rolling step
  • rtype (str) – rolling type (“sliding” or “expanding”)
Returns:

new segment

Return type:

tuple

Raises:

KeyError – shift will raise an error if the index (either start or end) is out of self.cal

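For example, a hedged sketch of aligning and shifting a segment with TimeAdjuster (it assumes qlib has been initialized so that the trading calendar is available; the dates are illustrative):

from qlib.workflow.task.utils import TimeAdjuster

ta = TimeAdjuster(future=True)
seg = {"train": ("2008-01-01", "2014-12-31"), "test": ("2017-01-01", "2020-08-01")}
aligned = ta.align_seg(seg)                                     # snap dates to trade dates
shifted = ta.shift(aligned["test"], step=20, rtype="sliding")   # slide the test segment by 20 trading days
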
qlib.workflow.task.utils.replace_task_handler_with_cache(task: dict, cache_dir: Union[str, pathlib.Path] = '.') → dict

Replace the handler in task with a cache handler. It will automatically cache the file and save it in cache_dir.

>>> import qlib
>>> qlib.auto_init()
>>> import datetime
>>> # it is simplified task
>>> task = {"dataset": {"kwargs":{'handler': {'class': 'Alpha158', 'module_path': 'qlib.contrib.data.handler', 'kwargs': {'start_time': datetime.date(2008, 1, 1), 'end_time': datetime.date(2020, 8, 1), 'fit_start_time': datetime.date(2008, 1, 1), 'fit_end_time': datetime.date(2014, 12, 31), 'instruments': 'CSI300'}}}}}
>>> new_task = replace_task_handler_with_cache(task)
>>> print(new_task)
{'dataset': {'kwargs': {'handler': 'file...Alpha158.3584f5f8b4.pkl'}}}

Online Serving

Online Manager

OnlineManager can manage a set of Online Strategies and run them dynamically.

With the change of time, the decisive models will also change. In this module, we call those contributing models online models. In every routine (such as every day or every minute), the online models may change and their predictions need to be updated. So this module provides a series of methods to control this process.

This module also provides a method to simulate Online Strategies in history, which means you can verify your strategy or find a better one.

There are 4 typical situations for using different trainers:

  • Online + Trainer: When you want to do a REAL routine, the Trainer will help you train the models. It will train models task by task and strategy by strategy.
  • Online + DelayTrainer: DelayTrainer will skip concrete training until all tasks have been prepared by different strategies. It lets users train all tasks in parallel at the end of routine or first_train. Otherwise, these functions will get stuck when each strategy prepares its tasks.
  • Simulation + Trainer: It behaves in the same way as Online + Trainer. The only difference is that it is for simulation/backtesting instead of online trading.
  • Simulation + DelayTrainer: When your models don’t have any temporal dependence, you can use DelayTrainer for the ability of multitasking. It means all tasks in all routines can be REAL trained at the end of simulating. The signals will be prepared well at different time segments (based on whether or not any new model is online).

Here is some pseudo code that demonstrates the workflow of each situation.

For simplicity
  • Only one strategy is used in the example
  • update_online_pred is only called in the online mode and is omitted here
  1. Online + Trainer
tasks = first_train()
models = trainer.train(tasks)
trainer.end_train(models)
for day in online_trading_days:
    # OnlineManager.routine
    models = trainer.train(strategy.prepare_tasks())  # for each strategy
    strategy.prepare_online_models(models)  # for each strategy

    trainer.end_train(models)
    prepare_signals()  # prepare trading signals daily

Online + DelayTrainer: the workflow is the same as Online + Trainer.

  2. Simulation + DelayTrainer
# simulate
tasks = first_train()
models = trainer.train(tasks)
for day in historical_calendars:
    # OnlineManager.routine
    models = trainer.train(strategy.prepare_tasks())  # for each strategy
    strategy.prepare_online_models(models)  # for each strategy
# delay_prepare()
# FIXME: Currently the delay_prepare is not implemented in a proper way.
trainer.end_train(<for all previous models>)
prepare_signals()

# Can we simplify the current workflow?

  • Can we reduce the number of states of tasks?

    • For each task, we have three phases (i.e. task, partly trained task, final trained task)
class qlib.workflow.online.manager.OnlineManager(strategies: Union[qlib.workflow.online.strategy.OnlineStrategy, List[qlib.workflow.online.strategy.OnlineStrategy]], trainer: qlib.model.trainer.Trainer = None, begin_time: Union[str, pandas._libs.tslibs.timestamps.Timestamp] = None, freq='day')

OnlineManager can manage online models with Online Strategy. It also provides a history recording of which models are online at what time.

__init__(strategies: Union[qlib.workflow.online.strategy.OnlineStrategy, List[qlib.workflow.online.strategy.OnlineStrategy]], trainer: qlib.model.trainer.Trainer = None, begin_time: Union[str, pandas._libs.tslibs.timestamps.Timestamp] = None, freq='day')

Init OnlineManager. One OnlineManager must have at least one OnlineStrategy.

Parameters:
  • strategies (Union[OnlineStrategy, List[OnlineStrategy]]) – an instance of OnlineStrategy or a list of OnlineStrategy
  • begin_time (Union[str,pd.Timestamp], optional) – the OnlineManager will begin at this time. Defaults to None for using the latest date.
  • trainer (qlib.model.trainer.Trainer) – the trainer to train task. None for using TrainerR.
  • freq (str, optional) – data frequency. Defaults to “day”.
first_train(strategies: List[qlib.workflow.online.strategy.OnlineStrategy] = None, model_kwargs: dict = {})

Get tasks from every strategy’s first_tasks method and train them. If using DelayTrainer, it can finish training all together after every strategy’s first_tasks.

Parameters:
  • strategies (List[OnlineStrategy]) – the list of strategies (this param is needed when adding strategies). None means using the default strategies.
  • model_kwargs (dict) – the params for prepare_online_models
routine(cur_time: Union[str, pandas._libs.tslibs.timestamps.Timestamp] = None, task_kwargs: dict = {}, model_kwargs: dict = {}, signal_kwargs: dict = {})

Typical update process for every strategy and record the online history.

The typical update process after a routine, such as day by day or month by month. The process is: Update predictions -> Prepare tasks -> Prepare online models -> Prepare signals.

If using DelayTrainer, it can finish training all together after every strategy’s prepare_tasks.

Parameters:
  • cur_time (Union[str,pd.Timestamp], optional) – run routine method in this time. Defaults to None.
  • task_kwargs (dict) – the params for prepare_tasks
  • model_kwargs (dict) – the params for prepare_online_models
  • signal_kwargs (dict) – the params for prepare_signals
get_collector(**kwargs) → qlib.workflow.task.collect.MergeCollector

Get the instance of Collector to collect results from every strategy. This collector can be a basis for signal preparation.

Parameters:**kwargs – the params for get_collector.
Returns:the collector to merge other collectors.
Return type:MergeCollector
add_strategy(strategies: Union[qlib.workflow.online.strategy.OnlineStrategy, List[qlib.workflow.online.strategy.OnlineStrategy]])

Add some new strategies to OnlineManager.

Parameters:strategy (Union[OnlineStrategy, List[OnlineStrategy]]) – a list of OnlineStrategy
prepare_signals(prepare_func: Callable = <qlib.model.ens.ensemble.AverageEnsemble object>, over_write=False)

After preparing the data of the last routine (a box in the box-plot), which means the end of the routine, we can prepare trading signals for the next routine.

NOTE: Given a set of predictions, all signals before these prediction end times will be prepared well.

Even if the latest signal already exists, it will be overwritten by the latest calculation result.

Note

Given a prediction of a certain time, all signals before this time will be prepared well.

Parameters:
  • prepare_func (Callable, optional) – get signals from a dict after collecting. Defaults to AverageEnsemble(); the results collected by MergeCollector must be {xxx: pred}.
  • over_write (bool, optional) – If True, the new signals will overwrite the old ones. If False, the new signals will be appended to the end of the existing signals. Defaults to False.
Returns:

the signals.

Return type:

pd.DataFrame

get_signals() → Union[pandas.core.series.Series, pandas.core.frame.DataFrame]

Get prepared online signals.

Returns:pd.Series if there is only one signal for each datetime. pd.DataFrame for multiple signals; for example, buy and sell operations use different trading signals.
Return type:Union[pd.Series, pd.DataFrame]
simulate(end_time=None, frequency='day', task_kwargs={}, model_kwargs={}, signal_kwargs={}) → Union[pandas.core.series.Series, pandas.core.frame.DataFrame]

Starting from the current time, this method will simulate every routine in OnlineManager until the end time.

Considering the parallel training, the models and signals can be prepared after all routines are simulated.

The delayed training can be done with DelayTrainer, and the delayed signal preparation can be done with delay_prepare.

Parameters:
  • end_time – the time the simulation will end
  • frequency – the calendar frequency
  • task_kwargs (dict) – the params for prepare_tasks
  • model_kwargs (dict) – the params for prepare_online_models
  • signal_kwargs (dict) – the params for prepare_signals
Returns:

pd.Series if there is only one signal for each datetime. pd.DataFrame for multiple signals; for example, buy and sell operations use different trading signals.

Return type:

Union[pd.Series, pd.DataFrame]

delay_prepare(model_kwargs={}, signal_kwargs={})

Prepare all models and signals if something is waiting for preparation.

Parameters:
  • model_kwargs – the params for end_train
  • signal_kwargs – the params for prepare_signals
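
A hedged, simplified sketch of simulating an OnlineManager with one RollingStrategy (the Simulation + DelayTrainer situation). The task template below is abbreviated and only illustrative; it assumes qlib has been initialized with the corresponding data:

from qlib.model.trainer import DelayTrainerR
from qlib.workflow.online.manager import OnlineManager
from qlib.workflow.online.strategy import RollingStrategy
from qlib.workflow.task.gen import RollingGen

# An abbreviated qlib task config (model / dataset / record); fill in the
# handler, instruments, and segments you actually use.
task_template = {
    "model": {"class": "LGBModel", "module_path": "qlib.contrib.model.gbdt"},
    "dataset": {
        "class": "DatasetH",
        "module_path": "qlib.data.dataset",
        "kwargs": {
            "handler": {
                "class": "Alpha158",
                "module_path": "qlib.contrib.data.handler",
                "kwargs": {"instruments": "csi300", "start_time": "2008-01-01", "end_time": "2018-10-31",
                           "fit_start_time": "2008-01-01", "fit_end_time": "2014-12-31"},
            },
            "segments": {"train": ("2008-01-01", "2014-12-31"),
                         "valid": ("2015-01-01", "2016-12-31"),
                         "test": ("2017-01-01", "2018-10-31")},
        },
    },
    "record": [{"class": "SignalRecord", "module_path": "qlib.workflow.record_temp"}],
}

strategy = RollingStrategy(
    "rolling_online",                 # name_id, also used as the experiment name
    task_template=task_template,
    rolling_gen=RollingGen(step=40, rtype=RollingGen.ROLL_SD),
)
manager = OnlineManager(strategy, trainer=DelayTrainerR(), begin_time="2018-09-10")
signals = manager.simulate(end_time="2018-10-31")   # prepared signals as pd.Series / pd.DataFrame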

Online Strategy

OnlineStrategy module is an element of online serving.

class qlib.workflow.online.strategy.OnlineStrategy(name_id: str)

OnlineStrategy works with the Online Manager, responding to how the tasks are generated, how the models are updated, and how the signals are prepared.

__init__(name_id: str)

Init OnlineStrategy. This module MUST use Trainer to finish model training.

Parameters:
  • name_id (str) – a unique name or id.
  • trainer (qlib.model.trainer.Trainer, optional) – an instance of Trainer. Defaults to None.
prepare_tasks(cur_time, **kwargs) → List[dict]

After the end of a routine, check whether we need to prepare and train some new tasks based on cur_time (None for the latest). Return the new tasks waiting for training.

You can find the last online models by OnlineTool.online_models.

prepare_online_models(trained_models, cur_time=None) → List[object]

Select some models from the trained models and set them as online models. This is a typical implementation that puts all trained models online; you can override it to implement a more complex method. You can find the last online models by OnlineTool.online_models if you still need them.

NOTE: Reset all online models to trained models. If there are no trained models, then do nothing.

NOTE:
Current implementation is very naive. Here is a more complex situation which is closer to practical scenarios: 1. Train new models the day before test_start (at timestamp T). 2. Switch models at test_start (typically at timestamp T + 1).
Parameters:
  • models (list) – a list of models.
  • cur_time (pd.Timestamp) – the current time from OnlineManager. None for the latest.
Returns:

a list of online models.

Return type:

List[object]

first_tasks() → List[dict]

Generate a series of tasks for the first time and return them.

get_collector() → qlib.workflow.task.collect.Collector

Get the instance of Collector to collect different results of this strategy.

For example:
  1. collect predictions in Recorder
  2. collect signals in a txt file
Returns:Collector
class qlib.workflow.online.strategy.RollingStrategy(name_id: str, task_template: Union[dict, List[dict]], rolling_gen: qlib.workflow.task.gen.RollingGen)

This example strategy always uses the latest rolling models as online models.

__init__(name_id: str, task_template: Union[dict, List[dict]], rolling_gen: qlib.workflow.task.gen.RollingGen)

Init RollingStrategy.

Assumption: the str of name_id, the experiment name, and the trainer’s experiment name are the same.

Parameters:
  • name_id (str) – a unique name or id. Will be also the name of the Experiment.
  • task_template (Union[dict, List[dict]]) – a list of task_template or a single template, which will be used to generate many tasks using rolling_gen.
  • rolling_gen (RollingGen) – an instance of RollingGen
get_collector(process_list=[<qlib.model.ens.group.RollingGroup object>], rec_key_func=None, rec_filter_func=None, artifacts_key=None)

Get the instance of Collector to collect results. The returned collector must distinguish results in different models.

Assumption: the models can be distinguished based on the model name and rolling test segments. If you do not want this assumption, please implement your own method or use another rec_key_func.

Parameters:
  • rec_key_func (Callable) – a function to get the key of a recorder. If None, use recorder id.
  • rec_filter_func (Callable, optional) – filter the recorders by returning True or False. Defaults to None.
  • artifacts_key (List[str], optional) – the artifacts key you want to get. If None, get all artifacts.
first_tasks() → List[dict]

Use rolling_gen to generate different tasks based on task_template.

Returns:a list of tasks
Return type:List[dict]
prepare_tasks(cur_time) → List[dict]

Prepare new tasks based on cur_time (None for the latest).

You can find the last online models by OnlineToolR.online_models.

Returns:a list of new tasks.
Return type:List[dict]

Online Tool

OnlineTool is a module to set and unset a series of online models. The online models are the decisive models at certain points in time, and they can change as time goes by. This allows us to use efficient sub-models as the market style changes.

class qlib.workflow.online.utils.OnlineTool

OnlineTool will manage online models in an experiment that includes the model recorders.

__init__()

Init OnlineTool.

set_online_tag(tag, recorder: Union[list, object])

Set a tag on the model to mark whether it is online.

Parameters:
  • tag (str) – the tags in ONLINE_TAG, OFFLINE_TAG
  • recorder (Union[list,object]) – the model’s recorder
get_online_tag(recorder: object) → str

Given a model recorder, return its online tag.

Parameters:recorder (Object) – the model’s recorder
Returns:the online tag
Return type:str
reset_online_tag(recorder: Union[list, object])

Offline all models and set the recorders to ‘online’.

Parameters:recorder (Union[list,object]) – the recorder you want to reset to ‘online’.
online_models() → list

Get current online models

Returns:a list of online models.
Return type:list
update_online_pred(to_date=None)

Update the predictions of online models to to_date.

Parameters:to_date (pd.Timestamp) – the pred before this date will be updated. None for updating to the latest.
class qlib.workflow.online.utils.OnlineToolR(default_exp_name: str = None)

The implementation of OnlineTool based on (R)ecorder.

__init__(default_exp_name: str = None)

Init OnlineToolR.

Parameters:default_exp_name (str) – the default experiment name.
set_online_tag(tag, recorder: Union[qlib.workflow.recorder.Recorder, List[T]])

Set a tag on the model's recorder to mark whether it is online.

Parameters:
  • tag (str) – the tags in ONLINE_TAG, NEXT_ONLINE_TAG, OFFLINE_TAG
  • recorder (Union[Recorder, List]) – a list of Recorder or an instance of Recorder
get_online_tag(recorder: qlib.workflow.recorder.Recorder) → str

Given a model recorder, return its online tag.

Parameters:recorder (Recorder) – an instance of recorder
Returns:the online tag
Return type:str
reset_online_tag(recorder: Union[qlib.workflow.recorder.Recorder, List[T]], exp_name: str = None)

Offline all models and set the recorders to ‘online’.

Parameters:
  • recorder (Union[Recorder, List]) – the recorder you want to reset to ‘online’.
  • exp_name (str) – the experiment name. If None, then use default_exp_name.
online_models(exp_name: str = None) → list

Get current online models

Parameters:exp_name (str) – the experiment name. If None, then use default_exp_name.
Returns:a list of online models.
Return type:list
update_online_pred(to_date=None, from_date=None, exp_name: str = None)

Update the predictions of online models to to_date.

Parameters:
  • to_date (pd.Timestamp) – the pred before this date will be updated. None for updating to latest time in Calendar.
  • exp_name (str) – the experiment name. If None, then use default_exp_name.
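
For example, a hedged sketch of putting one trained recorder online and refreshing its predictions (the experiment name and recorder id are hypothetical; qlib is assumed to be initialized):

from qlib.workflow import R
from qlib.workflow.online.utils import OnlineToolR

online_tool = OnlineToolR(default_exp_name="my_exp")
rec = R.get_recorder(recorder_id="2b7a1f...", experiment_name="my_exp")  # hypothetical ids
online_tool.reset_online_tag(rec)      # offline all other models, set this recorder online
print(online_tool.online_models())     # -> [rec]
online_tool.update_online_pred()       # update pred.pkl up to the latest date in the calendar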

RecordUpdater

Updater is a module to update artifacts such as predictions when the stock data is updated.

class qlib.workflow.online.update.RMDLoader(rec: qlib.workflow.recorder.Recorder)

Recorder Model Dataset Loader

__init__(rec: qlib.workflow.recorder.Recorder)

Initialize self. See help(type(self)) for accurate signature.

get_dataset(start_time, end_time, segments=None, unprepared_dataset: Optional[qlib.data.dataset.DatasetH] = None) → qlib.data.dataset.DatasetH

Load, config and setup dataset.

This dataset is for inference.

Parameters:
  • start_time – the start_time of underlying data
  • end_time – the end_time of underlying data
  • segments – dict, the segments config for the dataset. Due to the time series dataset (TSDatasetH), the test segments may be different from start_time and end_time
  • unprepared_dataset – Optional[DatasetH], if the user doesn't want to load the dataset from the recorder, please specify the user's dataset here
Returns:

the instance of DatasetH

Return type:

DatasetH

class qlib.workflow.online.update.RecordUpdater(record: qlib.workflow.recorder.Recorder, *args, **kwargs)

Update a specific recorder

__init__(record: qlib.workflow.recorder.Recorder, *args, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

update(*args, **kwargs)

Update info for a specific recorder

class qlib.workflow.online.update.DSBasedUpdater(record: qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: Optional[int] = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

Dataset-Based Updater

  • Provide the updating feature for updating data based on a Qlib Dataset

Assumption

  • Based on Qlib dataset

  • The data to be updated is a multi-level index pd.DataFrame. For example label, prediction.

__init__(record: qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: Optional[int] = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

Init PredUpdater.

Expected behavior in following cases:

  • if to_date is greater than the max date in the calendar, the data will be updated to the latest date
  • if there are data before from_date or after to_date, only the data between from_date and to_date are affected.
Parameters:
  • record – Recorder
  • to_date

    update the prediction to the to_date

    if to_date is None:

    the data will be updated to the latest date.
  • from_date

    the update will start from from_date

    if from_date is None:

    the update will occur on the next tick after the latest data in the historical data
  • hist_ref

    int Sometimes the dataset will have historical dependencies. We leave it to users to set the length of the historical dependency. If the user doesn’t specify this parameter, the Updater will try to load the dataset to automatically determine hist_ref.

    Note

    the start_time is not included in the hist_ref; so the hist_ref will be step_len - 1 in most cases

  • loader_cls – type, the class used to load the model and dataset
prepare_data(unprepared_dataset: Optional[qlib.data.dataset.DatasetH] = None) → qlib.data.dataset.DatasetH

Load the dataset.
  • if unprepared_dataset is specified, prepare that dataset directly
  • otherwise, load the dataset from the recorder

Separating this function makes it easier to reuse the dataset

Returns:the instance of DatasetH
Return type:DatasetH
update(dataset: qlib.data.dataset.DatasetH = None, write: bool = True, ret_new: bool = False) → Optional[object]
Parameters:
  • dataset (DatasetH) – the instance of DatasetH. None means preparing it again.
  • write (bool) – whether the write action will be executed
  • ret_new (bool) – whether the updated data will be returned
Returns:

the updated dataset

Return type:

Optional[object]

get_update_data(dataset: qlib.data.dataset.Dataset) → pandas.core.frame.DataFrame

Return the updated data based on the given dataset.

The difference between get_update_data and update: get_update_data only includes some data-specific features, while update includes some general routine steps (e.g. preparing the dataset, checking).

class qlib.workflow.online.update.PredUpdater(record: qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: Optional[int] = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

Update the prediction in the Recorder

get_update_data(dataset: qlib.data.dataset.Dataset) → pandas.core.frame.DataFrame

Return the updated data based on the given dataset.

The difference between get_update_data and update: get_update_data only includes some data-specific features, while update includes some general routine steps (e.g. preparing the dataset, checking).

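For example, a hedged sketch of refreshing the stored predictions of an existing recorder after new data has been dumped (the ids are hypothetical; qlib is assumed to be initialized):

from qlib.workflow import R
from qlib.workflow.online.update import PredUpdater

rec = R.get_recorder(recorder_id="2b7a1f...", experiment_name="my_exp")  # hypothetical ids
PredUpdater(record=rec, to_date=None).update()   # to_date=None: update to the latest calendar date
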
class qlib.workflow.online.update.LabelUpdater(record: qlib.workflow.recorder.Recorder, to_date=None, **kwargs)

Update the label in the recorder

Assumption - The label is generated from record_temp.SignalRecord.

__init__(record: qlib.workflow.recorder.Recorder, to_date=None, **kwargs)

Init PredUpdater.

Expected behavior in following cases:

  • if to_date is greater than the max date in the calendar, the data will be updated to the latest date
  • if there are data before from_date or after to_date, only the data between from_date and to_date are affected.
Parameters:
  • record – Recorder
  • to_date

    update the prediction to the to_date

    if to_date is None:

    the data will be updated to the latest date.
  • from_date

    the update will start from from_date

    if from_date is None:

    the update will occur on the next tick after the latest data in the historical data
  • hist_ref

    int Sometimes the dataset will have historical dependencies. We leave it to users to set the length of the historical dependency. If the user doesn’t specify this parameter, the Updater will try to load the dataset to automatically determine hist_ref.

    Note

    the start_time is not included in the hist_ref; so the hist_ref will be step_len - 1 in most cases

  • loader_cls – type, the class used to load the model and dataset
get_update_data(dataset: qlib.data.dataset.Dataset) → pandas.core.frame.DataFrame

Return the updated data based on the given dataset.

The difference between get_update_data and update: get_update_data only includes some data-specific features, while update includes some general routine steps (e.g. preparing the dataset, checking).

Utils

Serializable

class qlib.utils.serial.Serializable

Serializable will change the behaviors of pickle.

The rule to tell if an attribute will be kept or dropped when dumping. Rules with higher priority are on the top:
  • in the config attribute list -> always dropped
  • in the include attribute list -> always kept
  • in the exclude attribute list -> always dropped
  • name does not start with _ -> kept
  • name starts with _ -> kept if dump_all is true, else dropped

It provides syntactic sugar for distinguishing the attributes which the user doesn’t want to dump.
  • For example, a learnable Datahandler just wants to save the parameters without data when dumping to disk

__init__()

Initialize self. See help(type(self)) for accurate signature.

dump_all

whether the object will dump all attributes

config(recursive=False, **kwargs)

configure the serializable object

Parameters:
  • kwargs (may include following keys) –
    dump_all : bool
    whether the object will dump all attributes
    exclude : list
    the attributes that will not be dumped
    include : list
    the attributes that will be dumped
  • recursive (bool) – whether the configuration is recursive
to_pickle(path: Union[pathlib.Path, str], **kwargs)

Dump self to a pickle file.

path (Union[Path, str]): the path to dump to

kwargs may include the following keys

dump_all : bool
whether the object will dump all attributes
exclude : list
the attributes that will not be dumped
include : list
the attributes that will be dumped
classmethod load(filepath)

Load the serializable class from a filepath.

Parameters:filepath (str) – the path of file
Raises:TypeError – the pickled file must be type(cls)
Returns:the instance of type(cls)
Return type:type(cls)
classmethod get_backend()

Return the real backend of a Serializable class. The pickle_backend value can be “pickle” or “dill”.

Returns:pickle or dill module based on pickle_backend
Return type:module
static general_dump(obj, path: Union[pathlib.Path, str])

A general dumping method for objects

Parameters:
  • obj (object) – the object to be dumped
  • path (Union[Path, str]) – the target path the data will be dumped
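
For example, a minimal sketch of the attribute-keeping rules (the class and attribute names are illustrative):

from qlib.utils.serial import Serializable

class MyHandler(Serializable):
    def __init__(self):
        super().__init__()
        self.params = {"lr": 0.01}    # public name -> kept by default
        self._data = list(range(5))   # starts with "_" -> dropped unless dump_all or "include"

obj = MyHandler()
obj.config(include=["_data"])         # force "_data" to be kept when dumping
obj.to_pickle("my_handler.pkl")
restored = MyHandler.load("my_handler.pkl")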

RL

Base Component

class qlib.rl.Interpreter

Interpreter is a medium between states produced by simulators and states needed by RL policies. Interpreters are two-way:

  1. From simulator state to policy state (aka observation), see StateInterpreter.
  2. From policy action to action accepted by simulator, see ActionInterpreter.

Inherit one of the two sub-classes to define your own interpreter. This super-class is only used for isinstance check.

Interpreters are recommended to be stateless, meaning that storing temporary information with self.xxx in an interpreter is an anti-pattern. In the future, we might support registering some interpreter-related states by calling self.env.register_state(), but it’s not planned for the first iteration.

class qlib.rl.StateInterpreter

State Interpreter that interprets the execution result of the qlib executor into an RL env state

validate(obs: ObsType) → None

Validate whether an observation belongs to the pre-defined observation space.

interpret(simulator_state: StateType) → ObsType

Interpret the state of simulator.

Parameters:simulator_state – Retrieved with simulator.get_state().
Returns:the state needed by the policy. It should conform with the state space defined in observation_space.
Return type:ObsType
class qlib.rl.ActionInterpreter

Action Interpreter that interprets the RL agent's action into qlib orders

validate(action: PolicyActType) → None

Validate whether an action belongs to the pre-defined action space.

interpret(simulator_state: StateType, action: PolicyActType) → ActType

Convert the policy action to simulator action.

Parameters:
  • simulator_state – Retrieved with simulator.get_state().
  • action – Raw action given by policy.
Returns:

the action needed by the simulator

Return type:

ActType

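For example, a hedged sketch of the two interpreter types for a toy simulator whose state is a single float position (the class names are illustrative, not part of qlib; gym-style spaces are assumed):

import gym
import numpy as np
from qlib.rl import ActionInterpreter, StateInterpreter

class PositionObs(StateInterpreter):
    """Turn the simulator's float state into a 1-d observation vector."""

    @property
    def observation_space(self) -> gym.Space:
        return gym.spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32)

    def interpret(self, simulator_state: float) -> np.ndarray:
        return np.array([simulator_state], dtype=np.float32)

class SignedAction(ActionInterpreter):
    """Map a discrete policy action {0, 1, 2} to a simulator action {-1, 0, +1}."""

    @property
    def action_space(self) -> gym.Space:
        return gym.spaces.Discrete(3)

    def interpret(self, simulator_state: float, action: int) -> int:
        return int(action) - 1
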
class qlib.rl.Reward

Reward calculation component that takes a single argument: state of simulator. Returns a real number: reward.

Subclass should implement reward(simulator_state) to implement their own reward calculation recipe.

reward(simulator_state: SimulatorState) → float

Implement this method for your own reward.

class qlib.rl.RewardCombination(rewards: Dict[str, Tuple[qlib.rl.reward.Reward, float]])

Combination of multiple rewards.

__init__(rewards: Dict[str, Tuple[qlib.rl.reward.Reward, float]]) → None

Initialize self. See help(type(self)) for accurate signature.

reward(simulator_state: Any) → float

Implement this method for your own reward.

class qlib.rl.Simulator(initial: InitialStateType, **kwargs)

Simulator that resets with __init__, and transitions with step(action).

To make the data-flow clear, we make the following restrictions to Simulator:

  1. The only way to modify the inner status of a simulator is by using step(action).
  2. External modules can read the status of a simulator by using simulator.get_state(), and check whether the simulator is in the ending state by calling simulator.done().

A simulator is defined to be bounded with three types:

  • InitialStateType that is the type of the data used to create the simulator.
  • StateType that is the type of the status (state) of the simulator.
  • ActType that is the type of the action, which is the input received in each step.

Different simulators might share the same StateType. For example, when they are dealing with the same task, but with different simulation implementations. With the same type, they can safely share other components in the MDP.

Simulators are ephemeral. The lifecycle of a simulator starts with an initial state and ends with the trajectory. In other words, when the trajectory ends, the simulator is recycled. If simulators want to share context between each other (e.g., for speed-up purposes), this could be done by accessing the weak reference of the environment wrapper.

env

A reference of the env-wrapper, which could be useful in some corner cases. Simulators are discouraged from using this, because it’s prone to induce errors.

__init__(initial: InitialStateType, **kwargs) → None

Initialize self. See help(type(self)) for accurate signature.

step(action: ActType) → None

Receives an action of ActType.

Simulator should update its internal state, and return None. The updated state can be retrieved with simulator.get_state().

done() → bool

Check whether the simulator is in a “done” state. When the simulator is in a “done” state, it should no longer receive any step request. As simulators are ephemeral, to reset the simulator, the old one should be destroyed and a new simulator can be created.

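For example, a hedged toy Simulator whose state is a remaining quantity that each step consumes (purely illustrative, not a qlib-provided simulator):

from qlib.rl import Simulator

class CountdownSimulator(Simulator):
    def __init__(self, initial: float) -> None:
        super().__init__(initial)
        self._remaining = initial

    def step(self, action: float) -> None:
        # The only way to modify the inner status is through step().
        self._remaining = max(self._remaining - action, 0.0)

    def get_state(self) -> float:
        return self._remaining

    def done(self) -> bool:
        return self._remaining <= 0.0

sim = CountdownSimulator(3.0)
sim.step(1.0)
sim.get_state()   # -> 2.0
sim.done()        # -> False
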
Strategy

class qlib.rl.strategy.SingleOrderStrategy(order: Order, trade_range: TradeRange | None = None)

Strategy used to generate a trade decision with exactly one order.

__init__(order: Order, trade_range: TradeRange | None = None) → None
Parameters:
  • outer_trade_decision (BaseTradeDecision, optional) –

    the trade decision of the outer strategy on which this strategy relies; it will be traded in [start_time, end_time], by default None

    • If the strategy is used to split trade decision, it will be used
    • If the strategy is used for portfolio management, it can be ignored
  • level_infra (LevelInfrastructure, optional) – level shared infrastructure for backtesting, including trade calendar
  • common_infra (CommonInfrastructure, optional) – common infrastructure for backtesting, including trade_account, trade_exchange, etc.
  • trade_exchange (Exchange) –

    exchange that provides market info, used to deal order and generate report

    • If trade_exchange is None, self.trade_exchange will be set with common_infra
    • It allows different trade_exchanges to be used in different executions.
    • For example:
      • In daily execution, both the daily exchange and the minutely exchange are usable, but the daily exchange is recommended because it runs faster.
      • In minutely execution, the daily exchange is not usable; only the minutely exchange is recommended.
generate_trade_decision(execute_result: list | None = None) → TradeDecisionWO

Generate trade decision in each trading bar

Parameters:execute_result (List[object], optional) –

the executed result for trade decision, by default None

  • When generate_trade_decision is called for the first time, execute_result could be None

Trainer

Train, test, inference utilities.

class qlib.rl.trainer.Trainer(*, max_iters: int | None = None, val_every_n_iters: int | None = None, loggers: LogWriter | List[LogWriter] | None = None, callbacks: List[Callback] | None = None, finite_env_type: FiniteEnvType = 'subproc', concurrency: int = 2, fast_dev_run: int | None = None)

Utility to train a policy on a particular task.

Different from traditional DL trainer, the iteration of this trainer is “collect”, rather than “epoch”, or “mini-batch”. In each collect, Collector collects a number of policy-env interactions, and accumulates them into a replay buffer. This buffer is used as the “data” to train the policy. At the end of each collect, the policy is updated several times.

The API has some resemblance to PyTorch Lightning, but it’s essentially different because this trainer is built for RL applications, and thus most configurations are under the RL context. We are still looking for ways to incorporate existing trainer libraries, because it looks like a big effort to build a trainer as powerful as those libraries, and also, that’s not our primary goal.

It’s essentially different from tianshou’s built-in trainers, as it’s far more complicated than those.

Parameters:
  • max_iters – Maximum iterations before stopping.
  • val_every_n_iters – Perform validation every n iterations (i.e., training collects).
  • logger – Logger to record the backtest results. Logger must be present because without logger, all information will be lost.
  • finite_env_type – Type of finite env implementation.
  • concurrency – Parallel workers.
  • fast_dev_run – Create a subset for debugging. How this is implemented depends on the implementation of training vessel. For TrainingVessel, if greater than zero, a random subset sized fast_dev_run will be used instead of train_initial_states and val_initial_states.
should_stop = None

Set to stop the training.

metrics = None

Numeric metrics produced in train/val/test. In the middle of training / validation, metrics will be of the latest episode. When each iteration of training / validation finishes, metrics will be the aggregation of all episodes encountered in this iteration.

Cleared on every new iteration of training.

In fit, validation metrics will be prefixed with val/.

current_iter = None

Current iteration (collect) of training.

__init__(*, max_iters: int | None = None, val_every_n_iters: int | None = None, loggers: LogWriter | List[LogWriter] | None = None, callbacks: List[Callback] | None = None, finite_env_type: FiniteEnvType = 'subproc', concurrency: int = 2, fast_dev_run: int | None = None)

Initialize self. See help(type(self)) for accurate signature.

loggers = None

A list of log writers.

initialize()

Initialize the whole training process.

The states here should be synchronized with state_dict.

initialize_iter()

Initialize one iteration / collect.

state_dict() → dict

Put every state of the current training into a dict, at best effort.

It doesn’t try to handle all the possible kinds of states in the middle of one training collect. For most cases, at the end of each iteration, things should usually be correct.

Note that it’s also intended behavior that replay buffer data in the collector will be lost.

load_state_dict(state_dict: dict) → None

Load all states into current trainer.

named_callbacks() → Dict[str, qlib.rl.trainer.callbacks.Callback]

Retrieve a collection of callbacks where each one has a name. Useful when saving checkpoints.

named_loggers() → Dict[str, qlib.rl.utils.log.LogWriter]

Retrieve a collection of loggers where each one has a name. Useful when saving checkpoints.

fit(vessel: TrainingVesselBase, ckpt_path: Path | None = None) → None

Train the RL policy upon the defined simulator.

Parameters:
  • vessel – A bundle of all elements used in training.
  • ckpt_path – Load a pre-trained / paused training checkpoint.
test(vessel: qlib.rl.trainer.vessel.TrainingVesselBase) → None

Test the RL policy against the simulator.

The simulator will be fed with data generated in test_seed_iterator.

Parameters:vessel – A bundle of all related elements.
venv_from_iterator(iterator: Iterable[InitialStateType]) → qlib.rl.utils.finite_env.FiniteVectorEnv

Create a vectorized environment from iterator and the training vessel.

class qlib.rl.trainer.TrainingVessel(*, simulator_fn: Callable[[InitialStateType], Simulator[InitialStateType, StateType, ActType]], state_interpreter: StateInterpreter[StateType, ObsType], action_interpreter: ActionInterpreter[StateType, PolicyActType, ActType], policy: BasePolicy, reward: Reward, train_initial_states: Sequence[InitialStateType] | None = None, val_initial_states: Sequence[InitialStateType] | None = None, test_initial_states: Sequence[InitialStateType] | None = None, buffer_size: int = 20000, episode_per_iter: int = 1000, update_kwargs: Dict[str, Any] = None)

The default implementation of training vessel.

__init__ accepts a sequence of initial states so that iterator can be created. train, validate, test each do one collect (and also update in train). By default, the train initial states will be repeated infinitely during training, and collector will control the number of episodes for each iteration. In validation and testing, the val / test initial states will be used exactly once.

Extra hyper-parameters (only used in train) include:

  • buffer_size: Size of replay buffer.
  • episode_per_iter: Episodes per collect at training. Can be overridden by fast dev run.
  • update_kwargs: Keyword arguments appearing in policy.update. For example, dict(repeat=10, batch_size=64).
__init__(*, simulator_fn: Callable[[InitialStateType], Simulator[InitialStateType, StateType, ActType]], state_interpreter: StateInterpreter[StateType, ObsType], action_interpreter: ActionInterpreter[StateType, PolicyActType, ActType], policy: BasePolicy, reward: Reward, train_initial_states: Sequence[InitialStateType] | None = None, val_initial_states: Sequence[InitialStateType] | None = None, test_initial_states: Sequence[InitialStateType] | None = None, buffer_size: int = 20000, episode_per_iter: int = 1000, update_kwargs: Dict[str, Any] = None)

Initialize self. See help(type(self)) for accurate signature.

train_seed_iterator() → ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]

Override this to create a seed iterator for training. If the iterable is a context manager, the whole training will be invoked in the with-block, and the iterator will be automatically closed after the training is done.

val_seed_iterator() → ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]

Override this to create a seed iterator for validation.

test_seed_iterator() → ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]

Override this to create a seed iterator for testing.

train(vector_env: qlib.rl.utils.finite_env.FiniteVectorEnv) → Dict[str, Any]

Create a collector and collects episode_per_iter episodes. Update the policy on the collected replay buffer.

validate(vector_env: qlib.rl.utils.finite_env.FiniteVectorEnv) → Dict[str, Any]

Implement this to validate the policy once.

test(vector_env: qlib.rl.utils.finite_env.FiniteVectorEnv) → Dict[str, Any]

Implement this to evaluate the policy on test environment once.

class qlib.rl.trainer.TrainingVesselBase

A ship that contains simulator, interpreter, and policy, will be sent to trainer. This class controls algorithm-related parts of training, while trainer is responsible for runtime part.

The ship also defines the most important logic of the core training part, and (optionally) some callbacks to insert customized logics at specific events.

train_seed_iterator() → ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]

Override this to create a seed iterator for training. If the iterable is a context manager, the whole training will be invoked in the with-block, and the iterator will be automatically closed after the training is done.

val_seed_iterator() → ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]

Override this to create a seed iterator for validation.

test_seed_iterator() → ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]

Override this to create a seed iterator for testing.

train(vector_env: tianshou.env.venvs.BaseVectorEnv) → Dict[str, Any]

Implement this to train one iteration. In RL, one iteration usually refers to one collect.

validate(vector_env: qlib.rl.utils.finite_env.FiniteVectorEnv) → Dict[str, Any]

Implement this to validate the policy once.

test(vector_env: qlib.rl.utils.finite_env.FiniteVectorEnv) → Dict[str, Any]

Implement this to evaluate the policy on test environment once.

state_dict() → Dict[KT, VT]

Return a checkpoint of current vessel state.

load_state_dict(state_dict: Dict[KT, VT]) → None

Restore a checkpoint from a previously saved state dict.

class qlib.rl.trainer.Checkpoint(dirpath: Path, filename: str = '{iter:03d}.pth', save_latest: Literal[('link', 'copy')] | None = 'link', every_n_iters: int | None = None, time_interval: int | None = None, save_on_fit_end: bool = True)

Save checkpoints periodically for persistence and recovery.

Reference: https://github.com/PyTorchLightning/pytorch-lightning/blob/bfa8b7be/pytorch_lightning/callbacks/model_checkpoint.py

Parameters:
  • dirpath – Directory to save the checkpoint file.
  • filename

    Checkpoint filename. Can contain named formatting options to be auto-filled. For example: {iter:03d}-{reward:.2f}.pth. Supported argument names are:

    • iter (int)
    • metrics in trainer.metrics
    • time string, in the format of %Y%m%d%H%M%S
  • save_latest – Save the latest checkpoint in latest.pth. If link, latest.pth will be created as a softlink. If copy, latest.pth will be stored as an individual copy. Set to None to disable this.
  • every_n_iters – Checkpoints are saved at the end of every n iterations of training, after validation if applicable.
  • time_interval – Maximum time (seconds) before checkpoints save again.
  • save_on_fit_end – Save one last checkpoint at the end of fit. Do nothing if a checkpoint is already saved there.
__init__(dirpath: Path, filename: str = '{iter:03d}.pth', save_latest: Literal[('link', 'copy')] | None = 'link', every_n_iters: int | None = None, time_interval: int | None = None, save_on_fit_end: bool = True)

Initialize self. See help(type(self)) for accurate signature.

on_fit_end(trainer: Trainer, vessel: TrainingVesselBase) → None

Called after the whole fit process ends.

on_iter_end(trainer: Trainer, vessel: TrainingVesselBase) → None

Called upon every end of iteration. This is called after the bump of current_iter, when the previous iteration is considered complete.

class qlib.rl.trainer.EarlyStopping(monitor: str = 'reward', min_delta: float = 0.0, patience: int = 0, mode: Literal[('min', 'max')] = 'max', baseline: float | None = None, restore_best_weights: bool = False)

Stop training when a monitored metric has stopped improving.

The earlystopping callback will be triggered each time validation ends. It will examine the metrics produced in validation and get the metric named monitor (monitor is reward by default), to check whether it’s no longer increasing / decreasing. It takes min_delta and patience into account if applicable. If it’s found to be no longer increasing / decreasing, trainer.should_stop will be set to true, and the training terminates.

Implementation reference: https://github.com/keras-team/keras/blob/v2.9.0/keras/callbacks.py#L1744-L1893

__init__(monitor: str = 'reward', min_delta: float = 0.0, patience: int = 0, mode: Literal[('min', 'max')] = 'max', baseline: float | None = None, restore_best_weights: bool = False)

Initialize self. See help(type(self)) for accurate signature.

state_dict() → dict

Get a state dict of the callback for pause and resume.

load_state_dict(state_dict: dict) → None

Resume the callback from a saved state dict.

on_fit_start(trainer: Trainer, vessel: TrainingVesselBase) → None

Called before the whole fit process begins.

on_validate_end(trainer: Trainer, vessel: TrainingVesselBase) → None

Called when the validation ends.

class qlib.rl.trainer.MetricsWriter(dirpath: pathlib.Path)

Dump training metrics to file.

__init__(dirpath: pathlib.Path) → None

Initialize self. See help(type(self)) for accurate signature.

on_train_end(trainer: Trainer, vessel: TrainingVesselBase) → None

Called when the training ends. To access all outputs produced during training, cache the data in either trainer and vessel, and post-process them in this hook.

on_validate_end(trainer: Trainer, vessel: TrainingVesselBase) → None

Called when the validation ends.

qlib.rl.trainer.train(simulator_fn: Callable[[InitialStateType], qlib.rl.simulator.Simulator], state_interpreter: qlib.rl.interpreter.StateInterpreter, action_interpreter: qlib.rl.interpreter.ActionInterpreter, initial_states: Sequence[InitialStateType], policy: tianshou.policy.base.BasePolicy, reward: qlib.rl.reward.Reward, vessel_kwargs: Dict[str, Any], trainer_kwargs: Dict[str, Any]) → None

Train a policy with the parallelism provided by RL framework.

Experimental API. Parameters might change shortly.

Parameters:
  • simulator_fn – Callable receiving initial seed, returning a simulator.
  • state_interpreter – Interprets the state of simulators.
  • action_interpreter – Interprets the policy actions.
  • initial_states – Initial states to iterate over. Every state will be run exactly once.
  • policy – Policy to train against.
  • reward – Reward function.
  • vessel_kwargs – Keyword arguments passed to TrainingVessel, like episode_per_iter.
  • trainer_kwargs – Keyword arguments passed to Trainer, like finite_env_type, concurrency.
qlib.rl.trainer.backtest(simulator_fn: Callable[[InitialStateType], Simulator], state_interpreter: StateInterpreter, action_interpreter: ActionInterpreter, initial_states: Sequence[InitialStateType], policy: BasePolicy, logger: LogWriter | List[LogWriter], reward: Reward | None = None, finite_env_type: FiniteEnvType = 'subproc', concurrency: int = 2) → None

Backtest with the parallelism provided by RL framework.

Experimental API. Parameters might change shortly.

Parameters:
  • simulator_fn – Callable receiving initial seed, returning a simulator.
  • state_interpreter – Interprets the state of simulators.
  • action_interpreter – Interprets the policy actions.
  • initial_states – Initial states to iterate over. Every state will be run exactly once.
  • policy – Policy to test against.
  • logger – Logger to record the backtest results. Logger must be present because without logger, all information will be lost.
  • reward – Optional reward function. For backtest, this is for testing the rewards and logging them only.
  • finite_env_type – Type of finite env implementation.
  • concurrency – Parallel workers.
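
A hedged sketch of wiring these pieces into qlib.rl.trainer.train, reusing the toy CountdownSimulator, PositionObs, and SignedAction sketched earlier in this section; the policy is a placeholder for any tianshou BasePolicy (such as the PPO wrapper documented below), and the keyword arguments only echo the documented parameters:

from qlib.rl import Reward
from qlib.rl.trainer import train

class NegativeRemaining(Reward):
    # Toy reward: the smaller the remaining quantity, the better.
    def reward(self, simulator_state: float) -> float:
        return -float(simulator_state)

train(
    simulator_fn=lambda initial: CountdownSimulator(initial),
    state_interpreter=PositionObs(),
    action_interpreter=SignedAction(),
    initial_states=[3.0, 5.0, 8.0],
    policy=policy,     # placeholder: a tianshou BasePolicy matching the spaces above
    reward=NegativeRemaining(),
    vessel_kwargs={"episode_per_iter": 100, "update_kwargs": {"repeat": 5, "batch_size": 64}},
    trainer_kwargs={"max_iters": 10, "finite_env_type": "dummy", "concurrency": 1},
)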

Order Execution

Currently it supports single-asset order execution. Multi-asset is on the way.

class qlib.rl.order_execution.FullHistoryStateInterpreter(max_step: int, data_ticks: int, data_dim: int, processed_data_provider: dict | ProcessedDataProvider)

The observation of all the history, including today (until this moment), and yesterday.

Parameters:
  • max_step – Total number of steps (an upper-bound estimation). For example, 390min / 30min-per-step = 13 steps.
  • data_ticks – Equal to the total number of records. For example, in SAOE per minute, the total number of ticks is the length of the day in minutes.
  • data_dim – Number of dimensions in data.
  • processed_data_provider – Provider of the processed data.
__init__(max_step: int, data_ticks: int, data_dim: int, processed_data_provider: dict | ProcessedDataProvider) → None

Initialize self. See help(type(self)) for accurate signature.

interpret(state: qlib.rl.order_execution.state.SAOEState) → qlib.rl.order_execution.interpreter.FullHistoryObs

Interpret the state of simulator.

Parameters:simulator_state – Retrieved with simulator.get_state().
Returns:the observation needed by the policy. It should conform with the state space defined in observation_space.
Return type:FullHistoryObs
class qlib.rl.order_execution.CurrentStepStateInterpreter(max_step: int)

The observation of current step.

Used when policy only depends on the latest state, but not history. The key list is not full. You can add more if more information is needed by your policy.

__init__(max_step: int) → None

Initialize self. See help(type(self)) for accurate signature.

interpret(state: qlib.rl.order_execution.state.SAOEState) → qlib.rl.order_execution.interpreter.CurrentStateObs

Interpret the state of simulator.

Parameters:simulator_state – Retrieved with simulator.get_state().
Returns:the observation needed by the policy. It should conform with the state space defined in observation_space.
Return type:CurrentStateObs
class qlib.rl.order_execution.CategoricalActionInterpreter(values: int | List[float], max_step: Optional[int] = None)

Convert a discrete policy action to a continuous action, then multiplied by order.amount.

Parameters:
  • values – It can be a list of length $L$: $[a_1, a_2, \ldots, a_L]$. Then, when the policy gives decision $x$, $a_x$ times the order amount is the output. It can also be an integer $n$, in which case a list of length $n+1$ is auto-generated, i.e., $[0, 1/n, 2/n, \ldots, n/n]$.
  • max_step – Total number of steps (an upper-bound estimation). For example, 390min / 30min-per-step = 13 steps.
__init__(values: int | List[float], max_step: Optional[int] = None) → None

Initialize self. See help(type(self)) for accurate signature.

interpret(state: qlib.rl.order_execution.state.SAOEState, action: int) → float

Convert the policy action to simulator action.

Parameters:
  • simulator_state – Retrieved with simulator.get_state().
  • action – Raw action given by policy.
Returns:

the action needed by the simulator

Return type:

float

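For example, a small hedged illustration of the values parameter (the numbers are illustrative):

from qlib.rl.order_execution import CategoricalActionInterpreter

interpreter = CategoricalActionInterpreter(values=4)  # auto-generates the grid [0, 0.25, 0.5, 0.75, 1.0]
# For a SAOEState whose order amount is 1000, policy action 2 would be interpreted
# as trading roughly 0.5 * 1000 = 500 (subject to the remaining position).
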
class qlib.rl.order_execution.TwapRelativeActionInterpreter

Convert a continuous ratio to a deal amount.

The ratio is relative to TWAP on the remainder of the day. For example, if there are 5 steps left and the remaining position is 300, then with a TWAP strategy 60 should be traded in each step. When this interpreter receives action $a$, its output is $60 \cdot a$.

interpret(state: qlib.rl.order_execution.state.SAOEState, action: float) → float

Convert the policy action to simulator action.

Parameters:
  • simulator_state – Retrieved with simulator.get_state().
  • action – Raw action given by policy.
Returns:

the action needed by the simulator

Return type:

float

class qlib.rl.order_execution.Recurrent(obs_space: qlib.rl.order_execution.interpreter.FullHistoryObs, hidden_dim: int = 64, output_dim: int = 32, rnn_type: typing_extensions.Literal['rnn', 'lstm', 'gru'] = 'gru', rnn_num_layers: int = 1)

The network architecture proposed in OPD.

At every time step, the input of the policy network is divided into two parts, the public variables and the private variables, which are handled by raw_rnn and pri_rnn in this network, respectively.

One minor difference is that, in this implementation, we don’t assume the direction to be fixed. Thus, another dire_fc is added to produce an extra direction-related feature.

__init__(obs_space: qlib.rl.order_execution.interpreter.FullHistoryObs, hidden_dim: int = 64, output_dim: int = 32, rnn_type: typing_extensions.Literal['rnn', 'lstm', 'gru'] = 'gru', rnn_num_layers: int = 1) → None

Initializes internal Module state, shared by both nn.Module and ScriptModule.

forward(batch: tianshou.data.batch.Batch) → torch.Tensor

Input should be a dict (at least) containing:

  • data_processed: [N, T, C]
  • cur_step: [N] (int)
  • cur_time: [N] (int)
  • position_history: [N, S] (S is number of steps)
  • target: [N]
  • num_step: [N] (int)
  • acquiring: [N] (0 or 1)
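
A hedged sketch of assembling such an input as a tianshou Batch follows; the dimensions N/T/C/S are illustrative placeholders, not values required by the network:

import numpy as np
from tianshou.data import Batch

N, T, C, S = 8, 240, 6, 13                   # hypothetical batch size, ticks, feature channels, steps
batch = Batch(
    data_processed=np.zeros((N, T, C), dtype=np.float32),
    cur_step=np.zeros(N, dtype=np.int64),
    cur_time=np.zeros(N, dtype=np.int64),
    position_history=np.zeros((N, S), dtype=np.float32),
    target=np.ones(N, dtype=np.float32),
    num_step=np.full(N, S, dtype=np.int64),
    acquiring=np.zeros(N, dtype=np.int64),
)
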
class qlib.rl.order_execution.AllOne(obs_space: gym.Space, action_space: gym.Space, fill_value: float | int = 1.0)

Forward returns a batch full of 1.

Useful when implementing some baselines (e.g., TWAP).
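
A hedged construction sketch; the observation and action spaces below are placeholders, not requirements of the class:

import gym
import numpy as np
from qlib.rl.order_execution import AllOne

policy = AllOne(
    obs_space=gym.spaces.Box(low=-np.inf, high=np.inf, shape=(10,)),    # placeholder space
    action_space=gym.spaces.Box(low=0.0, high=2.0, shape=(1,)),         # placeholder space
    fill_value=1.0,   # paired with TwapRelativeActionInterpreter, "always 1" reproduces TWAP
)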

__init__(obs_space: gym.Space, action_space: gym.Space, fill_value: float | int = 1.0) → None

Initialize self. See help(type(self)) for accurate signature.

forward(batch: Batch, state: dict | Batch | np.ndarray = None, **kwargs) → Batch

Compute action over the given batch data.

Returns:A Batch which MUST have the following keys:
  • act: a numpy.ndarray or a torch.Tensor, the action over the given batch data.
  • state: a dict, a numpy.ndarray or a torch.Tensor, the internal state of the policy, None as default.

Other keys are user-defined. It depends on the algorithm. For example,

# some code
return Batch(logits=..., act=..., state=None, dist=...)

The keyword policy is reserved and the corresponding data will be stored into the replay buffer. For instance,

# some code
return Batch(..., policy=Batch(log_prob=dist.log_prob(act)))
# and in the sampled data batch, you can directly use
# batch.policy.log_prob to get your data.

Note

In continuous action space, you should do another step “map_action” to get the real action:

act = policy(batch).act  # doesn't map to the target action range
act = policy.map_action(act, batch)
class qlib.rl.order_execution.PPO(network: torch.nn.modules.module.Module, obs_space: gym.spaces.space.Space, action_space: gym.spaces.space.Space, lr: float, weight_decay: float = 0.0, discount_factor: float = 1.0, max_grad_norm: float = 100.0, reward_normalization: bool = True, eps_clip: float = 0.3, value_clip: bool = True, vf_coef: float = 1.0, gae_lambda: float = 1.0, max_batch_size: int = 256, deterministic_eval: bool = True, weight_file: Optional[pathlib.Path] = None)

A wrapper of tianshou PPOPolicy.

Differences:

  • Auto-create actor and critic networks. Supports discrete action space only.
  • Dedup common parameters between the actor and critic networks (it is unclear whether this is included in the latest tianshou).
  • Support a weight_file for loading a checkpoint.
  • Some parameters’ default values differ from the original.
__init__(network: torch.nn.modules.module.Module, obs_space: gym.spaces.space.Space, action_space: gym.spaces.space.Space, lr: float, weight_decay: float = 0.0, discount_factor: float = 1.0, max_grad_norm: float = 100.0, reward_normalization: bool = True, eps_clip: float = 0.3, value_clip: bool = True, vf_coef: float = 1.0, gae_lambda: float = 1.0, max_batch_size: int = 256, deterministic_eval: bool = True, weight_file: Optional[pathlib.Path] = None) → None

Initialize self. See help(type(self)) for accurate signature.

class qlib.rl.order_execution.PAPenaltyReward(penalty: float = 100.0, scale: float = 1.0)

Encourage higher PAs, but penalize stacking all the amounts within a very short time. Formally, for each time step, the reward is $(PA_t \cdot vol_t / target - vol_t^2 \cdot penalty)$. A numeric sketch follows the parameter list.

Parameters:
  • penalty – The penalty for large volume in a short time.
  • scale – The weight used to scale up or down the reward.
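
A numeric sketch of one step of this reward; all values are hypothetical, and scale is assumed to multiply the whole term:

penalty, scale = 100.0, 1.0
pa, vol, target = 5.0, 0.1, 1.0              # price advantage (BP), step volume, total target
reward = scale * (pa * vol / target - penalty * vol ** 2)   # 0.5 - 1.0 = -0.5
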
__init__(penalty: float = 100.0, scale: float = 1.0) → None

Initialize self. See help(type(self)) for accurate signature.

reward(simulator_state: qlib.rl.order_execution.state.SAOEState) → float

Implement this method for your own reward.

class qlib.rl.order_execution.SingleAssetOrderExecutionSimple(order: qlib.backtest.decision.Order, data_dir: pathlib.Path, feature_columns_today: List[str] = [], feature_columns_yesterday: List[str] = [], data_granularity: int = 1, ticks_per_step: int = 30, vol_threshold: Optional[float] = None)

Single-asset order execution (SAOE) simulator.

As there’s no “calendar” in the simple simulator, ticks are used to trade. A tick is a record (a line) in the pickle-styled data file. Each tick is considered an individual trading opportunity. If such fine granularity is not needed, use ticks_per_step to lengthen the ticks for each step.

In each step, the traded amount is “equally” divided across the ticks, then bounded by the maximum execution volume (i.e., vol_threshold); in the last step, the simulator tries to ensure the full amount is executed.

Parameters:
  • order – The seed to start an SAOE simulator is an order.
  • data_dir – Path to load backtest data.
  • feature_columns_today – Columns of today’s feature.
  • feature_columns_yesterday – Columns of yesterday’s feature.
  • data_granularity – Number of ticks between consecutive data entries.
  • ticks_per_step – How many ticks per step.
  • vol_threshold – Maximum execution volume (divided by market execution volume).
__init__(order: qlib.backtest.decision.Order, data_dir: pathlib.Path, feature_columns_today: List[str] = [], feature_columns_yesterday: List[str] = [], data_granularity: int = 1, ticks_per_step: int = 30, vol_threshold: Optional[float] = None) → None

Initialize self. See help(type(self)) for accurate signature.

ticks_index = None

All available ticks for the day (not restricted to order).

ticks_for_order = None

Ticks that are available for trading (sliced by order).

twap_price = None

This price is used to compute price advantage. It’s defined as the average price in the period from the order’s start time to end time.

history_exec = None

All execution history at every possible time tick. See SAOEMetrics for available columns. Index is datetime.

history_steps = None

Positions at each step. The position before the first step is also recorded. See SAOEMetrics for available columns. Index is datetime, which is the starting time of each step.

metrics = None

Metrics. Only available when done.

step(amount: float) → None

Execute one step of SAOE.

Parameters:amount – The amount you wish to deal. The simulator doesn’t guarantee that the full amount will be successfully dealt.
done() → bool

Check whether the simulator is in a “done” state. When the simulator is in a “done” state, it should no longer receive any step request. As simulators are ephemeral, to reset the simulator, destroy the old one and create a new one.
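
A hedged end-to-end sketch of driving the simulator; the Order fields, data path, and step count below are assumptions, so consult qlib.backtest.decision.Order and your backtest data layout for the exact setup:

from pathlib import Path
import pandas as pd
from qlib.backtest.decision import Order, OrderDir
from qlib.rl.order_execution import SingleAssetOrderExecutionSimple

order = Order(                                # field names assumed; check the Order definition
    stock_id="SH600000",
    amount=1000.0,
    direction=OrderDir.SELL,
    start_time=pd.Timestamp("2020-01-02 09:30:00"),
    end_time=pd.Timestamp("2020-01-02 14:57:00"),
)
sim = SingleAssetOrderExecutionSimple(order=order, data_dir=Path("./backtest_data"), ticks_per_step=30)
while not sim.done():
    sim.step(amount=order.amount / 13)        # a TWAP-like equal split over an assumed 13 steps
print(sim.metrics)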

class qlib.rl.order_execution.SAOEStateAdapter(order: qlib.backtest.decision.Order, trade_decision: qlib.backtest.decision.BaseTradeDecision, executor: qlib.backtest.executor.BaseExecutor, exchange: qlib.backtest.exchange.Exchange, ticks_per_step: int, backtest_data: qlib.rl.data.native.IntradayBacktestData, data_granularity: int = 1)

Maintain states of the environment. SAOEStateAdapter accepts execution results and updates its internal state accordingly, with additional information acquired from executors & exchange. For example, it gets the dealt order amount from execution results, and gets the corresponding market price / volume from the exchange.

Example usage:

adapter = SAOEStateAdapter(...)
adapter.update(...)
state = adapter.saoe_state
__init__(order: qlib.backtest.decision.Order, trade_decision: qlib.backtest.decision.BaseTradeDecision, executor: qlib.backtest.executor.BaseExecutor, exchange: qlib.backtest.exchange.Exchange, ticks_per_step: int, backtest_data: qlib.rl.data.native.IntradayBacktestData, data_granularity: int = 1) → None

Initialize self. See help(type(self)) for accurate signature.

generate_metrics_after_done() → None

Generate metrics once the upper-level execution is done.

class qlib.rl.order_execution.SAOEMetrics

Metrics for SAOE accumulated for a “period”. It could be accumulated for a day, or a period of time (e.g., 30min), or calculated separately for every minute.

Warning

The type hints are for single elements. In many cases, they can be vectorized. For example, market_volume could be a list of floats (or an ndarray) rather than a single float.

stock_id = None

Stock ID of this record.

datetime = None

Datetime of this record (this is index in the dataframe).

direction = None

Direction of the order. 0 for sell, 1 for buy.

market_volume = None

(total) market volume traded in the period.

market_price = None

Deal price. If it’s a period of time, this is the average market deal price.

amount = None

Total amount (volume) strategy intends to trade.

inner_amount = None

Total amount that the lower-level strategy intends to trade (might be larger than amount, e.g., to ensure ffr).

deal_amount = None

Amount that successfully takes effect (must be less than inner_amount).

trade_price = None

The average deal price for this strategy.

trade_value = None

Total worth of trading. In the simple simulation, trade_value = deal_amount * price.

position = None

Position left after this “period”.

ffr = None

The percentage of the daily order that has been completed.

pa = None

Price advantage compared to the baseline (i.e., trading at the baseline market price). The baseline is the trade price when using the TWAP strategy to execute this order (note that there could be data leakage here). Unit is BP (basis point, 1/10000).
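
A numeric sketch of the unit; the sign convention shown (positive meaning better than the baseline) is an assumption, so check the implementation for the exact formula:

# Sign convention assumed: positive PA means better than the TWAP baseline.
baseline_price = 10.00                        # TWAP price over the order window
trade_price = 10.02                           # average deal price of this strategy
pa_if_selling = (trade_price / baseline_price - 1) * 10000   # +20 BP: sold above the baseline
pa_if_buying = (1 - trade_price / baseline_price) * 10000    # -20 BP: bought above the baseline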

class qlib.rl.order_execution.SAOEState

Data structure holding a state for SAOE simulator.

order

The order we are dealing with.

cur_time

Current time, e.g., 9:30.

cur_step

Current step, e.g., 0.

position

Current remaining volume to execute.

history_exec

See SingleAssetOrderExecution.history_exec.

history_steps

See SingleAssetOrderExecution.history_steps.

metrics

Daily metric, only available when the trading is in “done” state.

backtest_data

Backtest data is included in the state. Actually, only the time index of this data is needed at this moment; the full data is included so that algorithms (e.g., VWAP) that rely on the raw data can be implemented. Interpreters can use this as they wish, but they should be careful not to leak future data.

ticks_per_step

How many ticks for each step.

ticks_index

Trading ticks of the whole day, NOT sliced by order (defined in data), e.g., [9:30, 9:31, …, 14:59].

ticks_for_order

Trading ticks sliced by order, e.g., [9:45, 9:46, …, 14:44].
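
A hedged sketch of reading a few of these fields (a plain helper function, not a full StateInterpreter):

from qlib.rl.order_execution import SAOEState

def summarize(state: SAOEState) -> dict:
    # Pick out a few documented fields for a custom interpreter or for logging.
    return {
        "cur_step": state.cur_step,
        "position_left": state.position,
        "ticks_for_order": len(state.ticks_for_order),
    }
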
class qlib.rl.order_execution.SAOEStrategy(policy: BasePolicy, outer_trade_decision: BaseTradeDecision | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, data_granularity: int = 1, **kwargs)

RL-based strategies that use SAOEState as state.

__init__(policy: BasePolicy, outer_trade_decision: BaseTradeDecision | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, data_granularity: int = 1, **kwargs) → None
Parameters:policy – RL policy for generating actions
reset(outer_trade_decision: BaseTradeDecision | None = None, **kwargs) → None
  • reset level_infra, used to reset the trade calendar, etc.
  • reset common_infra, used to reset trade_account, trade_exchange, etc.
  • reset outer_trade_decision, used to make splitting decisions

NOTE: splitting this function into reset and _reset makes the following case more convenient: users who want to initialize their strategy by overriding reset, without affecting the _reset called during initialization.

post_upper_level_exe_step() → None

A hook for doing something after the upper-level executor finished its execution (for example, finalizing the metrics collection).

post_exe_step(execute_result: Optional[list]) → None

A hook for doing something after the corresponding executor finished its execution.

Parameters:execute_result – the execution result
generate_trade_decision(execute_result: list | None = None) → Union[BaseTradeDecision, Generator[Any, Any, BaseTradeDecision]]

For SAOEStrategy, we need to update self._last_step_range every time a decision is generated. This operation should be invisible to developers, so we implement it in generate_trade_decision(). The concrete logic to generate decisions should be implemented in _generate_trade_decision(). In other words, all subclasses of SAOEStrategy should override _generate_trade_decision() instead of generate_trade_decision().

class qlib.rl.order_execution.ProxySAOEStrategy(outer_trade_decision: BaseTradeDecision | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs)

Proxy strategy that uses SAOEState. It is called a ‘proxy’ strategy because it does not make any decisions by itself. Instead, when the strategy is required to generate a decision, it yields the environment’s information and lets the outside agents make the decision. Please refer to _generate_trade_decision for more details.

__init__(outer_trade_decision: BaseTradeDecision | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs) → None
Parameters:policy – RL policy for generating actions
reset(outer_trade_decision: BaseTradeDecision | None = None, **kwargs) → None
  • reset level_infra, used to reset the trade calendar, etc.
  • reset common_infra, used to reset trade_account, trade_exchange, etc.
  • reset outer_trade_decision, used to make splitting decisions

NOTE: splitting this function into reset and _reset makes the following case more convenient: users who want to initialize their strategy by overriding reset, without affecting the _reset called during initialization.

class qlib.rl.order_execution.SAOEIntStrategy(policy: dict | BasePolicy, state_interpreter: dict | StateInterpreter, action_interpreter: dict | ActionInterpreter, network: dict | torch.nn.Module | None = None, outer_trade_decision: BaseTradeDecision | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs)

(SAOE)state-based strategy with (Int)erpreters.

__init__(policy: dict | BasePolicy, state_interpreter: dict | StateInterpreter, action_interpreter: dict | ActionInterpreter, network: dict | torch.nn.Module | None = None, outer_trade_decision: BaseTradeDecision | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs) → None
Parameters:policy – RL policy for generating actions
reset(outer_trade_decision: BaseTradeDecision | None = None, **kwargs) → None
  • reset level_infra, used to reset the trade calendar, etc.
  • reset common_infra, used to reset trade_account, trade_exchange, etc.
  • reset outer_trade_decision, used to make splitting decisions

NOTE: splitting this function into reset and _reset makes the following case more convenient: users who want to initialize their strategy by overriding reset, without affecting the _reset called during initialization.

Utils

class qlib.rl.utils.LogLevel

Log-levels for RL training. The behavior of handling each log level depends on the implementation of LogWriter.

DEBUG = 10

If you only want to see the metric in debug mode.

PERIODIC = 20

If you want to see the metric periodically.

INFO = 30

Important log messages.

CRITICAL = 40

LogWriter should always handle CRITICAL messages.

class qlib.rl.utils.DataQueue(dataset: Sequence[T], repeat: int = 1, shuffle: bool = True, producer_num_workers: int = 0, queue_maxsize: int = 0)

Main process (producer) produces data and stores them in a queue. Sub-processes (consumers) can retrieve the data-points from the queue. Data-points are generated via reading items from dataset.

DataQueue is ephemeral. You must create a new DataQueue when the repeat is exhausted.

See the documents of qlib.rl.utils.FiniteVectorEnv for more background.

Parameters:
  • dataset – The dataset to read data from. Must implement __len__ and __getitem__.
  • repeat – How many times to iterate over the data-points. Use -1 to iterate forever.
  • shuffle – If shuffle is true, the items will be read in random order.
  • producer_num_workers – Concurrent workers for data-loading.
  • queue_maxsize – Maximum items to put into queue before it jams.

Examples

>>> data_queue = DataQueue(my_dataset)
>>> with data_queue:
...     ...

In worker:

>>> for data in data_queue:
...     print(data)
__init__(dataset: Sequence[T], repeat: int = 1, shuffle: bool = True, producer_num_workers: int = 0, queue_maxsize: int = 0) → None

Initialize self. See help(type(self)) for accurate signature.

class qlib.rl.utils.EnvWrapper(simulator_fn: Callable[..., Simulator[InitialStateType, StateType, ActType]], state_interpreter: StateInterpreter[StateType, ObsType], action_interpreter: ActionInterpreter[StateType, PolicyActType, ActType], seed_iterator: Optional[Iterable[InitialStateType]], reward_fn: Reward | None = None, aux_info_collector: AuxiliaryInfoCollector[StateType, Any] | None = None, logger: LogCollector | None = None)

Qlib-based RL environment, subclassing gym.Env. A wrapper of components, including simulator, state-interpreter, action-interpreter, reward.

This is what the simulator - interpreter - policy framework looks like in RL training. All the components other than the policy need to be assembled into a single object called the “environment”. The “environment” is replicated into multiple workers, and (at least in tianshou’s implementation) a single policy (agent) plays against a batch of environments. A minimal assembly sketch follows the parameter list.

Parameters:
  • simulator_fn – A callable that is the simulator factory. When seed_iterator is present, the factory should take one argument, i.e., the seed (aka initial state). Otherwise, it should take zero arguments.
  • state_interpreter – State-observation converter.
  • action_interpreter – Policy-simulator action converter.
  • seed_iterator – An iterable of seed. With the help of qlib.rl.utils.DataQueue, environment workers in different processes can share one seed_iterator.
  • reward_fn – A callable that accepts the StateType and returns a float (at least in single-agent case).
  • aux_info_collector – Collect auxiliary information. Could be useful in MARL.
  • logger – Log collector that collects the logs. The collected logs are sent back to main process, via the return value of env.step().
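
A minimal assembly sketch under hypothetical components; MySimulator, MyStateInterpreter, MyActionInterpreter, MyReward, my_orders, and policy_action are placeholders for user-defined pieces, not classes shipped with Qlib:

# My* classes, my_orders, and policy_action below are user-defined placeholders.
env = EnvWrapper(
    simulator_fn=lambda order: MySimulator(order),   # takes the seed because seed_iterator is given
    state_interpreter=MyStateInterpreter(),
    action_interpreter=MyActionInterpreter(),
    seed_iterator=DataQueue(my_orders),
    reward_fn=MyReward(),
)
obs = env.reset()
obs, reward, done, info = env.step(policy_action)
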
status

Status indicator. All terms are in RL language. It can be used if users care about data on the RL side. Can be None when no trajectory is available.

Type:EnvWrapperStatus
__init__(simulator_fn: Callable[..., Simulator[InitialStateType, StateType, ActType]], state_interpreter: StateInterpreter[StateType, ObsType], action_interpreter: ActionInterpreter[StateType, PolicyActType, ActType], seed_iterator: Optional[Iterable[InitialStateType]], reward_fn: Reward | None = None, aux_info_collector: AuxiliaryInfoCollector[StateType, Any] | None = None, logger: LogCollector | None = None) → None

Initialize self. See help(type(self)) for accurate signature.

reset(**kwargs) → ObsType

Try to get a state from state queue, and init the simulator with this state. If the queue is exhausted, generate an invalid (nan) observation.

step(policy_action: PolicyActType, **kwargs) → Tuple[ObsType, float, bool, qlib.rl.utils.env_wrapper.InfoDict]

Environment step.

See the code along with comments to get a sequence of things happening here.

render(mode: str = 'human') → None

Compute the render frames as specified by render_mode attribute during initialization of the environment.

The set of supported modes varies per environment. (And some third-party environments may not support rendering at all.) By convention, if render_mode is:

  • None (default): no render is computed.
  • human: render returns None. The environment is continuously rendered in the current display or terminal. Usually for human consumption.
  • rgb_array: return a single frame representing the current state of the environment. A frame is a numpy.ndarray with shape (x, y, 3) representing RGB values for an x-by-y pixel image.
  • rgb_array_list: return a list of frames representing the states of the environment since the last reset. Each frame is a numpy.ndarray with shape (x, y, 3), as with rgb_array.
  • ansi: Return a string (str) or StringIO.StringIO containing a terminal-style text representation for each time step. The text can include newlines and ANSI escape sequences (e.g. for colors).

Note

Make sure that your class’s metadata ‘render_modes’ key includes the list of supported modes. It’s recommended to call super() in implementations to use the functionality of this method.

class qlib.rl.utils.LogCollector(min_loglevel: int | LogLevel = <LogLevel.PERIODIC: 20>)

Logs are first collected in each environment worker, and then aggregated into a stream at the central thread in the vector env.

In LogCollector, every metric is added to a dict, which needs to be reset() at each step. The dict is sent via the info in env.step(), and decoded by the LogWriter at vector env.

min_loglevel is for optimization purposes: to avoid too much traffic on networks / in pipes.

__init__(min_loglevel: int | LogLevel = <LogLevel.PERIODIC: 20>) → None

Initialize self. See help(type(self)) for accurate signature.

reset() → None

Clear all collected contents.

add_string(name: str, string: str, loglevel: int | LogLevel = <LogLevel.PERIODIC: 20>) → None

Add a string with name into logged contents.

add_scalar(name: str, scalar: Any, loglevel: int | LogLevel = <LogLevel.PERIODIC: 20>) → None

Add a scalar with name into logged contents. Scalar will be converted into a float.

add_array(name: str, array: np.ndarray | pd.DataFrame | pd.Series, loglevel: int | LogLevel = <LogLevel.PERIODIC: 20>) → None

Add an array with name into logging.

add_any(name: str, obj: Any, loglevel: int | LogLevel = <LogLevel.PERIODIC: 20>) → None

Log something with any type.

As it’s an “any” object, the only LogWriter accepting it is pickle. Therefore, pickle must be able to serialize it.
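
A hedged usage sketch of the collector methods documented above; the metric names and values are illustrative:

from qlib.rl.utils import LogCollector, LogLevel

collector = LogCollector(min_loglevel=LogLevel.PERIODIC)
collector.reset()                                        # clear contents at the beginning of a step
collector.add_scalar("pa", 3.2, loglevel=LogLevel.PERIODIC)      # illustrative metric name
collector.add_string("stock_id", "SH600000", loglevel=LogLevel.DEBUG)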

class qlib.rl.utils.LogWriter(loglevel: int | LogLevel = <LogLevel.PERIODIC: 20>)

Base class for log writers, triggered at every reset and step by finite env.

What to do with a specific log depends on the implementation of the LogWriter subclass. The general principle is that it should handle logs at or above its loglevel (inclusive) and discard logs that are not acceptable. For instance, console loggers obviously can’t handle an image.

episode_count = None

Counter of episodes.

step_count = None

Counter of steps.

active_env_ids = None

Active environment ids in vector env.

__init__(loglevel: int | LogLevel = <LogLevel.PERIODIC: 20>) → None

Initialize self. See help(type(self)) for accurate signature.

global_step = None

Counter of steps. Won’t be cleared in clear.

global_episode = None

Counter of episodes. Won’t be cleared in clear.

episode_lengths = None

Map from environment id to episode length.

episode_rewards = None

Map from environment id to episode total reward.

episode_logs = None

Map from environment id to episode logs.

clear()

Clear all the metrics for a fresh start. To make the logger instance reusable.

state_dict() → dict

Save the states of the logger to a dict.

load_state_dict(state_dict: dict) → None

Load the states of current logger from a dict.

static aggregation(array: Sequence[Any], name: str | None = None) → Any

Aggregation function from step-wise to episode-wise.

If it’s a sequence of float, take the mean. Otherwise, take the first element.

If a name is specified and it is reward, the reduction will be sum. A short sketch follows.
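
A short sketch of those defaults, assuming exactly the behavior documented above:

from qlib.rl.utils import LogWriter

LogWriter.aggregation([1.0, 2.0, 3.0])                   # floats -> mean, 2.0
LogWriter.aggregation([1.0, 2.0, 3.0], name="reward")    # "reward" -> sum, 6.0
LogWriter.aggregation(["buy", "buy", "sell"])            # non-float -> first element, "buy"
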
log_episode(length: int, rewards: List[float], contents: List[Dict[str, Any]]) → None

This is triggered at the end of each trajectory.

Parameters:
  • length – Length of this trajectory.
  • rewards – A list of rewards at each step of this episode.
  • contents – Logged contents for every step.
log_step(reward: float, contents: Dict[str, Any]) → None

This is triggered at each step.

Parameters:
  • reward – Reward for this step.
  • contents – Logged contents for this step.
on_env_step(env_id: int, obs: ObsType, rew: float, done: bool, info: InfoDict) → None

Callback for finite env, on each step.

on_env_reset(env_id: int, _: ObsType) → None

Callback for finite env.

Reset episode statistics. Nothing task-specific is logged here because of a limitation of tianshou.

on_env_all_ready() → None

When all environments are ready to run. Usually, loggers should be reset here.

on_env_all_done() → None

All done. Time for cleanup.

qlib.rl.utils.vectorize_env(env_factory: Callable[..., gym.Env], env_type: FiniteEnvType, concurrency: int, logger: LogWriter | List[LogWriter]) → FiniteVectorEnv

Helper function to create a vector env. Can be used to replace usual VectorEnv.

For example, once you wrote:

DummyVectorEnv([lambda: gym.make(task) for _ in range(env_num)])

Now you can replace it with:

vectorize_env(lambda: gym.make(task), "dummy", env_num, my_logger)

By doing such replacement, you have two additional features enabled (compared to normal VectorEnv):

  1. The vector env will check for NaN observations and kill the worker when one is found. See FiniteVectorEnv for why we need this.
  2. A logger to explicitly collect logs from environment workers.
Parameters:
  • env_factory – Callable to instantiate one single gym.Env. All concurrent workers will have the same env_factory.
  • env_type – dummy or subproc or shmem. Corresponding to parallelism in tianshou.
  • concurrency – Concurrent environment workers.
  • logger – Log writers.

Warning

Please do not use a lambda expression here for env_factory, as it may create incorrectly-shared instances.

Don’t do:

vectorize_env(lambda: EnvWrapper(...), ...)

Please do:

def env_factory(): ...
vectorize_env(env_factory, ...)
class qlib.rl.utils.ConsoleWriter(log_every_n_episode: int = 20, total_episodes: int | None = None, float_format: str = ':.4f', counter_format: str = ':4d', loglevel: int | LogLevel = <LogLevel.PERIODIC: 20>)

Write log messages to console periodically.

It tracks an average meter for each metric, which is the average value since last clear() till now. The display format for each metric is <name> <latest_value> (<average_value>).

Non-single-number metrics are auto skipped.

__init__(log_every_n_episode: int = 20, total_episodes: int | None = None, float_format: str = ':.4f', counter_format: str = ':4d', loglevel: int | LogLevel = <LogLevel.PERIODIC: 20>) → None

Initialize self. See help(type(self)) for accurate signature.

prefix = None

Prefix can be set via writer.prefix.

clear() → None

Clear all the metrics for a fresh start. To make the logger instance reusable.

log_episode(length: int, rewards: List[float], contents: List[Dict[str, Any]]) → None

This is triggered at the end of each trajectory.

Parameters:
  • length – Length of this trajectory.
  • rewards – A list of rewards at each step of this episode.
  • contents – Logged contents for every step.
class qlib.rl.utils.CsvWriter(output_dir: Path, loglevel: int | LogLevel = <LogLevel.PERIODIC: 20>)

Dump all episode metrics to a result.csv.

This is not the correct implementation. It’s only used for the first iteration.

__init__(output_dir: Path, loglevel: int | LogLevel = <LogLevel.PERIODIC: 20>) → None

Initialize self. See help(type(self)) for accurate signature.

clear() → None

Clear all the metrics for a fresh start. To make the logger instance reusable.

log_episode(length: int, rewards: List[float], contents: List[Dict[str, Any]]) → None

This is triggered at the end of each trajectory.

Parameters:
  • length – Length of this trajectory.
  • rewards – A list of rewards at each step of this episode.
  • contents – Logged contents for every step.
on_env_all_done() → None

All done. Time for cleanup.

class qlib.rl.utils.EnvWrapperStatus

This is the status data structure used in EnvWrapper. The fields here are in the semantics of RL. For example, obs means the observation fed into policy. action means the raw action returned by policy.

class qlib.rl.utils.LogBuffer(callback: Callable[[bool, bool, LogBuffer], None], loglevel: int | LogLevel = <LogLevel.PERIODIC: 20>)

Keep all numbers in memory.

Objects that can’t be aggregated, like strings, tensors, and images, can’t be stored in the buffer. To persist them, please use PickleWriter.

Every time the log buffer receives a new metric, the callback is triggered, which is useful when tracking metrics inside a trainer.

Parameters:callback

A callback receiving three arguments:

  • on_episode: Whether it’s called at the end of an episode
  • on_collect: Whether it’s called at the end of a collect
  • log_buffer: the LogBuffer object

No return value is expected.
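
A hedged sketch of such a callback; the printing is illustrative:

from qlib.rl.utils import LogBuffer

def on_log(on_episode: bool, on_collect: bool, buffer: LogBuffer) -> None:
    if on_episode:
        print(buffer.episode_metrics())                  # numeric metrics of the latest episode
    if on_collect:
        print(buffer.collect_metrics())                  # aggregated metrics of the latest collect

log_buffer = LogBuffer(callback=on_log)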

__init__(callback: Callable[[bool, bool, LogBuffer], None], loglevel: int | LogLevel = <LogLevel.PERIODIC: 20>)

Initialize self. See help(type(self)) for accurate signature.

state_dict() → dict

Save the states of the logger to a dict.

load_state_dict(state_dict: dict) → None

Load the states of current logger from a dict.

clear()

Clear all the metrics for a fresh start. To make the logger instance reusable.

log_episode(length: int, rewards: list[float], contents: list[dict[str, Any]]) → None

This is triggered at the end of each trajectory.

Parameters:
  • length – Length of this trajectory.
  • rewards – A list of rewards at each step of this episode.
  • contents – Logged contents for every step.
on_env_all_done() → None

All done. Time for cleanup.

episode_metrics() → dict[str, float]

Retrieve the numeric metrics of the latest episode.

collect_metrics() → dict[str, float]

Retrieve the aggregated metrics of the latest collect.