Skip to content

Commit

Permalink
add new timeout for data sink
Browse files Browse the repository at this point in the history
  • Loading branch information
Jose Guillermo Panduro Vazquez committed Jul 16, 2021
1 parent cfbb96e commit c7ba49a
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/ZMQLinkConcept.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class ZMQLinkConcept {
dunedaq::lbrulibs::pacmancardreader::Conf m_cfg;
//std::shared_ptr<ipm::Subscriber> m_subscriber;
std::chrono::milliseconds m_queue_timeout;
std::chrono::milliseconds m_sink_timeout{10};
bool m_subscriber_connected{false};
zmq::context_t m_context;
zmq::socket_t m_subscriber{m_context, zmq::socket_type::sub};
Expand Down
33 changes: 16 additions & 17 deletions src/ZMQLinkModel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ class ZMQLinkModel : public ZMQLinkConcept {
m_queue_timeout = std::chrono::milliseconds(m_cfg.zmq_receiver_timeout);
TLOG_DEBUG(5) << "ZMQLinkModel conf: initialising subscriber!";
m_subscriber_connected = false;
//m_subscriber.setsockopt(ZMQ_RCVTIMEO, (int)m_queue_timeout.count());
m_subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
TLOG_DEBUG(5) << "ZMQLinkModel conf: connecting subscriber!";
m_subscriber.connect(m_ZMQLink_sourceLink);
Expand Down Expand Up @@ -165,25 +164,25 @@ class ZMQLinkModel : public ZMQLinkConcept {
TLOG_DEBUG(1) << ": Ready to receive data";
zmq::message_t msg;
zmq::poll (&items [0],1,m_queue_timeout);
if (items[0].revents & ZMQ_POLLIN){
auto recvd = m_subscriber.recv(&msg);
if (recvd == 0) {
if (items[0].revents & ZMQ_POLLIN){
auto recvd = m_subscriber.recv(&msg);
if (recvd == 0) {
TLOG_DEBUG(1) << "No data received, moving to next loop iteration";
continue;
}
TLOG_DEBUG(1) << ": Pushing data into output_queue";
try {
TargetPayloadType* Payload = new TargetPayloadType();
std::memcpy((void *)&Payload->data, msg.data(), msg.size());

m_sink_queue->push(*Payload, m_sink_timeout);
} catch (const appfwk::QueueTimeoutExpired& ex) {
ers::warning(ex);
}

TLOG_DEBUG(1) << ": End of do_work loop";
counter++;
}
TLOG_DEBUG(1) << ": Pushing data into output_queue";
try {
TargetPayloadType* Payload = new TargetPayloadType();
std::memcpy((void *)&Payload->data, msg.data(), msg.size());

m_sink_queue->push(*Payload, m_queue_timeout);
} catch (const appfwk::QueueTimeoutExpired& ex) {
ers::warning(ex);
}

TLOG_DEBUG(1) << ": End of do_work loop";
counter++;
}

} else {
TLOG_DEBUG(1) << "Subscriber not yet connected";
Expand Down

0 comments on commit c7ba49a

Please sign in to comment.