Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adopt FairMQMessage backed memory resource collection from AliceO2 #93

Merged
merged 8 commits into from
Oct 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ find_package(Threads REQUIRED)

if(BUILD_FAIRMQ)
find_package2(PUBLIC Boost VERSION 1.64 REQUIRED
COMPONENTS program_options thread system filesystem regex date_time signals
COMPONENTS container program_options thread system filesystem regex date_time signals
)
find_package2(PUBLIC FairLogger VERSION 1.2.0 REQUIRED)
find_package2(PRIVATE ZeroMQ VERSION 4.1.5 REQUIRED)
Expand Down
4 changes: 4 additions & 0 deletions fairmq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ set(FAIRMQ_PUBLIC_HEADER_FILES
FairMQSocket.h
FairMQStateMachine.h
FairMQTransportFactory.h
MemoryResources.h
MemoryResourceTools.h
Tools.h
Transports.h
options/FairMQProgOptions.h
Expand Down Expand Up @@ -155,6 +157,7 @@ set(FAIRMQ_SOURCE_FILES
zeromq/FairMQUnmanagedRegionZMQ.cxx
zeromq/FairMQSocketZMQ.cxx
zeromq/FairMQTransportFactoryZMQ.cxx
MemoryResources.cxx
)

if(BUILD_NANOMSG_TRANSPORT)
Expand Down Expand Up @@ -232,6 +235,7 @@ endif()

target_link_libraries(${_target}
INTERFACE # only consumers link against interface dependencies
Boost::container

PUBLIC # libFairMQ AND consumers of libFairMQ link aginst public dependencies
Threads::Threads
Expand Down
24 changes: 12 additions & 12 deletions fairmq/FairMQChannel.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -689,49 +689,49 @@ void FairMQChannel::ResetChannel()
// TODO: implement channel resetting
}

int FairMQChannel::Send(unique_ptr<FairMQMessage>& msg, int sndTimeoutInMs) const
int FairMQChannel::Send(unique_ptr<FairMQMessage>& msg, int sndTimeoutInMs)
{
CheckSendCompatibility(msg);
return fSocket->Send(msg, sndTimeoutInMs);
}

int FairMQChannel::Receive(unique_ptr<FairMQMessage>& msg, int rcvTimeoutInMs) const
int FairMQChannel::Receive(unique_ptr<FairMQMessage>& msg, int rcvTimeoutInMs)
{
CheckReceiveCompatibility(msg);
return fSocket->Receive(msg, rcvTimeoutInMs);
}

int FairMQChannel::SendAsync(unique_ptr<FairMQMessage>& msg) const
int FairMQChannel::SendAsync(unique_ptr<FairMQMessage>& msg)
{
CheckSendCompatibility(msg);
return fSocket->Send(msg, 0);
}

int FairMQChannel::ReceiveAsync(unique_ptr<FairMQMessage>& msg) const
int FairMQChannel::ReceiveAsync(unique_ptr<FairMQMessage>& msg)
{
CheckReceiveCompatibility(msg);
return fSocket->Receive(msg, 0);
}

int64_t FairMQChannel::Send(vector<unique_ptr<FairMQMessage>>& msgVec, int sndTimeoutInMs) const
int64_t FairMQChannel::Send(vector<unique_ptr<FairMQMessage>>& msgVec, int sndTimeoutInMs)
{
CheckSendCompatibility(msgVec);
return fSocket->Send(msgVec, sndTimeoutInMs);
}

int64_t FairMQChannel::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, int rcvTimeoutInMs) const
int64_t FairMQChannel::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, int rcvTimeoutInMs)
{
CheckReceiveCompatibility(msgVec);
return fSocket->Receive(msgVec, rcvTimeoutInMs);
}

int64_t FairMQChannel::SendAsync(vector<unique_ptr<FairMQMessage>>& msgVec) const
int64_t FairMQChannel::SendAsync(vector<unique_ptr<FairMQMessage>>& msgVec)
{
CheckSendCompatibility(msgVec);
return fSocket->Send(msgVec, 0);
}

int64_t FairMQChannel::ReceiveAsync(vector<unique_ptr<FairMQMessage>>& msgVec) const
int64_t FairMQChannel::ReceiveAsync(vector<unique_ptr<FairMQMessage>>& msgVec)
{
CheckReceiveCompatibility(msgVec);
return fSocket->Receive(msgVec, 0);
Expand Down Expand Up @@ -761,7 +761,7 @@ unsigned long FairMQChannel::GetMessagesRx() const
return fSocket->GetMessagesRx();
}

void FairMQChannel::CheckSendCompatibility(FairMQMessagePtr& msg) const
void FairMQChannel::CheckSendCompatibility(FairMQMessagePtr& msg)
{
if (fTransportType != msg->GetType())
{
Expand All @@ -776,7 +776,7 @@ void FairMQChannel::CheckSendCompatibility(FairMQMessagePtr& msg) const
}
}

void FairMQChannel::CheckSendCompatibility(vector<FairMQMessagePtr>& msgVec) const
void FairMQChannel::CheckSendCompatibility(vector<FairMQMessagePtr>& msgVec)
{
for (auto& msg : msgVec)
{
Expand All @@ -794,7 +794,7 @@ void FairMQChannel::CheckSendCompatibility(vector<FairMQMessagePtr>& msgVec) con
}
}

