Skip to content

Commit

Permalink
DataStorm doc & impl comments (#3204)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone authored Nov 27, 2024
1 parent 1d22999 commit f5f8aa9
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 72 deletions.
47 changes: 27 additions & 20 deletions cpp/include/DataStorm/DataStorm.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,17 @@ namespace DataStorm
bool hasWriters() const noexcept;

/**
* Wait for given number of writers to be online. The node shutdown will cause this method to raise
* NodeShutdownException.
* Wait for given number of writers to be online.
*
* @param count The number of writers to wait.
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForWriters(unsigned int count = 1) const;

/**
* Wait for readers to be offline. The node shutdown will cause this method to raise
* NodeShutdownException.
* Wait for writers to be offline.
*
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForNoWriters() const;

Expand All @@ -266,8 +267,9 @@ namespace DataStorm
std::vector<Sample<Key, Value, UpdateTag>> getAllUnread() noexcept;

/**
* Wait for given number of unread samples to be available. The node shutdown will cause this method to
* raise NodeShutdownException.
* Wait for given number of unread samples to be available.
*
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForUnread(unsigned int count = 1) const;

Expand All @@ -279,10 +281,10 @@ namespace DataStorm
bool hasUnread() const noexcept;

/**
* Returns the next unread sample. The node shutdown will cause this method to raise
* NodeShutdownException.
* Returns the next unread sample.
*
* @return The unread sample.
* @throws NodeShutdownException If the node is shut down while waiting.
*/
Sample<Key, Value, UpdateTag> getNextUnread();

Expand Down Expand Up @@ -384,15 +386,17 @@ namespace DataStorm
bool hasReaders() const noexcept;

/**
* Wait for given number of readers to be online. The node shutdown will cause this method to raise
* NodeShutdownException.
* Wait for given number of readers to be online.
*
* @param count The number of readers to wait.
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForReaders(unsigned int count = 1) const;

/**
* Wait for readers to be offline. The node shutdown this method to raise NodeShutdownException.
* Wait for readers to be offline.
*
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForNoReaders() const;

Expand All @@ -411,9 +415,10 @@ namespace DataStorm
std::vector<Key> getConnectedKeys() const noexcept;

/**
* Get the last written sample. If there's no sample, the std::logic_error exception is raised.
* Get the last written sample.
*
* @return The last written sample.
* @throws std::logic_error If there's no sample.
**/
Sample<Key, Value, UpdateTag> getLast();

Expand Down Expand Up @@ -549,16 +554,17 @@ namespace DataStorm
bool hasWriters() const noexcept;

/**
* Wait for given number of data writers to be online. The node shutdown will cause this method to raise
* NodeShutdownException.
* Wait for given number of data writers to be online.
*
* @param count The number of date writers to wait.
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForWriters(unsigned int count = 1) const;

/**
* Wait for data writers to be offline. The node shutdown will cause this method to raise
* NodeShutdownException.
* Wait for data writers to be offline.
*
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForNoWriters() const;

Expand All @@ -577,16 +583,17 @@ namespace DataStorm
bool hasReaders() const noexcept;

/**
* Wait for given number of data readers to be online. The node shutdown will cause this method to raise
* NodeShutdownException.
* Wait for given number of data readers to be online.
*
* @param count The number of data readers to wait.
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForReaders(unsigned int count = 1) const;

/**
* Wait for data readers to be offline. The node shutdown will cause this method to raise
* NodeShutdownException.
* Wait for data readers to be offline.
*
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForNoReaders() const;

Expand Down
7 changes: 2 additions & 5 deletions cpp/include/DataStorm/InternalT.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,8 @@ namespace DataStormI
return k;
}

//
// The key is being removed concurrently by the deleter, remove it now
// to allow the insertion of a new key. The deleter won't remove the
// new key.
//
// The key is being removed concurrently by the deleter, remove it now to allow the insertion of a new
// key. The deleter won't remove the new key.
_elements.erase(p);
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/include/DataStorm/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ namespace DataStorm
* The Cloner template provides a method to clone user types.
*
* The cloner template can be specialized to provide cloning for types that require special cloning. By
* defaut, the template uses plain C++ copy.
* default, the template uses plain C++ copy.
*
* @headerfile DataStorm/DataStorm.h
*/
Expand Down
71 changes: 56 additions & 15 deletions cpp/src/DataStorm/Contract.ice
Original file line number Diff line number Diff line change
Expand Up @@ -66,38 +66,59 @@ module DataStormContract
}
sequence<DataSamples> DataSamplesSeq;

