Skip to content

Commit

Permalink
Merge pull request #13 from DUNE-DAQ/kf_subscriberchange
Browse files Browse the repository at this point in the history
Add new poller and update configuration.
  • Loading branch information
willpvazquez authored Jul 16, 2021
2 parents 34ee565 + c7ba49a commit cfe432f
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 33 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ Then cycle through the states by typing 'init', 'conf' and 'start'. To stop the
issue the 'record' command. Note that this will store a data dump from the luminosity buffer should its occupancy rise above 80%. The stored
data are 'raw' and not subject to any trigger selection.

Note - due to the need for a more configurable fake trigger implementation in readout this test will produce numerous warnings for failed trigger
requests. These can be safely ignored.

## ND-GAr: TBD
Configuration steps:
1. TBD
Expand Down
2 changes: 1 addition & 1 deletion python/lbrulibs/fake_NDreadout.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@
"apa_number": 0,
"emulator_mode": false,
"fake_trigger_flag": 1,
"latency_buffer_size": 10,
"latency_buffer_size": 10000,
"link_number": 0,
"num_request_handling_threads": 1,
"pop_limit_pct": 0.8,
Expand Down
1 change: 1 addition & 0 deletions src/ZMQIssues.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ ERS_DECLARE_ISSUE(lbrulibs, InitializationError, " ZMQ Initialization Error: " <

ERS_DECLARE_ISSUE(lbrulibs, GenericNDMessage, " ND Readout: " << initmsg, ((std::string)initmsg))

ERS_DECLARE_ISSUE(lbrulibs, ReceiveTimeoutExpired, "Unable to receive within timeout period: " << timeout << " milliseconds.",((int)timeout))
} // namespace dunedaq

#endif // LBRULIBS_SRC_ZMQISSUES_HPP_
11 changes: 8 additions & 3 deletions src/ZMQLinkConcept.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
#define LBRULIBS_SRC_ZMQLINKCONCEPT_HPP_

#include <nlohmann/json.hpp>
#include "ipm/Subscriber.hpp"
//#include "ipm/Subscriber.hpp"
#include "zmq.hpp"

