-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DataStorm comments and bug fixes #3231
DataStorm comments and bug fixes #3231
Conversation
@@ -163,8 +168,6 @@ void | |||
NodeSessionI::destroy() | |||
{ | |||
lock_guard<mutex> lock(_mutex); | |||
_destroyed = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused
@@ -197,5 +200,5 @@ void | |||
NodeSessionI::addSession(SessionPrx session) | |||
{ | |||
lock_guard<mutex> lock(_mutex); | |||
_sessions[session->ice_getIdentity()] = std::move(session); | |||
_sessions.insert_or_assign(session->ice_getIdentity(), session); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replaced []
operator to allow using non-nullable proxies in the std::map
@@ -191,7 +190,7 @@ NodeSessionManager::announceTopicWriter(const string& topic, NodePrx node, const | |||
} | |||
|
|||
auto p = _sessions.find(node->ice_getIdentity()); | |||
auto nodePrx = p != _sessions.end() ? p->second->getPublicNode() : node; | |||
node = p != _sessions.end() ? p->second->getPublicNode() : node; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a bug here, we were not using the updated proxy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good.
cpp/src/DataStorm/Contract.ice
Outdated
/// Announces existing topics to the peer during session establishment. | ||
/// Announces new and existing topics to the peer. | ||
/// | ||
/// - During session establishment, this method announces existing topics. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use the term 'operation' for Slice.
cpp/src/DataStorm/NodeSessionI.h
Outdated
DataStormContract::NodePrx _node; | ||
|
||
// The connection used to create the session. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Never null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I will clarify it.
const Ice::ConnectionPtr _connection; | ||
|
||
std::mutex _mutex; | ||
bool _destroyed; | ||
// A proxy to the target node. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From this description, it's not clear when it can be nullopt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you refer to _publicNode
because this one is non optional.
For _publicNode is only nullopt before init is called, because the self reference required to create the NodeForwarder servant.
std::int64_t _nextReaderId; | ||
std::int64_t _nextWriterId; | ||
|
||
// A map of topic readers indexed by the topic name. | ||
// Each key is a topic name, and the corresponding value is a vector of readers for that topic. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's odd that shared_ptr<TopicI>
represents a reader (resp. writer) for a Topic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have:
class TopicReaderI final : public TopicReader, public TopicI
class TopicWriterI final : public TopicWriter, public TopicI
Then for a topic instance like Topic<string, string> topic(node, "hello")
A topic reader object is created when the first key reader is created, and a topic writer is created when the first key writer is created with the given topic instance.
The topic reader and topic writer are shared by all key readers and key writers created using the same optic instance.
template<typename Key, typename Value, typename UpdateTag = std::string> class Topic
{
...
private:
mutable std::shared_ptr<DataStormI::TopicReader> _reader;
mutable std::shared_ptr<DataStormI::TopicWriter> _writer;
}
@@ -77,7 +76,12 @@ namespace DataStormI | |||
// the specified node and sets this member once the connection is established. | |||
std::optional<DataStormContract::LookupPrx> _connectedTo; | |||
|
|||
// Stores the connection currently sending an announcement. This is used to exclude the node session |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can it be null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is always non null when forwarding calls in NodeSessionManager::forward
which is the only place where it is read.
@@ -30,6 +29,7 @@ TopicFactoryI::createTopicReader( | |||
shared_ptr<FilterManager> sampleFilterFactories) | |||
{ | |||
shared_ptr<TopicReaderI> reader; | |||
auto instance = getInstance(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not directly from this PR.
I am not a fan of this getInstance function. It would be more readable to expand it inline, as in:
auto instance = _instance.lock();
assert(instance); // comment explaining why it can't be null here
Co-authored-by: Joe George <[email protected]>
This PR adds more comments to DataStorm implementation. It includes some updates to the port from the original datastorm implementation.