diff --git a/README.md b/README.md index e8a38e7..c8487c9 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,9 @@ Then cycle through the states by typing 'init', 'conf' and 'start'. To stop the issue the 'record' command. Note that this will store a data dump from the luminosity buffer should its occupancy rise above 80%. The stored data are 'raw' and not subject to any trigger selection. +Note - due to the need for a more configurable fake trigger implementation in readout this test will produce numerous warnings for failed trigger +requests. These can be safely ignored. + ## ND-GAr: TBD Configuration steps: 1. TBD diff --git a/python/lbrulibs/fake_NDreadout.json b/python/lbrulibs/fake_NDreadout.json index 7d4dc9b..60a1fd0 100644 --- a/python/lbrulibs/fake_NDreadout.json +++ b/python/lbrulibs/fake_NDreadout.json @@ -135,7 +135,7 @@ "apa_number": 0, "emulator_mode": false, "fake_trigger_flag": 1, - "latency_buffer_size": 10, + "latency_buffer_size": 10000, "link_number": 0, "num_request_handling_threads": 1, "pop_limit_pct": 0.8, diff --git a/src/ZMQIssues.hpp b/src/ZMQIssues.hpp index 7e62e5f..9bfa5fc 100644 --- a/src/ZMQIssues.hpp +++ b/src/ZMQIssues.hpp @@ -18,6 +18,7 @@ ERS_DECLARE_ISSUE(lbrulibs, InitializationError, " ZMQ Initialization Error: " < ERS_DECLARE_ISSUE(lbrulibs, GenericNDMessage, " ND Readout: " << initmsg, ((std::string)initmsg)) +ERS_DECLARE_ISSUE(lbrulibs, ReceiveTimeoutExpired, "Unable to receive within timeout period: " << timeout << " milliseconds.",((int)timeout)) } // namespace dunedaq #endif // LBRULIBS_SRC_ZMQISSUES_HPP_ diff --git a/src/ZMQLinkConcept.hpp b/src/ZMQLinkConcept.hpp index 62de2f2..9320eeb 100644 --- a/src/ZMQLinkConcept.hpp +++ b/src/ZMQLinkConcept.hpp @@ -11,7 +11,8 @@ #define LBRULIBS_SRC_ZMQLINKCONCEPT_HPP_ #include -#include "ipm/Subscriber.hpp" +//#include "ipm/Subscriber.hpp" +#include "zmq.hpp" #include #include @@ -50,11 +51,15 @@ class ZMQLinkConcept { protected: dunedaq::lbrulibs::pacmancardreader::Conf m_cfg; - std::shared_ptr m_subscriber; + //std::shared_ptr m_subscriber; std::chrono::milliseconds m_queue_timeout; + std::chrono::milliseconds m_sink_timeout{10}; + bool m_subscriber_connected{false}; + zmq::context_t m_context; + zmq::socket_t m_subscriber{m_context, zmq::socket_type::sub}; int m_card_id; int m_link_tag; - std::string m_ZMQLink_commandLink = "tcp://127.0.0.1:5555"; + //std::string m_ZMQLink_commandLink = "tcp://127.0.0.1:5555"; std::string m_ZMQLink_sourceLink = "tcp://127.0.0.1:5556"; private: diff --git a/src/ZMQLinkModel.hpp b/src/ZMQLinkModel.hpp index 2267f7c..be81cb7 100644 --- a/src/ZMQLinkModel.hpp +++ b/src/ZMQLinkModel.hpp @@ -17,7 +17,8 @@ #include #include -#include "ipm/Subscriber.hpp" +//#include "ipm/Subscriber.hpp" +#include "zmq.hpp" #include "readout/NDReadoutTypes.hpp" #include @@ -26,7 +27,11 @@ #include namespace dunedaq::lbrulibs { - +/* +ERS_DECLARE_ISSUE(lbrulibs,ReceiveTimeoutExpired, + "Unable to receive within timeout period (timeout period was " << timeout << " milliseconds)", + ((int)timeout)) // NOLINT +*/ template class ZMQLinkModel : public ZMQLinkConcept { public: @@ -59,7 +64,6 @@ class ZMQLinkModel : public ZMQLinkConcept { void init(const data_t& /*args*/) { TLOG_DEBUG(5) << "ZMQLinkModel init: nothing to do!"; - } void conf(const data_t& args) { @@ -70,13 +74,14 @@ class ZMQLinkModel : public ZMQLinkConcept { m_cfg = args.get(); m_queue_timeout = std::chrono::milliseconds(m_cfg.zmq_receiver_timeout); - TLOG_DEBUG(5) << "ZMQLinkModel conf: initialising subscriber!"; - m_subscriber = dunedaq::ipm::make_ipm_subscriber("ZmqSubscriber"); + m_subscriber_connected = false; + m_subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); TLOG_DEBUG(5) << "ZMQLinkModel conf: connecting subscriber!"; - m_subscriber->connect_for_receives({ {"connection_string", m_ZMQLink_sourceLink} }); + m_subscriber.connect(m_ZMQLink_sourceLink); + m_subscriber_connected = true; TLOG_DEBUG(5) << "ZMQLinkModel conf: enacting subscription!"; - m_subscriber->subscribe(""); + m_subscriber.setsockopt(ZMQ_SUBSCRIBE, ""); TLOG_DEBUG(5) << "Configuring ZMQLinkModel!"; m_parser_thread.set_name(m_ZMQLink_sourceLink, m_link_tag); @@ -150,36 +155,37 @@ class ZMQLinkModel : public ZMQLinkConcept { size_t counter = 0; std::ostringstream oss; + + zmq::pollitem_t items[] = {{static_cast(m_subscriber),0,ZMQ_POLLIN,0}}; while (m_run_marker.load()) { TLOG_DEBUG(1) << "Looping"; - if (m_subscriber->can_receive()) { + if (m_subscriber_connected) { TLOG_DEBUG(1) << ": Ready to receive data"; - try { - auto recvd = m_subscriber->receive(m_queue_timeout); - if (recvd.data.size() == 0) { + zmq::message_t msg; + zmq::poll (&items [0],1,m_queue_timeout); + if (items[0].revents & ZMQ_POLLIN){ + auto recvd = m_subscriber.recv(&msg); + if (recvd == 0) { TLOG_DEBUG(1) << "No data received, moving to next loop iteration"; continue; - } - TLOG_DEBUG(1) << ": Pushing data into output_queue"; - try { - TargetPayloadType* Payload = new TargetPayloadType(); - std::memcpy((void *)&Payload->data, &recvd.data[0], recvd.data.size()); - - m_sink_queue->push(*Payload, ZMQLinkConcept::m_queue_timeout); - } catch (const appfwk::QueueTimeoutExpired& ex) { - ers::warning(ex); + } + TLOG_DEBUG(1) << ": Pushing data into output_queue"; + try { + TargetPayloadType* Payload = new TargetPayloadType(); + std::memcpy((void *)&Payload->data, msg.data(), msg.size()); + + m_sink_queue->push(*Payload, m_sink_timeout); + } catch (const appfwk::QueueTimeoutExpired& ex) { + ers::warning(ex); + } + + TLOG_DEBUG(1) << ": End of do_work loop"; + counter++; } - } catch (dunedaq::ipm::ReceiveTimeoutExpired const& rte) { - TLOG_DEBUG(1) << "ReceiveTimeoutExpired: " << rte.what(); - continue; - } - TLOG_DEBUG(1) << ": End of do_work loop"; - counter++; } else { - TLOG_DEBUG(1) << "Sleeping"; - //std::this_thread::sleep_for(std::chrono::seconds(1)); + TLOG_DEBUG(1) << "Subscriber not yet connected"; } } } diff --git a/test/pacman-generator.py b/test/pacman-generator.py index a5809ea..d492c3d 100644 --- a/test/pacman-generator.py +++ b/test/pacman-generator.py @@ -106,7 +106,7 @@ def pacman(_echo_server,_cmd_server,_data_server,word_lists,nRepeats=1): for n in range(nRepeats): for i in word_lists: - data_socket.send(b"", zmq.SNDMORE); + #data_socket.send(b"", zmq.SNDMORE) data_socket.send(pacman_msg_format.format_msg('DATA',i)) print(pacman_msg_format.parse_msg(pacman_msg_format.format_msg('DATA',i))) message_count += 1