rproc - Realtime Processing component.
The component is located in ec/components/rproc.
- performs calculations on live data streams
- subscribes to the realtime source data stream (i.e. tickHF) and calculates a derived data stream
- the rproc component alone does not provide any useful functionality; it is just a container
- rproc requires additional code, called a plugin, which defines the logic for calculating the derived data
- the rproc package includes some sample predefined plugins (snap, mrvs, ohlc) which show how plugin code can be defined. Those plugins are described in more detail below.
Note: configure the port and component name according to your conventions (e.g. core.hdbWriter).
Example of a system.cfg configuration entry for the rproc component with the predefined mrvs plug-in (the component is named t.mrvs):
[[t.mrvs]]
command = "q rproc.q"
type = q:rproc/rproc
port = ${basePort} + 6
memCap = 10000
requires = t.hdb, t.tick
libs = mrvs
cfg.serverAux = t.hdb
- The libs field should point to the plugin file. The example config points to mrvs, which results in loading the mrvs.q plugin.
- It is recommended to set the requires field to the servers used during plugin initialisation. This field determines the start order used by the yak start command.
- Any auxiliary servers required, e.g. for initialization, should be listed under the cfg.serverAux field. The example config points to t.hdb, as the mrvs plugin uses the t.hdb server to initialize its state. cfg.serverAux is optional; by default it is set to NULL.
[[t.mrvs]]
subSrc = t.tick
- The subSrc field should point to the source data stream. The example config points to t.tick.
- The [[t.mrvs]] subsection should be added to each table that should be processed by the t.mrvs process.
Derived data can be kept in memory for ad-hoc queries; it can also be published further to clients using the publish-subscribe method.
Derived data calculation logic is defined by a set of plug-in functions.
A plugin can publish the data using the .u.pub[tab;data] function from the qsl/u library.
In that case clients can subscribe to the rproc component and retrieve the derived data updates.
There is no facility for journaling the published derived data.
A subscriber can consume the state of the in-memory table kept in the rproc component (depending on the actual plugin definition), but there is no journal that could be replayed, as there is for data published through the standard tickHF.
The following functions are used to define a plugin.
All of those functions have a default empty implementation.
Custom code should overwrite some or all of them, depending on the use case.
Plug-in .rp.plug.init[srv] is invoked during component initialization.
The role of this callback is to initialize the data model and optionally insert start-up content for each derived table in the rproc component.
It is invoked after the connections to the cfg.serverAux servers have been opened.
Plug-in .rp.plug.upd[tab;data] is invoked on each data update received from the tickHF process.
The role of this callback is to calculate derived data based on the source data update.
This callback is the essential element of the implementation, as it defines the logic for data processing.
Plug-in .rp.plug.end[day] is invoked at the end of the day (triggered by tickHF).
It may be used for day wrap-up actions, e.g. clearing memory.
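As a rough illustration of how the three callbacks fit together, the sketch below defines a hypothetical plugin that keeps a running traded volume per symbol. The vol table, the sample trade model and the hand-made .rp.cfg values are assumptions, introduced only so that the snippet can be tried in a plain q session; in a real deployment rproc supplies the configuration and invokes the callbacks itself.

```q
/ Assumption: stand-ins for the configuration that rproc would normally provide
.rp.cfg.srcTabs:enlist`trade;
.rp.cfg.model:(enlist`trade)!enlist ([]time:`time$();sym:`symbol$();price:`float$();size:`long$());

/ init: create the (hypothetical) derived table, empty and keyed on sym
.rp.plug.init:{[srv] vol::([sym:`symbol$()] size:`long$());};

/ upd: merge the incoming sizes into the running totals
.rp.plug.upd:{[tab;data] vol::select sum size by sym from (0!vol),select sym,size from data;};

/ end: clear the totals at the end of the day
.rp.plug.end:{[day] delete from `vol;};

/ Simulate what rproc would do: initialise, then feed two updates
.rp.plug.init[()];
.rp.plug.upd[`trade;([]time:09:30:00.000 09:30:01.000;sym:`a`b;price:1 2f;size:10 20)];
.rp.plug.upd[`trade;([]time:enlist 09:30:02.000;sym:enlist`a;price:enlist 1.5;size:enlist 5)];
show vol;   / running totals: a 15, b 20
```

Calling .rp.plug.end[.z.d] afterwards would leave vol empty, which mirrors the kind of day wrap-up described above.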
If the component uses a tickLF data source, it can also overwrite the tickLF callbacks:
- .tickLF.upd[] / .tickLF.jUpd[]
- .tickLF.ups[] / .tickLF.jUps[]
- .tickLF.img[] / .tickLF.jImg[]
- .tickLF.del[] / .tickLF.jDel[]
By default those are loaded from ec/libraries/qsl/sub_tickLF.q.
Default callbacks are also available in memory in the .sub.tickLF.default global variable.
Note that using the tickLF protocol for stream calculation can significantly increase the complexity of the custom logic, as the plugin has to handle all types of actions correctly, including upserts and deletes.
The following global variables are available for the plugin definition.
.rp.cfg.model is a dictionary where the key is a table name and the value is its data model.
It contains the data model for each table in the dataflow.cfg file for which the current component is configured.
.rp.cfg.srcTabs contains a list of the tables which are the source tables for the rproc component (those with a subSrc field in dataflow.cfg).
The component does not provide any functionality for preserving its intermediate state.
At startup it must fully initialize from the source server (tickHF) journal and, optionally, from the auxiliary servers (e.g. hdb).
The following use cases are implemented using the rproc component to show its functionality.
A ready-to-use configuration sample can be found in ec/components/rproc/test/etc/.
Instructions to start this mini system can be found in ec/components/rproc/test/README.md.
snap implements the following functionality:
- calculation of 1-minute snapshots for each symbol
- generic snapshot calculation that works for any table with time and sym columns
- snapshots are not published but kept in memory for ad-hoc queries
- snapshots are calculated for today only and reset at the end of the day
[[t.snap]]
command = "q rproc.q"
type = q:rproc/rproc
port = ${basePort} + 7
memCap = 10000
requires = t.hdb, t.tick
libs = snap
[[t.snap]]
subSrc = t.tick
Initialization of the derived data model is based on the source data model located in the .rp.cfg.model dictionary.
.rp.cfg.srcTabs contains the list of subscribed tables.
.rp.plug.init:{[srv];
  / create an empty snapshot table, keyed by minute and sym, for every subscribed source table
  :{[tab] tab set update`g#sym from select by time.minute, sym from .rp.cfg.model[tab]}each .rp.cfg.srcTabs;
  };
Each in-memory table is upserted with the latest update from the tickHF process.
.rp.plug.upd:{[tab;data]
  / keep only the last row per minute and sym, then upsert it into the snapshot table
  tab upsert select by time.minute, sym from data
  };
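To see what the upd callback ends up storing, the sketch below applies the same select by time.minute, sym expression to a few hand-made rows in a plain q session. The snapTab, d1 and d2 names and all values are hypothetical and exist only for this illustration.

```q
/ Hypothetical updates: d1 has two ticks in minute 09:30, d2 has one more in 09:30 and one in 09:31
d1:([]time:09:30:10.000 09:30:50.000;sym:`a`a;price:10 11f);
d2:([]time:09:30:55.000 09:31:05.000;sym:`a`a;price:12 13f);

/ First update: only the last row per minute and sym survives
snapTab:select by time.minute,sym from d1;

/ Second update: the 09:30 snapshot is overwritten, a 09:31 snapshot is added
`snapTab upsert select by time.minute,sym from d2;

show snapTab;   / 09:30 holds price 12, 09:31 holds price 13
```

Because the table is keyed on minute and sym, a later tick within the same minute replaces the earlier snapshot rather than appending a new row.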
At the end of the day, data is deleted from memory.
.rp.plug.end:{[day]
  / empty every snapshot table and re-apply the grouped attribute on sym
  {update`g#sym from delete from x}each .rp.cfg.srcTabs;
  };
mrvs implements the following functionality:
- maintaining the most recent record for each symbol
- generic calculation that works for any table with a sym column
- most recent values are not published but kept in memory for ad-hoc queries
- the process can be initialized with yesterday's data from the historical database (hdb)
[[t.mrvs]]
command = "q rproc.q"
type = q:rproc/rproc
port = ${basePort} + 6
memCap = 10000
requires = t.hdb, t.tick
libs = mrvs
cfg.serverAux = t.hdb
[[t.mrvs]]
subSrc = t.tick
There are two options for mrvs initialization:
- initialization of the derived data model based on the source data model located in the .rp.cfg.model dictionary.
- initialization from the hdb process, activated when cfg.serverAux is set in the configuration. In this case the mrvs process will start with the most recent values from yesterday.
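.rp.plug.init:{[srv];
  / when an auxiliary server is configured, start from yesterday's last values in the hdb
  if[not srv~();
    :{[hdb;tab]tab set update`u#sym from .hnd.h[hdb](.mrvs.hdb.lastBySym;tab;.z.d-1)}[srv]each .rp.cfg.srcTabs;
    ];
  / otherwise start from an empty table built from the source data model
  :{[tab] tab set update`u#sym from select by sym from .rp.cfg.model[tab]}each .rp.cfg.srcTabs;
  };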
Each in-memory table is upserted with the latest update from the tickHF process. As the action is a simple upsert, it can be defined as follows:
.rp.plug.upd:upsert;
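The effect of using upsert as the update callback can be tried in a plain q session. In the sketch below the quote table, its columns and the upd name are hypothetical stand-ins for a derived table keyed on sym and for .rp.plug.upd.

```q
/ Hypothetical derived table keyed on sym
quote:([sym:`symbol$()] bid:`float$();ask:`float$());

/ Stand-in for .rp.plug.upd
upd:upsert;

/ Two updates: the second one replaces the stored record for `a
upd[`quote;([]sym:`a`b;bid:1 2f;ask:1.1 2.1)];
upd[`quote;([]sym:enlist`a;bid:enlist 3f;ask:enlist 3.1)];

show quote;   / a: bid 3, ask 3.1; b: bid 2, ask 2.1
```

Since the target table is keyed, each incoming row either replaces the existing record for its symbol or adds a new one, so the table always holds one row per symbol.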
No end-of-day action is required for mrvs; the current values initialize the next day.
ohlc implements the following functionality:
- calculating open-high-low-close based on the trade table
- ohlc records are kept in memory for ad-hoc queries and published to the subscribers
- users can subscribe to the ohlc table using the classical kx subscription protocol - see the .u.sub[] function and the ec/libraries/qsl/u.q publishing library
[[t.ohlc]]
command = "q rproc.q"
type = q:rproc/rproc
port = ${basePort} + 1
memCap = 10000
requires = t.hdb, t.tick
libs = ohlc
cfg.serverAux = t.hdb
[[t.ohlc]]
subSrc = t.tick
Additionally, the ohlc table model is defined as follows:
[table:ohlc]
model = sym(SYMBOL), open(FLOAT), high(FLOAT), low(FLOAT), close(FLOAT), volume(LONG)
[[t.ohlc]]
Initialization of the ohlc table data model is based on the configuration entry:
[table:ohlc]
model = sym(SYMBOL), open(FLOAT), high(FLOAT), low(FLOAT), close(FLOAT), volume(LONG)
[[t.ohlc]]
ohlc is a keyed table with the key on the sym column. For each instrument that was processed there is exactly one record.
.rp.plug.init:{[servers];
  / build the empty ohlc table from the configured model and key it on sym
  ohlc::`sym xkey .rp.cfg.model[`ohlc];
  };
The in-memory ohlc table is updated with the latest trade update from the tickHF process.
In a second step, the records of the ohlc table which were affected by the change are published with the .u.pub[tabName;data] function.
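.rp.plug.upd:{[tab;data]
  / merge the new trades into the existing records: open stays, high/low/close/volume are refreshed
  ohlc::select first open, max high,min low,last close,sum volume by sym from(0!ohlc),select sym,open:price,high:price,low:price,close:price,volume:size from data;
  / publish only the records whose symbols appeared in this update
  syms:exec distinct sym from data;
  .u.pub[`ohlc;0!select from ohlc where sym in syms];
  };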
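The aggregation step can be checked in isolation. The sketch below runs the same select at the top level of a plain q session against an empty ohlc table and a hypothetical trade update; the sample values are made up and no publishing takes place.

```q
/ Empty keyed ohlc table matching the configured model
ohlc:([sym:`symbol$()] open:`float$();high:`float$();low:`float$();close:`float$();volume:`long$());

/ Hypothetical trade update: two trades for `a, one for `b
data:([]sym:`a`a`b;price:10 12 5f;size:100 50 10);

/ Same aggregation as in .rp.plug.upd: existing records first, new trades appended
ohlc:select first open,max high,min low,last close,sum volume by sym from (0!ohlc),select sym,open:price,high:price,low:price,close:price,volume:size from data;

show ohlc;   / a: open 10, high 12, low 10, close 12, volume 150; b: 5 5 5 5 10
```

Placing the existing records before the new trades is what makes first open keep the original opening price while last close picks up the newest trade.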
No end-of-day action is required for ohlc.