void FairMQChannel::CheckReceiveCompatibility(FairMQMessagePtr& msg) const
void FairMQChannel::CheckReceiveCompatibility(FairMQMessagePtr& msg)
{
if (fTransportType != msg->GetType())
{
Expand All @@ -804,7 +804,7 @@ void FairMQChannel::CheckReceiveCompatibility(FairMQMessagePtr& msg) const
}
}

void FairMQChannel::CheckReceiveCompatibility(vector<FairMQMessagePtr>& msgVec) const
void FairMQChannel::CheckReceiveCompatibility(vector<FairMQMessagePtr>& msgVec)
{
for (auto& msg : msgVec)
{
Expand Down
40 changes: 20 additions & 20 deletions fairmq/FairMQChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,37 +178,37 @@ class FairMQChannel
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1) const;
int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1);

/// Receives a message from the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1) const;
int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1);

int SendAsync(FairMQMessagePtr& msg) const __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, timeout);")));
int ReceiveAsync(FairMQMessagePtr& msg) const __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, timeout);")));
int SendAsync(FairMQMessagePtr& msg) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, timeout);")));
int ReceiveAsync(FairMQMessagePtr& msg) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, timeout);")));

/// Send a vector of messages
/// @param msgVec message vector reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1) const;
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1);

/// Receive a vector of messages
/// @param msgVec message vector reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1) const;
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1);

int64_t SendAsync(std::vector<FairMQMessagePtr>& msgVec) const __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msgVec, timeout);")));
int64_t ReceiveAsync(std::vector<FairMQMessagePtr>& msgVec) const __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msgVec, timeout);")));
int64_t SendAsync(std::vector<FairMQMessagePtr>& msgVec) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msgVec, timeout);")));
int64_t ReceiveAsync(std::vector<FairMQMessagePtr>& msgVec) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msgVec, timeout);")));

/// Send FairMQParts
/// @param parts FairMQParts reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1) const
int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1)
{
return Send(parts.fParts, sndTimeoutInMs);
}
Expand All @@ -217,17 +217,17 @@ class FairMQChannel
/// @param parts FairMQParts reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1) const
int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1)
{
return Receive(parts.fParts, rcvTimeoutInMs);
}

int64_t SendAsync(FairMQParts& parts) const __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(parts, timeout);")))
int64_t SendAsync(FairMQParts& parts) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(parts, timeout);")))
{
return Send(parts.fParts, 0);
}

int64_t ReceiveAsync(FairMQParts& parts) const __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(parts, timeout);")))
int64_t ReceiveAsync(FairMQParts& parts) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(parts, timeout);")))
{
return Receive(parts.fParts, 0);
}
Expand All @@ -237,25 +237,25 @@ class FairMQChannel
unsigned long GetMessagesTx() const;
unsigned long GetMessagesRx() const;

auto Transport() const -> const FairMQTransportFactory*
auto Transport() -> FairMQTransportFactory*
{
return fTransportFactory.get();
};

template<typename... Args>
FairMQMessagePtr NewMessage(Args&&... args) const
FairMQMessagePtr NewMessage(Args&&... args)
{
return Transport()->CreateMessage(std::forward<Args>(args)...);
}

template<typename T>
FairMQMessagePtr NewSimpleMessage(const T& data) const
FairMQMessagePtr NewSimpleMessage(const T& data)
{
return Transport()->NewSimpleMessage(data);
}

template<typename T>
FairMQMessagePtr NewStaticMessage(const T& data) const
FairMQMessagePtr NewStaticMessage(const T& data)
{
return Transport()->NewStaticMessage(data);
}
Expand All @@ -279,10 +279,10 @@ class FairMQChannel

std::shared_ptr<FairMQTransportFactory> fTransportFactory;

void CheckSendCompatibility(FairMQMessagePtr& msg) const;
void CheckSendCompatibility(std::vector<FairMQMessagePtr>& msgVec) const;
void CheckReceiveCompatibility(FairMQMessagePtr& msg) const;
void CheckReceiveCompatibility(std::vector<FairMQMessagePtr>& msgVec) const;
void CheckSendCompatibility(FairMQMessagePtr& msg);
void CheckSendCompatibility(std::vector<FairMQMessagePtr>& msgVec);
void CheckReceiveCompatibility(FairMQMessagePtr& msg);
void CheckReceiveCompatibility(std::vector<FairMQMessagePtr>& msgVec);

void InitTransport(std::shared_ptr<FairMQTransportFactory> factory);

Expand Down
4 changes: 2 additions & 2 deletions fairmq/FairMQDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const
}
}

bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback& callback, int i) const
bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback& callback, int i)
{
unique_ptr<FairMQMessage> input(fChannels.at(chName).at(i).fTransportFactory->CreateMessage());

Expand All @@ -739,7 +739,7 @@ bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback&
}
}

bool FairMQDevice::HandleMultipartInput(const string& chName, const InputMultipartCallback& callback, int i) const
bool FairMQDevice::HandleMultipartInput(const string& chName, const InputMultipartCallback& callback, int i)
{
FairMQParts input;

Expand Down
Loading