From e5be50569ccd7491e5ff93c132ffa6f0e9531f78 Mon Sep 17 00:00:00 2001 From: Krzysztof Furman Date: Thu, 15 Jul 2021 20:01:22 +0100 Subject: [PATCH] Implemented a poller instead of a receive timeout --- src/ZMQLinkModel.hpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/ZMQLinkModel.hpp b/src/ZMQLinkModel.hpp index aec6f35..8c48bd4 100644 --- a/src/ZMQLinkModel.hpp +++ b/src/ZMQLinkModel.hpp @@ -76,7 +76,7 @@ class ZMQLinkModel : public ZMQLinkConcept { m_queue_timeout = std::chrono::milliseconds(m_cfg.zmq_receiver_timeout); TLOG_DEBUG(5) << "ZMQLinkModel conf: initialising subscriber!"; m_subscriber_connected = false; - m_subscriber.setsockopt(ZMQ_RCVTIMEO, (int)m_queue_timeout.count()); + //m_subscriber.setsockopt(ZMQ_RCVTIMEO, (int)m_queue_timeout.count()); m_subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); TLOG_DEBUG(5) << "ZMQLinkModel conf: connecting subscriber!"; m_subscriber.connect(m_ZMQLink_sourceLink); @@ -156,12 +156,16 @@ class ZMQLinkModel : public ZMQLinkConcept { size_t counter = 0; std::ostringstream oss; + + zmq::pollitem_t items[] = {{static_cast(m_subscriber),0,ZMQ_POLLIN,0}}; while (m_run_marker.load()) { TLOG_DEBUG(1) << "Looping"; if (m_subscriber_connected) { TLOG_DEBUG(1) << ": Ready to receive data"; zmq::message_t msg; + zmq::poll (&items [0],1,m_queue_timeout); + if (items[0].revents & ZMQ_POLLIN){ auto recvd = m_subscriber.recv(&msg); if (recvd == 0) { TLOG_DEBUG(1) << "No data received, moving to next loop iteration"; @@ -179,6 +183,8 @@ class ZMQLinkModel : public ZMQLinkConcept { TLOG_DEBUG(1) << ": End of do_work loop"; counter++; + } + } else { TLOG_DEBUG(1) << "Subscriber not yet connected"; }