/// Provides information about an element, which can be a key, a filter, or a tag. Includes the element's ID, name,
/// and encoded value.
struct ElementInfo
{
/// The key or filter id.
/// The unique identifier of the element. Filter IDs are negative.
long id;

/// The filter name.
/// The name of the filter. This field is empty for key and tag elements.
string name;

/// The key or filter value.
/// The encoded value of the element.
Ice::ByteSeq value;
}
sequence<ElementInfo> ElementInfoSeq;

/// Provides information about a topic, including its name and the list of active topic reader or topic writer IDs.
///
/// There is a unique `TopicInfo` for all topic instances with the same name, representing a single logical topic.
/// Each instance has its own topic reader and topic writer, which are lazily initialized and have a unique ID.
///
/// @see Session#announceTopics
struct TopicInfo
{
/// The topic name.
/// The name of the topic.
string name;

/// The id of topic writers or readers.
/// The list of active topic reader or topic writer IDs for the topic.
///
/// - In a publisher session announcing topics to a subscriber session, this contains the active topic writer
/// IDs.
/// - In a subscriber session announcing topics to a publisher session, this contains the active topic reader
/// IDs.
Ice::LongSeq ids;
}

/// Represents a sequence of active topics used for transmitting topic information during session establishment.
///
/// @see Session#announceTopics
sequence<TopicInfo> TopicInfoSeq;

/// Provides detailed information about topic readers and topic writers, including its ID, name, keys, filters,
/// and tags.
///
/// @see Session#attachTopic
struct TopicSpec
{
/// The id of the topic.
/// The ID of the topic.
long id;

/// The name of the topic.
string name;

/// The topic keys or filters.
/// The topic's keys and filters.
ElementInfoSeq elements;

/// The topic update tags.
Expand Down Expand Up @@ -135,6 +156,7 @@ module DataStormContract
}
sequence<ElementData> ElementDataSeq;

