One of the key features of `primrose` is that it keeps data in memory. As data is shuffled among nodes for reading, processing, modeling, and writing, it is readily available for nodes to use without unnecessary serialization and deserialization.
A central object receives new data, stores it, hands it to nodes that request it, and does all the book-keeping under the hood. This is the `DataObject`.

The `DataObject` is a repository: a single instance that is handed to each node (in the sequence determined by the `DAGRunner`) both to add new data and to consume data, whether reading it, transforming it, or popping it off the `DataObject` completely.
This concept is so central to `primrose` that it plays a pivotal role in the notion of a node and running a node. That is, `AbstractNode` has a method:
```python
def run(self, data_object):
    """
    run the node. For a reader, that means read, for a writer that means write etc.

    Args:
        data_object (DataObject): DataObject instance

    Returns:
        (tuple): tuple containing:

            data_object (DataObject): instance of DataObject

            terminate (bool): terminate the DAG?
    """
```
Every node in the DAG receives the `DataObject`, runs (does whatever it needs to do), and returns that same `DataObject`. It might add new data, it might consume data, or it might just return it as is.
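For example, a node that has nothing to add or consume might implement `run` as follows (a minimal sketch; never terminating is purely illustrative):

```python
def run(self, data_object):
    # Nothing to add or consume: hand the DataObject back unchanged
    # and tell the DAGRunner to keep going.
    terminate = False
    return data_object, terminate
```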
The `DataObject` is designed to hide some of the decisions and underlying book-keeping. If you want to save some data, you call the `add` method:

```python
data_object.add(self, some_object)
```
That's it! This will save the data `some_object`, associating it with the node that did the saving. `some_object` can be anything at all. It might be a data frame, a string, a dictionary, or any other object that Python can handle.
For instance, a `CsvReader` might implement its `run` method as:

```python
# look up the file to read from this node's config
filename = self.node_config['filename']
logging.info('Reading {} from CSV'.format(filename))
df = pd.read_csv(filename)
# store the dataframe, associated with this node
data_object.add(self, df)
# signal DAG termination if the dataframe is empty
terminate = df.empty
return data_object, terminate
```
This takes the filename listed in the config, reads it into a pandas dataframe, checks whether the dataframe is empty, and returns the `data_object`.
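For context, a `run` method like this lives inside a node class. The sketch below shows roughly how the pieces fit together; the import path for `AbstractNode` is an assumption, and any other abstract methods the base class may require are omitted:

```python
import logging

import pandas as pd

# Assumed import path for illustration; check the primrose source for the
# actual location of AbstractNode.
from primrose.base.node import AbstractNode


class MyCsvReader(AbstractNode):
    """Hypothetical reader node: reads the CSV named in the node config
    and stores the resulting dataframe on the DataObject."""

    def run(self, data_object):
        filename = self.node_config['filename']
        logging.info('Reading {} from CSV'.format(filename))
        df = pd.read_csv(filename)
        data_object.add(self, df)
        # stop the DAG if the file contained no rows
        terminate = df.empty
        return data_object, terminate
```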
The data is saved with a default key (`DataObject.DATA_KEY`). However, you can use your own key:

```python
data_object.add(self, some_object, some_key)
```

such as

```python
data_object.add(self, df, "tennis_df")
```
This is important if you wish to save multiple objects:

```python
data_object.add(self, df1, "DF1")
data_object.add(self, df2, "DF2")
```

although you could always save as a single dictionary:

```python
data_object.add(self, {"DF1": df1, "DF2": df2})
```
You have options.
Typically, a node will need data that was stored by another node upstream in the DAG. Thus, the typical actions in a node are to save data (`data_object.add()` above) and/or to get upstream data. To do the latter, use `get_upstream_data`:

```python
data = data_object.get_upstream_data(self.instance_name,
                                     pop_data=False,
                                     rtype=DataObjectResponseType.KEY_VALUE.value)
```
The first argument is the name of the requestor (typically `self.instance_name`). Which node is requesting the data? This is used to determine which nodes are upstream of the focal node.
The `pop_data` flag determines whether you want to pop the data off the `DataObject`, in which case it will be garbage collected when no longer referenced. This is important, as you might not want more and more data accumulating in memory.
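For instance, a writer at the end of a branch, being the last consumer of its input, might pop the data so it can be freed once the node is done with it (a sketch; `KEY_VALUE` is just one choice of response type):

```python
# Pop the upstream data: the DataObject drops its reference, so the data can
# be garbage collected once this node no longer needs it.
data = data_object.get_upstream_data(self.instance_name,
                                     pop_data=True,
                                     rtype=DataObjectResponseType.KEY_VALUE.value)
```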
The last input to `get_upstream_data` is `rtype`, short for `DataObjectResponseType`, which specifies how you want the data delivered.
The simplest scenario is that you just want a single object back. For instance, suppose the only upstream node was a reader that read data into a dataframe. You just want that data frame. In that case, you can specify `rtype=DataObjectResponseType.VALUE.value`.
For instance,

```python
df = data_object.get_upstream_data(self.instance_name,
                                   rtype=DataObjectResponseType.VALUE.value)
```

will deliver the data to `df`.
If, however, there are multiple keys for the upstream node, then instead of a single data object you will receive a dictionary of all the data, such as `{"DF1": df1, "DF2": df2}`.
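Putting these pieces together, a simple transformer node might fetch its single upstream dataframe with `VALUE`, derive a new dataframe, and store the result (a sketch; the `dropna()` step and the `"cleaned_df"` key are just placeholders):

```python
def run(self, data_object):
    # a single upstream object is expected, so ask for just the value
    df = data_object.get_upstream_data(self.instance_name,
                                       rtype=DataObjectResponseType.VALUE.value)
    # placeholder transformation: drop rows with missing values
    cleaned = df.dropna()
    # store the derived dataframe for downstream nodes
    data_object.add(self, cleaned, "cleaned_df")
    return data_object, False
```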
A more complex scenario is if an upstream node saved multiple objects, such as

```python
data_object.add(self, df1, "DF1")
data_object.add(self, df2, "DF2")
```
In this case, it is likely easiest to get a single dictionary back (especially if you don't know how many objects were saved and with what keys). For this, specify `rtype=DataObjectResponseType.KEY_VALUE.value`.
This will return a single dictionary, which in the case of the above example will be `{"DF1": df1, "DF2": df2}`.
It will combine the separate, individual keys and objects into a single dictionary for you. To be clear, for this option, specify `KEY_VALUE` in `get_upstream_data`:

```python
data_dictionary = data_object.get_upstream_data(self.instance_name,
                                                rtype=DataObjectResponseType.KEY_VALUE.value)
```
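The individual objects can then be pulled out of the returned dictionary by their keys:

```python
df1 = data_dictionary["DF1"]
df2 = data_dictionary["DF2"]
```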
The last scenario is the most complex. Suppose that multiple upstream nodes feed into a node; for instance, perhaps 3 readers feed into a pipeline node. In that case, you will want the data associated with each upstream node, that is, a dictionary of `instance_name` keys and their data dictionaries. For this, specify `rtype=DataObjectResponseType.INSTANCE_KEY_VALUE.value`.
This will return a single dictionary where the keys are the names of the upstream nodes and the values are dictionaries of the stored data under their associated keys:

```python
{'instance_name': {'key': value}, 'instance_name2': {'key2': value2}, ... }
```
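For example, a node fed by several readers could gather everything, keyed by the reader that produced it (a sketch; the reader instance names and data keys are hypothetical):

```python
# one inner dictionary per upstream node, keyed by that node's instance name
all_data = data_object.get_upstream_data(self.instance_name,
                                         rtype=DataObjectResponseType.INSTANCE_KEY_VALUE.value)
# e.g. {'csv_reader_1': {'DF1': df1}, 'csv_reader_2': {'DF2': df2}}
df1 = all_data['csv_reader_1']['DF1']
```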
You can also filter data for a given data key. You could request data with `KEY_VALUE` or `INSTANCE_KEY_VALUE` and inspect the keys yourself, but you can also use

```python
data_object.get_filtered_upstream_data(self, instance_name, filter_for_key)
```
where `filter_for_key` filters for data keys. For instance, with stored data

```python
{'instance_name': {'key': value}, 'instance_name2': {'key2': value2}, ... }
```

calling

```python
data_object.get_filtered_upstream_data(self, instance_name, 'key2')
```

would return `{'key2': value2}`.
If you ask for `INSTANCE_KEY_VALUE` but there is a single key, you will still receive a dictionary keyed with the instance name (`{'instance_name': {'key': value}}`).
`DataObject` has other methods for inspecting the data it currently stores, such as `upstream_keys()`, which provides the keys, and `get()`, which gets the data for a given instance name. It also handles local caching of data. However, `add()` and `get_upstream_data()` are the methods most often used in nodes.
Learn more about Notifications: Notifications.