Skip to content

Commit

Permalink
Add overloaded request method to node for non-blocking calls using ab…
Browse files Browse the repository at this point in the history
…stract types.

- Add overloaded method to Node.
- Update specialisation of ReqHandler to handle callbacks using google::protobuf::Message.

Signed-off-by: Rhys Mainwaring <[email protected]>
  • Loading branch information
srmainwaring committed Feb 24, 2023
1 parent ec25fe9 commit 0fd4b7c
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 2 deletions.
21 changes: 21 additions & 0 deletions include/gz/transport/Node.hh
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,27 @@ namespace gz
std::function<void(const ReplyT &_reply,
const bool _result)> &_callback);

/// \brief Request a new service using a non-blocking call.
/// In this version the callback is a lambda function and the
/// request and response may be abstract types.
/// \param[in] _topic Service name requested.
/// \param[in] _request Protobuf message containing the request's
/// parameters.
/// \param[in] _callback Lambda function executed when the response
/// arrives. The callback has the following parameters:
/// * _reply Protobuf message containing the response.
/// * _result Result of the service call. If false, there was
/// a problem executing your request.
/// \param[in] _repType Message type used in the response.
/// \return true when the service call was succesfully requested.
public: template<typename RequestT, typename ReplyT>
bool Request(
const std::string &_topic,
const RequestT &_request,
std::function<void(const ReplyT &_reply,
const bool _result)> &_callback,
const char *_repType);

/// \brief Request a new service without input parameter using a
/// non-blocking call.
/// In this version the callback is a lambda function.
Expand Down
68 changes: 66 additions & 2 deletions include/gz/transport/ReqHandler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include <memory>
#include <string>

#include <gz/msgs/Factory.hh>

#include "gz/transport/config.hh"
#include "gz/transport/Export.hh"
#include "gz/transport/TransportTypes.hh"
Expand Down Expand Up @@ -318,6 +320,43 @@ namespace gz
{
}

/// \brief Create a specific protobuf message given its serialized data.
/// \param[in] _data The serialized data.
/// \return Pointer to the specific protobuf message.
public: std::shared_ptr<google::protobuf::Message>
CreateMsg(const std::string &_data) const
{
// Instantiate a specific protobuf message
std::shared_ptr<google::protobuf::Message> msgPtr =
gz::msgs::Factory::New(this->RepTypeName());
if (!msgPtr)
{
std::cerr << "Unable to create response of type["
<< this->RepTypeName() << "].\n";
return nullptr;
}

// Create the message using some serialized data
if (!msgPtr->ParseFromString(_data))
{
std::cerr << "ReqHandler::CreateMsg() error: ParseFromString failed"
<< std::endl;
}

return msgPtr;
}

/// \brief Set the callback for this handler.
/// \param[in] _cb The callback with the following parameters:
/// * _rep Protobuf message containing the service response.
/// * _result True when the service request was successful or
/// false otherwise.
public: void SetCallback(const std::function <void(
const google::protobuf::Message &_rep, const bool _result)> &_cb)
{
this->cb = _cb;
}

/// \brief Set the REQ protobuf message for this handler.
/// \param[in] _reqMsg Protofub message containing the input parameters of
/// of the service request.
Expand Down Expand Up @@ -371,8 +410,25 @@ namespace gz
// Documentation inherited.
public: void NotifyResult(const std::string &_rep, const bool _result)
{
this->rep = _rep;
this->result = _result;
// Execute the callback (if existing).
if (this->cb)
{
// Instantiate the specific protobuf message associated to this topic.
auto msg = this->CreateMsg(_rep);
if (!msg)
{
/// \todo(srmainwaring) verify this is the correct fail behaviour
this->result = false;
this->repAvailable = false;
this->condition.notify_one();
}
this->cb(*msg, _result);
}
else
{
this->rep = _rep;
this->result = _result;
}

this->repAvailable = true;
this->condition.notify_one();
Expand Down Expand Up @@ -409,6 +465,14 @@ namespace gz

/// \brief Protobuf message containing the response.
private: google::protobuf::Message *repMsg = nullptr;

/// \brief Callback to the function registered for this handler with the
/// following parameters:
/// \param[in] _rep Protobuf message containing the service response.
/// \param[in] _result True when the service request was successful or
/// false otherwise.
private: std::function<void(
const google::protobuf::Message &_rep, const bool _result)> cb;
};
}
}
Expand Down
93 changes: 93 additions & 0 deletions include/gz/transport/detail/Node.hh
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,99 @@ namespace gz
return true;
}

//////////////////////////////////////////////////
template<typename RequestT, typename ReplyT>
bool Node::Request(
const std::string &_topic,
const RequestT &_request,
std::function<void(const ReplyT &_reply, const bool _result)> &_cb,
const char *_repType)
{
auto rep = gz::msgs::Factory::New(_repType);
if (!rep)
{
std::cerr << "Unable to create response of type["
<< _repType << "].\n";
return false;
}

// Topic remapping.
std::string topic = _topic;
this->Options().TopicRemap(_topic, topic);

std::string fullyQualifiedTopic;
if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
this->Options().NameSpace(), topic, fullyQualifiedTopic))
{
std::cerr << "Service [" << topic << "] is not valid." << std::endl;
return false;
}

bool localResponserFound;
IRepHandlerPtr repHandler;
{
std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
localResponserFound = this->Shared()->repliers.FirstHandler(
fullyQualifiedTopic,
_request.GetTypeName(),
rep->GetTypeName(),
repHandler);
}

// If the responser is within my process.
if (localResponserFound)
{
// There is a responser in my process, let's use it.
bool result = repHandler->RunLocalCallback(_request, *rep);

_cb(*rep, result);
return true;
}

// Create a new request handler.
std::shared_ptr<ReqHandler<RequestT, ReplyT>> reqHandlerPtr(
new ReqHandler<RequestT, ReplyT>(this->NodeUuid()));

// Insert the request's parameters.
reqHandlerPtr->SetMessage(&_request);

// Set the response message (to set the type info).
reqHandlerPtr->SetResponse(rep.get());

// Insert the callback into the handler.
reqHandlerPtr->SetCallback(_cb);

{
std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);

// Store the request handler.
this->Shared()->requests.AddHandler(
fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);

// If the responser's address is known, make the request.
SrvAddresses_M addresses;
if (this->Shared()->TopicPublishers(fullyQualifiedTopic, addresses))
{
this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
_request.GetTypeName(), rep->GetTypeName());
}
else
{
// Discover the service responser.
if (!this->Shared()->DiscoverService(fullyQualifiedTopic))
{
std::cerr << "Node::Request(): Error discovering service ["
<< topic
<< "]. Did you forget to start the discovery service?"
<< std::endl;
return false;
}
}
}

return true;
}

//////////////////////////////////////////////////
template<typename ReplyT>
bool Node::Request(
Expand Down

0 comments on commit 0fd4b7c

Please sign in to comment.