Skip to content

Commit

Permalink
DataStorm comments & minor fixes (#3212)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone authored Dec 2, 2024
1 parent b7c2807 commit f2714f7
Show file tree
Hide file tree
Showing 15 changed files with 405 additions and 250 deletions.
38 changes: 33 additions & 5 deletions cpp/src/DataStorm/Contract.ice
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

module DataStormContract
{
/// The ClearHistoryPolicy enumeration defines the policy that determines when a reader clears its
/// DataSample history in response to various events.
/// The ClearHistoryPolicy enumeration defines the policy that determines when a reader clears its DataSample
/// history in response to various events.
enum ClearHistoryPolicy
{
/// The reader clears its history when a new DataSample is added.
Expand Down Expand Up @@ -70,7 +70,7 @@ module DataStormContract
/// and encoded value.
struct ElementInfo
{
/// The unique identifier of the element. Filter IDs are negative.
/// The ID of the element. Filter IDs are negative, while key and tag IDs are positive.
long id;

/// The name of the filter. This field is empty for key and tag elements.
Expand Down Expand Up @@ -131,15 +131,31 @@ module DataStormContract
Ice::ByteSeq criteria;
}

/// Represents the configuration of a reader or writer.
class ElementConfig(1)
{
optional(1) string facet;

/// An optional sample filter associated with the reader. Sample filters are specified on the reader side.
optional(2) FilterInfo sampleFilter;

/// An optional name for the reader or writer.
optional(3) string name;

/// An optional priority for the writer.
/// See also the `DataStorm.Topic.Priority` property.
optional(4) int priority;

/// An optional sample count, specifying the number of samples queued in the writer or reader sample queue.
/// See also the `DataStorm.Topic.SampleCount` property.
optional(10) int sampleCount;

/// An optional lifetime, specified in milliseconds, representing the maximum time samples are kept in the
/// writer or reader sample queue. See also the `DataStorm.Topic.SampleLifetime` property.
optional(11) int sampleLifetime;

/// An optional clear history policy that determines when the reader or writer sample history is cleared.
/// See also the `DataStorm.Topic.ClearHistory` property.
optional(12) ClearHistoryPolicy clearHistory;
}

Expand Down Expand Up @@ -247,15 +263,22 @@ module DataStormContract
void detachTags(long topic, Ice::LongSeq tags);

/// Announces new elements to the peer.
///
/// The peer will invoke `attachElements` for the elements it is interested in. The announced elements include
/// key readers, key writers, and filter readers associated with the specified topic.
/// the readers and writers associated with the specified topic.
///
/// @param topic The ID of the topic associated with the elements.
/// @param elements The sequence of elements to announce.
/// @see attachElements
void announceElements(long topic, ElementInfoSeq elements);

void attachElements(long topic, ElementSpecSeq elements, bool initialize);
/// Attaches the given topic elements to all subscribers of the specified topic.
///
/// @param topicId The ID of the topic to which the elements belong.
/// @param elements The sequence of elements to attach to the topic's subscribers.
/// @param initialize True if called from attachTopic, false otherwise.
void attachElements(long topicId, ElementSpecSeq elements, bool initialize);

void attachElementsAck(long topic, ElementSpecAckSeq elements);
void detachElements(long topic, Ice::LongSeq keys);

Expand All @@ -270,6 +293,11 @@ module DataStormContract

interface SubscriberSession extends Session
{
    /// Queues a sample with the subscribers of the topic element.
///
/// @param topicId The ID of the topic.
/// @param elementId The ID of the element.
/// @param sample The sample to queue.
void s(long topicId, long elementId, DataSample sample);
}

Expand Down
74 changes: 51 additions & 23 deletions cpp/src/DataStorm/DataElementI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ namespace
{
DataSample toSample(const shared_ptr<Sample>& sample, const Ice::CommunicatorPtr& communicator, bool marshalKey)
{
return {
sample->id,
marshalKey ? 0 : sample->key->getId(),
marshalKey ? sample->key->encode(communicator) : Ice::ByteSeq{},
chrono::time_point_cast<chrono::microseconds>(sample->timestamp).time_since_epoch().count(),
sample->tag ? sample->tag->getId() : 0,
sample->event,
sample->encode(communicator)};
return DataSample{
.id = sample->id,
.keyId = marshalKey ? 0 : sample->key->getId(),
.keyValue = marshalKey ? sample->key->encode(communicator) : Ice::ByteSeq{},
.timestamp = chrono::time_point_cast<chrono::microseconds>(sample->timestamp).time_since_epoch().count(),
.tag = sample->tag ? sample->tag->getId() : 0,
.event = sample->event,
.value = sample->encode(communicator)};
}

void cleanOldSamples(
Expand Down Expand Up @@ -111,8 +111,10 @@ DataElementI::attach(
auto info = *data.config->sampleFilter;
sampleFilter = _parent->getSampleFilterFactories()->decode(getCommunicator(), info.name, info.criteria);
}

string facet = data.config->facet ? *data.config->facet : string();
int priority = data.config->priority ? *data.config->priority : 0;

string name;
if (data.config->name)
{
Expand All @@ -124,14 +126,20 @@ DataElementI::attach(
os << session->getId() << '-' << topicId << '-' << data.id;
name = os.str();
}

if ((id > 0 && attachKey(topicId, data.id, key, sampleFilter, session, prx, facet, id, name, priority)) ||
(id < 0 && attachFilter(topicId, data.id, key, sampleFilter, session, prx, facet, id, filter, name, priority)))
{
auto q = data.lastIds.find(_id);
long long lastId = q != data.lastIds.end() ? q->second : 0;
LongLongDict lastIds = key ? session->getLastIds(topicId, id, shared_from_this()) : LongLongDict();
int64_t lastId = q != data.lastIds.end() ? q->second : 0;
LongLongDict lastIds = key ? session->getLastIds(topicId, id, shared_from_this()) : LongLongDict{};
DataSamples samples = getSamples(key, sampleFilter, data.config, lastId, now);
acks.push_back({_id, _config, lastIds, samples.samples, data.id});
acks.push_back(ElementDataAck{
.id = _id,
.config = _config,
.lastIds = std::move(lastIds),
.samples = std::move(samples.samples),
.peerId = data.id});
}
}

Expand All @@ -153,8 +161,10 @@ DataElementI::attach(
auto info = *data.config->sampleFilter;
sampleFilter = _parent->getSampleFilterFactories()->decode(getCommunicator(), info.name, info.criteria);
}

string facet = data.config->facet ? *data.config->facet : string();
int priority = data.config->priority ? *data.config->priority : 0;

string name;
if (data.config->name)
{
Expand All @@ -166,6 +176,7 @@ DataElementI::attach(
os << session->getId() << '-' << topicId << '-' << data.id;
name = os.str();
}

if ((id > 0 && attachKey(topicId, data.id, key, sampleFilter, session, prx, facet, id, name, priority)) ||
(id < 0 && attachFilter(topicId, data.id, key, sampleFilter, session, prx, facet, id, filter, name, priority)))
{
Expand All @@ -178,7 +189,7 @@ DataElementI::attach(
session->subscriberInitialized(topicId, id > 0 ? data.id : -data.id, data.samples, key, shared_from_this());
if (!samplesI.empty())
{
return [=, self = shared_from_this()]()
return [=, samplesI = std::move(samplesI), self = shared_from_this()]()
{ self->initSamples(samplesI, topicId, data.id, priority, now, id < 0); };
}
return nullptr;
Expand All @@ -198,14 +209,17 @@ DataElementI::attachKey(
int priority)
{
// No locking necessary, called by the session with the mutex locked
auto p = _listeners.find({session, facet});

ListenerKey listenerKey{session, facet};
auto p = _listeners.find(listenerKey);
if (p == _listeners.end())
{
p = _listeners.emplace(ListenerKey{session, facet}, Listener(std::move(prx), facet)).first;
p = _listeners.emplace(std::move(listenerKey), Listener(std::move(prx), facet)).first;
}

bool added = false;
auto subscriber = p->second.addOrGet(topicId, elementId, keyId, nullptr, sampleFilter, name, priority, added);

if (_onConnectedElements && added)
{
_executor->queue([self = shared_from_this(), name]
Expand Down Expand Up @@ -634,7 +648,7 @@ DataElementI::forward(const Ice::ByteSeq& inParams, const Ice::Current& current)
{
for (const auto& [_, listener] : _listeners)
{
// If there's at least one subscriber interested in the update
            // If we are forwarding a sample, check if at least one of the listeners is interested in the sample.
if (!_sample || listener.matchOne(_sample, false))
{
// Forward the call using the listener's session proxy don't need to wait for the result.
Expand Down Expand Up @@ -1221,26 +1235,39 @@ KeyDataWriterI::getSamples(
int64_t lastId,
const chrono::time_point<chrono::system_clock>& now)
{
// Collect all queued samples that match the key and sample filter, are newer than the lastId, and are not stale.
DataSamples samples;
samples.id = _keys.empty() ? -_id : _id;

    // If the caller's sample queueing is disabled, there is no need to return any samples.
if (config->sampleCount && *config->sampleCount == 0)
{
return samples;
}

// Reap stale samples before collecting any samples.
if (_config->sampleLifetime && *_config->sampleLifetime > 0)
{
cleanOldSamples(_samples, now, *_config->sampleLifetime);
}

    // Compute the stale time, according to the caller's sample lifetime configuration.
chrono::time_point<chrono::system_clock> staleTime = chrono::time_point<chrono::system_clock>::min();
if (config->sampleLifetime && *config->sampleLifetime > 0)
{
staleTime = now - chrono::milliseconds(*config->sampleLifetime);
}

shared_ptr<Sample> first;
// Iterate through samples in reverse chronological order, starting with the newest.
// Stop iterating if any of the following conditions are met:
// - A sample's timestamp is older than the specified stale time.
// - A sample's ID is less than or equal to the specified last ID.
// - The requested number of samples has been collected.
// - A sample event triggers history clearing based on the caller's clear history policy.
// For each sample:
// - Check if it matches the optional key and sample filter.
// - If it matches, add it to the result set and update the first matched sample.
for (auto p = _samples.rbegin(); p != _samples.rend(); ++p)
{
if ((*p)->timestamp < staleTime)
Expand Down Expand Up @@ -1274,19 +1301,20 @@ KeyDataWriterI::getSamples(
}
}
}

if (!samples.samples.empty())
{
// If the first sample is a partial update, transform it to a full Update
if (first->event == DataStorm::SampleEvent::PartialUpdate)
{
samples.samples[0] = {
first->id,
samples.samples[0].keyId,
samples.samples[0].keyValue,
chrono::time_point_cast<chrono::microseconds>(first->timestamp).time_since_epoch().count(),
0,
DataStorm::SampleEvent::Update,
first->encodeValue(getCommunicator())};
samples.samples[0] = DataSample{
.id = first->id,
.keyId = samples.samples[0].keyId,
.keyValue = samples.samples[0].keyValue,
.timestamp = chrono::time_point_cast<chrono::microseconds>(first->timestamp).time_since_epoch().count(),
.tag = 0,
.event = DataStorm::SampleEvent::Update,
.value = first->encodeValue(getCommunicator())};
}
}
return samples;
Expand Down
29 changes: 22 additions & 7 deletions cpp/src/DataStorm/DataElementI.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,29 @@ namespace DataStormI

struct Listener
{
Listener(DataStormContract::SessionPrx proxy, const std::string& facet)
: proxy(facet.empty() ? proxy : proxy->ice_facet<DataStormContract::SessionPrx>(facet))
Listener(DataStormContract::SessionPrx proxy, std::string facet)
: proxy(
facet.empty() ? std::move(proxy)
: proxy->ice_facet<DataStormContract::SessionPrx>(std::move(facet)))
{
}

/**
* Determines if any subscriber matches the given sample.
*
* @param sample The sample to evaluate against the subscribers.
* @param matchKey If true, the sample's key is matched against subscriber keys.
* If false, the key match is skipped.
* @return True if at least one subscriber matches the sample, otherwise false.
*/
bool matchOne(const std::shared_ptr<Sample>& sample, bool matchKey) const
{
for (const auto& s : subscribers)
for (const auto& [_, subscriber] : subscribers)
{
if ((!matchKey || s.second->keys.empty() ||
s.second->keys.find(sample->key) != s.second->keys.end()) &&
(!s.second->filter || s.second->filter->match(sample->key)) &&
(!s.second->sampleFilter || s.second->sampleFilter->match(sample)))
if ((!matchKey || subscriber->keys.empty() ||
subscriber->keys.find(sample->key) != subscriber->keys.end()) &&
(!subscriber->filter || subscriber->filter->match(sample->key)) &&
(!subscriber->sampleFilter || subscriber->sampleFilter->match(sample)))
{
return true;
}
Expand Down Expand Up @@ -129,7 +139,9 @@ namespace DataStormI
return subscribers.empty();
}

// The proxy to the peer session.
DataStormContract::SessionPrx proxy;
// A map containing the data element subscribers, indexed by the topic ID and the element ID.
std::map<std::pair<std::int64_t, std::int64_t>, std::shared_ptr<Subscriber>> subscribers;
};

Expand Down Expand Up @@ -267,6 +279,9 @@ namespace DataStormI
mutable std::shared_ptr<Sample> _sample;
DataStormContract::SessionPrx _forwarder;
std::map<std::shared_ptr<Key>, std::vector<std::shared_ptr<Subscriber>>> _connectedKeys;

// A map containing the element listeners, indexed by the session servant and the target facet. The
// implementation of forward utilizes the listener map to forward calls to the peer sessions.
std::map<ListenerKey, Listener> _listeners;

private:
Expand Down
19 changes: 9 additions & 10 deletions cpp/src/DataStorm/NodeI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ using namespace DataStormContract;

namespace
{
// TODO convert to a middleware
class DispatchInterceptorI : public Ice::Object
class SessionDispatcher : public Ice::Object
{
public:
DispatchInterceptorI(shared_ptr<NodeI> node, shared_ptr<CallbackExecutor> executor)
SessionDispatcher(shared_ptr<NodeI> node, shared_ptr<CallbackExecutor> executor)
: _node(std::move(node)),
_executor(std::move(executor))
{
Expand All @@ -48,15 +47,15 @@ namespace

NodeI::NodeI(const shared_ptr<Instance>& instance)
: _instance(instance),
_nextPublisherSessionId{0},
_nextSubscriberSessionId{0},
_proxy{instance->getObjectAdapter()->createProxy<NodePrx>({Ice::generateUUID(), ""})},
      // The subscriber and publisher collocated forwarders are initialized here to avoid using a nullable proxy. These
      // objects are only used after the node is initialized and are removed in the destroy implementation.
_subscriberForwarder{instance->getCollocatedForwarder()->add<SubscriberSessionPrx>(
[this](Ice::ByteSeq inParams, const Ice::Current& current) { forwardToSubscribers(inParams, current); })},
_publisherForwarder{instance->getCollocatedForwarder()->add<PublisherSessionPrx>(
[this](Ice::ByteSeq inParams, const Ice::Current& current) { forwardToPublishers(inParams, current); })},
_nextSubscriberSessionId{0},
_nextPublisherSessionId{0}
_subscriberForwarder{instance->getCollocatedForwarder()->add<SubscriberSessionPrx>(
[this](Ice::ByteSeq inParams, const Ice::Current& current) { forwardToSubscribers(inParams, current); })}
{
}

Expand All @@ -77,7 +76,9 @@ NodeI::init()
auto adapter = instance->getObjectAdapter();
adapter->add<NodePrx>(self, _proxy->ice_getIdentity());

auto interceptor = make_shared<DispatchInterceptorI>(self, instance->getCallbackExecutor());
// Register the SessionDispatcher object as the default servant for subscriber and publisher sessions.
// The "s" category handles subscriber sessions, and the "p" category handles publisher sessions.
auto interceptor = make_shared<SessionDispatcher>(self, instance->getCallbackExecutor());
adapter->addDefaultServant(interceptor, "s");
adapter->addDefaultServant(interceptor, "p");
}
Expand All @@ -97,9 +98,7 @@ NodeI::destroy(bool ownsCommunicator)

if (!ownsCommunicator)
{
//
// Notifies peer sessions of the disconnection.
//
for (const auto& [_, subscriber] : _subscribers)
{
if (auto session = subscriber->getSession())
Expand Down
Loading

0 comments on commit f2714f7

Please sign in to comment.