/// Provides detailed information about elements that can be either a key or a filter.
struct ElementSpec
{
/// The readers and writers associated with the key or filter.
Expand All @@ -143,7 +165,7 @@ module DataStormContract
/// The id of the key or filter.
long id;

/// The name of the filter.
/// The name of the filter. This field is empty for key elements.
string name;

/// The value of the key or filter.
Expand Down Expand Up @@ -200,20 +222,39 @@ module DataStormContract

interface Session
{
/// Called by sessions to announce topics to the peer. A publisher session announces the topics it writes,
/// while a subscriber session announces the topics it reads.
/// Announces existing topics to the peer during session establishment.
/// A publisher session announces the topics it writes, while a subscriber session announces the topics it reads.
///
/// The peer receiving the announcement will invoke `attachTopic` for the topics it is interested in.
///
/// @param topics The topics to announce.
/// @param initialize currently unused.
/// @param topics The sequence of topics to announce.
/// @param initialize Currently unused.
/// @see attachTopic
void announceTopics(TopicInfoSeq topics, bool initialize);

/// Attaches a local topic to a remote topic when a session receives a topic announcement from a peer.
///
/// This method is called if the session is interested in the announced topic, which occurs when:
/// - The session has a reader for a topic that the peer has a writer for, or
/// - The session has a writer for a topic that the peer has a reader for.
///
/// @param topic The TopicSpec object describing the topic being attached to the remote topic.
void attachTopic(TopicSpec topic);

void detachTopic(long topic);

void attachTags(long topic, ElementInfoSeq tags, bool initialize);
void detachTags(long topic, Ice::LongSeq tags);

void announceElements(long topic, ElementInfoSeq keys);
/// Announces new elements to the peer.
/// The peer will invoke `attachElements` for the elements it is interested in. The announced elements include
/// key readers, key writers, and filter readers associated with the specified topic.
///
/// @param topic The ID of the topic associated with the elements.
/// @param elements The sequence of elements to announce.
/// @see attachElements
void announceElements(long topic, ElementInfoSeq elements);

void attachElements(long topic, ElementSpecSeq elements, bool initialize);
void attachElementsAck(long topic, ElementSpecAckSeq elements);
void detachElements(long topic, Ice::LongSeq keys);
Expand Down Expand Up @@ -274,8 +315,8 @@ module DataStormContract
/// Announce a topic reader.
///
/// @param topic The name of the topic.
/// @param node The node reading the topic. The proxy is never null.
idempotent void announceTopicReader(string topic, Node* node);
/// @param subscriber The node reading the topic. The subscriber proxy is never null.
idempotent void announceTopicReader(string topic, Node* subscriber);

/// Announce a topic writer.
///
Expand Down
10 changes: 2 additions & 8 deletions cpp/src/DataStorm/DataElementI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1061,10 +1061,7 @@ KeyDataReaderI::KeyDataReaderI(
out << this << ": created key reader";
}

//
// If sample filtering is enabled, ensure the updates are received using a session
// facet specific to this reader.
//
// If sample filtering is enabled, ensure the updates are received using a session facet specific to this reader.
if (_config->sampleFilter)
{
ostringstream os;
Expand Down Expand Up @@ -1343,10 +1340,7 @@ FilteredDataReaderI::FilteredDataReaderI(
out << this << ": created filtered reader";
}

//
// If sample filtering is enabled, ensure the updates are received using a session
// facet specific to this reader.
//
// If sample filtering is enabled, ensure the updates are received using a session facet specific to this reader.
if (_config->sampleFilter)
{
ostringstream os;
Expand Down
29 changes: 21 additions & 8 deletions cpp/src/DataStorm/LookupI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,27 @@ LookupI::LookupI(
}

void
LookupI::announceTopicReader(string name, optional<NodePrx> proxy, const Ice::Current& current)
LookupI::announceTopicReader(string name, optional<NodePrx> subscriber, const Ice::Current& current)
{
Ice::checkNotNull(proxy, __FILE__, __LINE__, current);
_nodeSessionManager->announceTopicReader(name, *proxy, current.con);
_topicFactory->createSubscriberSession(name, *proxy, current.con);
Ice::checkNotNull(subscriber, __FILE__, __LINE__, current);
// Forward the announcement to known nodes via the node session manager.
_nodeSessionManager->announceTopicReader(name, *subscriber, current.con);

// Notify the topic factory about the new topic reader.
// If there are any writers for the topic, the factory will create a subscriber session for it.
_topicFactory->createSubscriberSession(name, *subscriber, current.con);
}

void
LookupI::announceTopicWriter(string name, optional<NodePrx> proxy, const Ice::Current& current)
LookupI::announceTopicWriter(string name, optional<NodePrx> publisher, const Ice::Current& current)
{
Ice::checkNotNull(proxy, __FILE__, __LINE__, current);
_nodeSessionManager->announceTopicWriter(name, *proxy, current.con);
_topicFactory->createPublisherSession(name, *proxy, current.con);
Ice::checkNotNull(publisher, __FILE__, __LINE__, current);
// Forward the announcement to known nodes via the node session manager.
_nodeSessionManager->announceTopicWriter(name, *publisher, current.con);

// Notify the topic factory about the new topic writer.
// If there are any readers for the topic, the factory will create a publisher session for it.
_topicFactory->createPublisherSession(name, *publisher, current.con);
}

void
Expand All @@ -45,8 +53,13 @@ LookupI::announceTopics(
const Ice::Current& current)
{
Ice::checkNotNull(proxy, __FILE__, __LINE__, current);
// Forward the announcement to known nodes via the node session manager.
_nodeSessionManager->announceTopics(readers, writers, *proxy, current.con);

// Notify the topic factory about the new topic readers and writers.
// The factory will create subscriber sessions for topics with matching writers and publisher sessions for topics
// with matching readers.

for (const auto& name : readers)
{
_topicFactory->createSubscriberSession(name, *proxy, current.con);
Expand Down
Loading

0 comments on commit f5f8aa9

Please sign in to comment.