Skip to content

Commit

Permalink
DataStorm comments and bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone committed Dec 4, 2024
1 parent e657245 commit 39188dd
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 47 deletions.
9 changes: 7 additions & 2 deletions cpp/src/DataStorm/Contract.ice
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,14 @@ module DataStormContract

interface Session
{
/// Announces existing topics to the peer during session establishment.
/// Announces new and existing topics to the peer.
///
/// - During session establishment, this method announces existing topics.
/// - For already established sessions, it is used to announce new topics.
///
/// A publisher session announces the topics it writes, while a subscriber session announces the topics it reads.
///
/// The peer receiving the announcement will invoke `attachTopic` for the topics it is interested in.
/// The peer receiving the announcement will invoke `attachTopic` for any topics it is interested in.
///
/// @param topics The sequence of topics to announce.
/// @param initialize Currently unused.
Expand All @@ -251,6 +255,7 @@ module DataStormContract
/// Attaches a local topic to a remote topic when a session receives a topic announcement from a peer.
///
/// This method is called if the session is interested in the announced topic, which occurs when:
///
/// - The session has a reader for a topic that the peer has a writer for, or
/// - The session has a writer for a topic that the peer has a reader for.
///
Expand Down
13 changes: 8 additions & 5 deletions cpp/src/DataStorm/NodeSessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ using namespace DataStormContract;

namespace
{
// The NodeForwarder class is used to forward calls to a Node that doesn't have a public endpoint.
// It implements the Slice DataContract::Node interface by forwarding calls to the target Node object.
// The `NodeForwarder` class forwards calls to a `Node` that lacks a public endpoint.
//
// This class implements the Slice `DataContract::Node` interface by forwarding calls to the target `Node` object
// using the connection established during the creation of the `NodeSession` object.
//
// The `NodeForwarder` wraps the node and session proxy parameters passed to the `DataContract::Node` operations
// in forwarder proxies, which handle forwarding to the corresponding target objects.
class NodeForwarder : public Node, public enable_shared_from_this<NodeForwarder>
{
public:
Expand Down Expand Up @@ -163,8 +168,6 @@ void
NodeSessionI::destroy()
{
lock_guard<mutex> lock(_mutex);
_destroyed = true;

try
{
if (_publicNode != _node)
Expand Down Expand Up @@ -197,5 +200,5 @@ void
NodeSessionI::addSession(SessionPrx session)
{
lock_guard<mutex> lock(_mutex);
_sessions[session->ice_getIdentity()] = std::move(session);
_sessions.insert_or_assign(session->ice_getIdentity(), session);
}
14 changes: 11 additions & 3 deletions cpp/src/DataStorm/NodeSessionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,27 @@ namespace DataStormI

private:
const std::shared_ptr<Instance> _instance;
// A proxy to the target node, representing the node that created the session.
DataStormContract::NodePrx _node;

// The connection used to create the session.
const Ice::ConnectionPtr _connection;

std::mutex _mutex;
bool _destroyed;
// A proxy to the target node.
//
// - If the target node has a public endpoint or an adapter ID, this proxy is identical to the `_node` proxy.
// - Otherwise, it is a proxy to a `NodeForwarder` object, which forwards calls to the target node over the
// incoming connection used to create the session.
std::optional<DataStormContract::NodePrx> _publicNode;

// A proxy for forwarding announcements to the target node when announce forwarding is enabled.
// If announce forwarding is disabled, this will be nullopt.
std::optional<DataStormContract::LookupPrx> _lookup;

// A map containing all the publisher and subscriber sessions established between two nodes.
std::map<Ice::Identity, std::optional<DataStormContract::SessionPrx>> _sessions;
// A map containing all publisher and subscriber sessions established with the session's target node via a
// node forwarder.
std::map<Ice::Identity, DataStormContract::SessionPrx> _sessions;
};
}
#endif
19 changes: 9 additions & 10 deletions cpp/src/DataStorm/NodeSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#include "NodeSessionManager.h"
#include "ConnectionManager.h"
#include "ForwarderManager.h"
#include "Instance.h"
#include "NodeI.h"
#include "NodeSessionI.h"
Expand Down Expand Up @@ -33,7 +32,7 @@ namespace
if (auto session = _nodeSessionManager->getSession(Ice::Identity{current.id.name.substr(pos + 1), ""}))
{
// Forward the call to the target session, don't need to wait for the result.
auto id = Ice::Identity{current.id.name.substr(0, pos), current.id.category.substr(0, 1)};
Ice::Identity id{current.id.name.substr(0, pos), current.id.category.substr(0, 1)};
session->getConnection()->createProxy(id)->ice_invokeAsync(
current.operation,
current.mode,
Expand Down Expand Up @@ -145,13 +144,13 @@ NodeSessionManager::announceTopicReader(const string& topic, NodePrx node, const
}

auto p = _sessions.find(node->ice_getIdentity());
auto nodePrx = p != _sessions.end() ? p->second->getPublicNode() : node;
node = p != _sessions.end() ? p->second->getPublicNode() : node;

// Set the exclude connection to prevent forwarding the announcement back to the sender.
_exclude = connection;
// Forward the announcement to all known nodes, including nodes with an active session and those we are connected
// to. This is a collocated, synchronous call.
_forwarder->announceTopicReader(topic, nodePrx);
_forwarder->announceTopicReader(topic, node);

lock.unlock();

Expand All @@ -163,7 +162,7 @@ NodeSessionManager::announceTopicReader(const string& topic, NodePrx node, const
auto instance = _instance.lock();
if (instance && instance->getLookup())
{
instance->getLookup()->announceTopicReaderAsync(topic, nodePrx, nullptr);
instance->getLookup()->announceTopicReaderAsync(topic, node, nullptr);
}
}
}
Expand Down Expand Up @@ -191,7 +190,7 @@ NodeSessionManager::announceTopicWriter(const string& topic, NodePrx node, const
}

auto p = _sessions.find(node->ice_getIdentity());
auto nodePrx = p != _sessions.end() ? p->second->getPublicNode() : node;
node = p != _sessions.end() ? p->second->getPublicNode() : node;

// Set the exclude connection to prevent forwarding the announcement back to the sender.
_exclude = connection;
Expand Down Expand Up @@ -255,13 +254,13 @@ NodeSessionManager::announceTopics(
}

auto p = _sessions.find(node->ice_getIdentity());
auto nodePrx = p != _sessions.end() ? p->second->getPublicNode() : node;
node = p != _sessions.end() ? p->second->getPublicNode() : node;

// Set the exclude connection to prevent forwarding the announcement back to the sender.
_exclude = connection;
// Forward the announcement to all known nodes, including nodes with an active session and those we are connected
// to. This is a collocated, synchronous call.
_forwarder->announceTopics(readers, writers, nodePrx);
_forwarder->announceTopics(readers, writers, node);

lock.unlock();

Expand All @@ -273,7 +272,7 @@ NodeSessionManager::announceTopics(
auto instance = _instance.lock();
if (instance && instance->getLookup())
{
instance->getLookup()->announceTopicsAsync(readers, writers, nodePrx, nullptr);
instance->getLookup()->announceTopicsAsync(readers, writers, node, nullptr);
}
}
}
Expand Down Expand Up @@ -441,7 +440,7 @@ NodeSessionManager::disconnected(LookupPrx lookup)
}

void
NodeSessionManager::destroySession(optional<NodePrx> node)
NodeSessionManager::destroySession(NodePrx node)
{
unique_lock<mutex> lock(_mutex);
auto p = _sessions.find(node->ice_getIdentity());
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/DataStorm/NodeSessionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#ifndef DATASTORM_NODE_SESSION_MANAGER_H
#define DATASTORM_NODE_SESSION_MANAGER_H

#include "DataStorm/Config.h"
#include "DataStorm/Contract.h"
#include "Ice/Ice.h"

Expand Down Expand Up @@ -50,7 +49,7 @@ namespace DataStormI
void disconnected(DataStormContract::NodePrx, DataStormContract::LookupPrx);
void disconnected(DataStormContract::LookupPrx);

void destroySession(std::optional<DataStormContract::NodePrx>);
void destroySession(DataStormContract::NodePrx);

std::shared_ptr<Instance> getInstance() const
{
Expand All @@ -77,7 +76,12 @@ namespace DataStormI
// the specified node and sets this member once the connection is established.
std::optional<DataStormContract::LookupPrx> _connectedTo;

// Stores the connection currently sending an announcement. This is used to exclude the node session
// associated with the connection from the list of node sessions receiving the forwarded announcement.
mutable Ice::ConnectionPtr _exclude;

// A proxy to a collocated forwarder servant responsible for forwarding messages to the `Lookup`
// objects of all active node sessions.
DataStormContract::LookupPrx _forwarder;
};
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/DataStorm/SessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& newConnection,
lock_guard<mutex> lock(_mutex);
if (_destroyed || _session)
{
// Nothing to do, we are either destroyed or already connected.
return;
}

Expand Down Expand Up @@ -588,7 +589,7 @@ SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex)
lock_guard<mutex> lock(_mutex);
if (_destroyed || (connection && _connection != connection) || !_session)
{
// Ignore we either already destroyed, or disconnected, or a new connection has already been established.
// Ignore we are either already destroyed, or disconnected, or a new connection has already been established.
return false;
}

Expand Down
41 changes: 22 additions & 19 deletions cpp/src/DataStorm/TopicFactoryI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
using namespace std;
using namespace DataStormI;

TopicFactoryI::TopicFactoryI(const shared_ptr<Instance>& instance)
: _instance(instance),
_traceLevels(instance->getTraceLevels()),
_nextReaderId(0),
_nextWriterId(0)
TopicFactoryI::TopicFactoryI(shared_ptr<Instance> instance)
: _instance{std::move(instance)},
_nextReaderId{0},
_nextWriterId{0}
{
}

Expand All @@ -30,6 +29,7 @@ TopicFactoryI::createTopicReader(
shared_ptr<FilterManager> sampleFilterFactories)
{
shared_ptr<TopicReaderI> reader;
auto instance = getInstance();
bool hasWriters;
{
lock_guard<mutex> lock(_mutex);
Expand All @@ -43,9 +43,9 @@ TopicFactoryI::createTopicReader(
name, // we keep using name below
_nextReaderId++);
_readers[name].push_back(reader);
if (_traceLevels->topic > 0)
if (instance->getTraceLevels()->topic > 0)
{
Trace out(_traceLevels, _traceLevels->topicCat);
Trace out(instance->getTraceLevels(), instance->getTraceLevels()->topicCat);
out << name << ": created topic reader";
}

Expand All @@ -54,7 +54,6 @@ TopicFactoryI::createTopicReader(

try
{
auto instance = getInstance();
auto node = instance->getNode();
auto nodePrx = node->getProxy();
if (hasWriters)
Expand Down Expand Up @@ -83,6 +82,7 @@ TopicFactoryI::createTopicWriter(
shared_ptr<FilterManager> sampleFilterFactories)
{
shared_ptr<TopicWriterI> writer;
auto instance = getInstance();
bool hasReaders;
{
lock_guard<mutex> lock(_mutex);
Expand All @@ -96,9 +96,10 @@ TopicFactoryI::createTopicWriter(
name, // we keep using name below
_nextWriterId++);
_writers[name].push_back(writer);
if (_traceLevels->topic > 0)

if (instance->getTraceLevels()->topic > 0)
{
Trace out(_traceLevels, _traceLevels->topicCat);
Trace out(instance->getTraceLevels(), instance->getTraceLevels()->topicCat);
out << name << ": created topic writer";
}

Expand All @@ -107,7 +108,6 @@ TopicFactoryI::createTopicWriter(

try
{
auto instance = getInstance();
auto node = instance->getNode();
auto nodePrx = node->getProxy();
if (hasReaders)
Expand All @@ -131,9 +131,10 @@ void
TopicFactoryI::removeTopicReader(const string& name, const shared_ptr<TopicI>& reader)
{
lock_guard<mutex> lock(_mutex);
if (_traceLevels->topic > 0)
auto instance = getInstance();
if (instance->getTraceLevels()->topic > 0)
{
Trace out(_traceLevels, _traceLevels->topicCat);
Trace out(instance->getTraceLevels(), instance->getTraceLevels()->topicCat);
out << name << ": destroyed topic reader";
}
auto& readers = _readers[name];
Expand All @@ -148,9 +149,10 @@ void
TopicFactoryI::removeTopicWriter(const string& name, const shared_ptr<TopicI>& writer)
{
lock_guard<mutex> lock(_mutex);
if (_traceLevels->topic > 0)
auto instance = getInstance();
if (instance->getTraceLevels()->topic > 0)
{
Trace out(_traceLevels, _traceLevels->topicCat);
Trace out(instance->getTraceLevels(), instance->getTraceLevels()->topicCat);
out << name << ": destroyed topic writer";
}
auto& writers = _writers[name];
Expand Down Expand Up @@ -257,9 +259,9 @@ TopicFactoryI::getTopicReaderNames() const
lock_guard<mutex> lock(_mutex);
Ice::StringSeq readers;
readers.reserve(_readers.size());
for (const auto& p : _readers)
for (const auto& [name, _] : _readers)
{
readers.push_back(p.first);
readers.push_back(name);
}
return readers;
}
Expand All @@ -270,9 +272,9 @@ TopicFactoryI::getTopicWriterNames() const
lock_guard<mutex> lock(_mutex);
Ice::StringSeq writers;
writers.reserve(_writers.size());
for (const auto& p : _writers)
for (const auto& [name, _] : _writers)
{
writers.push_back(p.first);
writers.push_back(name);
}
return writers;
}
Expand All @@ -288,6 +290,7 @@ TopicFactoryI::shutdown() const
w->shutdown();
}
}

for (const auto& p : _readers)
{
for (const auto& r : p.second)
Expand Down
15 changes: 10 additions & 5 deletions cpp/src/DataStorm/TopicFactoryI.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace DataStormI
class TopicFactoryI final : public TopicFactory, public std::enable_shared_from_this<TopicFactoryI>
{
public:
TopicFactoryI(const std::shared_ptr<Instance>&);
TopicFactoryI(std::shared_ptr<Instance>);

std::shared_ptr<TopicReader> createTopicReader(
std::string,
Expand Down Expand Up @@ -63,13 +63,18 @@ namespace DataStormI
void shutdown() const;

private:
mutable std::mutex _mutex;
std::weak_ptr<Instance> _instance;
std::shared_ptr<TraceLevels> _traceLevels;
std::map<std::string, std::vector<std::shared_ptr<TopicI>>> _readers;
std::map<std::string, std::vector<std::shared_ptr<TopicI>>> _writers;
mutable std::mutex _mutex;
std::int64_t _nextReaderId;
std::int64_t _nextWriterId;

// A map of topic readers indexed by the topic name.
// Each key is a topic name, and the corresponding value is a vector of readers for that topic.
std::map<std::string, std::vector<std::shared_ptr<TopicI>>> _readers;

// A map of topic writers indexed by the topic name.
// Each key is a topic name, and the corresponding value is a vector of writers for that topic.
std::map<std::string, std::vector<std::shared_ptr<TopicI>>> _writers;
};
}
#endif

0 comments on commit 39188dd

Please sign in to comment.