#include <memory>
#include <sstream>
Expand Down Expand Up @@ -50,11 +51,15 @@ class ZMQLinkConcept {

protected:
dunedaq::lbrulibs::pacmancardreader::Conf m_cfg;
std::shared_ptr<ipm::Subscriber> m_subscriber;
//std::shared_ptr<ipm::Subscriber> m_subscriber;
std::chrono::milliseconds m_queue_timeout;
std::chrono::milliseconds m_sink_timeout{10};
bool m_subscriber_connected{false};
zmq::context_t m_context;
zmq::socket_t m_subscriber{m_context, zmq::socket_type::sub};
int m_card_id;
int m_link_tag;
std::string m_ZMQLink_commandLink = "tcp://127.0.0.1:5555";
//std::string m_ZMQLink_commandLink = "tcp://127.0.0.1:5555";
std::string m_ZMQLink_sourceLink = "tcp://127.0.0.1:5556";
private:

Expand Down
62 changes: 34 additions & 28 deletions src/ZMQLinkModel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
#include <nlohmann/json.hpp>
#include <folly/ProducerConsumerQueue.h>

#include "ipm/Subscriber.hpp"
//#include "ipm/Subscriber.hpp"
#include "zmq.hpp"
#include "readout/NDReadoutTypes.hpp"

#include <string>
Expand All @@ -26,7 +27,11 @@
#include <memory>

namespace dunedaq::lbrulibs {

/*
ERS_DECLARE_ISSUE(lbrulibs,ReceiveTimeoutExpired,
"Unable to receive within timeout period (timeout period was " << timeout << " milliseconds)",
((int)timeout)) // NOLINT
*/
template<class TargetPayloadType>
class ZMQLinkModel : public ZMQLinkConcept {
public:
Expand Down Expand Up @@ -59,7 +64,6 @@ class ZMQLinkModel : public ZMQLinkConcept {

void init(const data_t& /*args*/) {
TLOG_DEBUG(5) << "ZMQLinkModel init: nothing to do!";

}

void conf(const data_t& args) {
Expand All @@ -70,13 +74,14 @@ class ZMQLinkModel : public ZMQLinkConcept {
m_cfg = args.get<pacmancardreader::Conf>();

m_queue_timeout = std::chrono::milliseconds(m_cfg.zmq_receiver_timeout);

TLOG_DEBUG(5) << "ZMQLinkModel conf: initialising subscriber!";
m_subscriber = dunedaq::ipm::make_ipm_subscriber("ZmqSubscriber");
m_subscriber_connected = false;
m_subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
TLOG_DEBUG(5) << "ZMQLinkModel conf: connecting subscriber!";
m_subscriber->connect_for_receives({ {"connection_string", m_ZMQLink_sourceLink} });
m_subscriber.connect(m_ZMQLink_sourceLink);
m_subscriber_connected = true;
TLOG_DEBUG(5) << "ZMQLinkModel conf: enacting subscription!";
m_subscriber->subscribe("");
m_subscriber.setsockopt(ZMQ_SUBSCRIBE, "");
TLOG_DEBUG(5) << "Configuring ZMQLinkModel!";

m_parser_thread.set_name(m_ZMQLink_sourceLink, m_link_tag);
Expand Down Expand Up @@ -150,36 +155,37 @@ class ZMQLinkModel : public ZMQLinkConcept {

size_t counter = 0;
std::ostringstream oss;

zmq::pollitem_t items[] = {{static_cast<void*>(m_subscriber),0,ZMQ_POLLIN,0}};
while (m_run_marker.load()) {
TLOG_DEBUG(1) << "Looping";

if (m_subscriber->can_receive()) {
if (m_subscriber_connected) {
TLOG_DEBUG(1) << ": Ready to receive data";
try {
auto recvd = m_subscriber->receive(m_queue_timeout);
if (recvd.data.size() == 0) {
zmq::message_t msg;
zmq::poll (&items [0],1,m_queue_timeout);
if (items[0].revents & ZMQ_POLLIN){
auto recvd = m_subscriber.recv(&msg);
if (recvd == 0) {
TLOG_DEBUG(1) << "No data received, moving to next loop iteration";
continue;
}
TLOG_DEBUG(1) << ": Pushing data into output_queue";
try {
TargetPayloadType* Payload = new TargetPayloadType();
std::memcpy((void *)&Payload->data, &recvd.data[0], recvd.data.size());

m_sink_queue->push(*Payload, ZMQLinkConcept::m_queue_timeout);
} catch (const appfwk::QueueTimeoutExpired& ex) {
ers::warning(ex);
}
TLOG_DEBUG(1) << ": Pushing data into output_queue";
try {
TargetPayloadType* Payload = new TargetPayloadType();
std::memcpy((void *)&Payload->data, msg.data(), msg.size());

m_sink_queue->push(*Payload, m_sink_timeout);
} catch (const appfwk::QueueTimeoutExpired& ex) {
ers::warning(ex);
}

TLOG_DEBUG(1) << ": End of do_work loop";
counter++;
}

} catch (dunedaq::ipm::ReceiveTimeoutExpired const& rte) {
TLOG_DEBUG(1) << "ReceiveTimeoutExpired: " << rte.what();
continue;
}
TLOG_DEBUG(1) << ": End of do_work loop";
counter++;
} else {
TLOG_DEBUG(1) << "Sleeping";
//std::this_thread::sleep_for(std::chrono::seconds(1));
TLOG_DEBUG(1) << "Subscriber not yet connected";
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion test/pacman-generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def pacman(_echo_server,_cmd_server,_data_server,word_lists,nRepeats=1):

for n in range(nRepeats):
for i in word_lists:
data_socket.send(b"", zmq.SNDMORE);
#data_socket.send(b"", zmq.SNDMORE)
data_socket.send(pacman_msg_format.format_msg('DATA',i))
print(pacman_msg_format.parse_msg(pacman_msg_format.format_msg('DATA',i)))
message_count += 1
Expand Down

0 comments on commit cfe432f

Please sign in to comment.