diff --git a/include/trigger/Latency.hpp b/include/trigger/Latency.hpp new file mode 100644 index 00000000..9337a821 --- /dev/null +++ b/include/trigger/Latency.hpp @@ -0,0 +1,100 @@ +/** + * @file Latency.hpp + * + * This is part of the DUNE DAQ Application Framework, copyright 2021. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#ifndef TRIGGER_INCLUDE_TRIGGER_LATENCY_HPP_ +#define TRIGGER_INCLUDE_TRIGGER_LATENCY_HPP_ + +#include "utilities/TimestampEstimator.hpp" +#include "utilities/TimestampEstimatorSystem.hpp" + +#include +#include +#include // Include for std::ostream + +namespace dunedaq { + namespace trigger { + + class Latency { + using latency = uint64_t; + + public: + // Enumeration for selecting time units + enum class TimeUnit { Microseconds = 1, Milliseconds = 2 }; + + // Constructor with optional time unit selection (defaults to Microseconds) + Latency(TimeUnit time_unit = TimeUnit::Microseconds) + : m_latency_in(0), m_latency_out(0), m_time_unit(time_unit) { + setup_conversion(); + } + + ~Latency() {} + + // Function to update latency_in + void update_latency_in(uint64_t latency) { + update_single_latency(latency, m_latency_in); + } + + // Function to update latency_out + void update_latency_out(uint64_t latency) { + update_single_latency(latency, m_latency_out); + } + + // Function to get the value of latency_in + latency get_latency_in() const { + return m_latency_in.load(); + } + + // Function to get the value of latency_out + latency get_latency_out() const { + return m_latency_out.load(); + } + + private: + // Set up conversion based on the selected time unit + void setup_conversion() { + if (m_time_unit == TimeUnit::Microseconds) { + m_clock_ticks_conversion = 16 * 1e-3; // Conversion for microseconds + m_get_current_time = []() { + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + }; + } else { + m_clock_ticks_conversion = 16 * 1e-6; // Conversion for milliseconds + m_get_current_time = []() { + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + }; + } + } + + // Function to get the current system time based on the set time unit + uint64_t get_current_system_time() const { + return m_get_current_time(); + } + + // Single update function for both latencies + void update_single_latency(uint64_t latency, std::atomic& latency_atomic) { + uint64_t current_time = get_current_system_time(); + uint64_t latency_time = latency * m_clock_ticks_conversion; + uint64_t diff = (current_time >= latency_time) ? (current_time - latency_time) : 0; + latency_atomic.store(diff); + } + + std::atomic m_latency_in; // Member variable to store latency_in + std::atomic m_latency_out; // Member variable to store latency_out + TimeUnit m_time_unit; // Member variable to store the selected time unit (ms or ns) + double m_clock_ticks_conversion; // Conversion factor from ticks to the selected time unit + + // Lambda to get the current time + std::function m_get_current_time; + }; + + } // namespace trigger +} // namespace dunedaq + +#endif // TRIGGER_INCLUDE_TRIGGER_LATENCY_HPP_ diff --git a/plugins/CustomTCMaker.cpp b/plugins/CustomTCMaker.cpp index 464da46a..81ff44f0 100644 --- a/plugins/CustomTCMaker.cpp +++ b/plugins/CustomTCMaker.cpp @@ -85,6 +85,7 @@ CustomTCMaker::init(std::shared_ptr mcfg) // Currently precalculates events for the next 60 seconds m_sorting_size_limit = 60 * m_conf->get_clock_frequency_hz(); + m_latency_monitoring.store( m_conf->get_latency_monitoring() ); } //void @@ -106,6 +107,14 @@ CustomTCMaker::generate_opmon_data() info.set_tc_failed_sent_count( m_tc_failed_sent_count.load() ); this->publish(std::move(info)); + + if ( m_latency_monitoring.load() && m_running_flag.load() ) { + opmon::TriggerLatencyStandalone lat_info; + + lat_info.set_latency_out( m_latency_instance.get_latency_out() ); + + this->publish(std::move(lat_info)); + } } void @@ -131,6 +140,11 @@ CustomTCMaker::do_start(const nlohmann::json& obj) { m_running_flag.store(true); + // OpMon. + m_tc_made_count.store(0); + m_tc_sent_count.store(0); + m_tc_failed_sent_count.store(0); + auto start_params = obj.get(); std::string timestamp_method = m_conf->get_timestamp_method(); @@ -241,6 +255,7 @@ CustomTCMaker::send_trigger_candidates() TLOG_DEBUG(1) << get_name() << " at timestamp " << m_timestamp_estimator->get_timestamp_estimate() << ", pushing a candidate with timestamp " << candidate.time_candidate; + if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( candidate.time_candidate ); try { m_trigger_candidate_sink->send(std::move(candidate), std::chrono::milliseconds(10)); m_tc_sent_count++; diff --git a/plugins/CustomTCMaker.hpp b/plugins/CustomTCMaker.hpp index f768d960..e3a26dcf 100644 --- a/plugins/CustomTCMaker.hpp +++ b/plugins/CustomTCMaker.hpp @@ -25,7 +25,9 @@ #include "iomanager/Sender.hpp" #include "utilities/TimestampEstimator.hpp" #include "triggeralgs/TriggerCandidate.hpp" +#include "trigger/Latency.hpp" #include "trigger/opmon/customtcmaker_info.pb.h" +#include "trigger/opmon/latency_info.pb.h" #include #include @@ -115,6 +117,11 @@ class CustomTCMaker : public dunedaq::appfwk::DAQModule std::atomic m_tc_sent_count{ 0 }; std::atomic m_tc_failed_sent_count{ 0 }; void print_opmon_stats(); + + // Create an instance of the Latency class + std::atomic m_latency_monitoring{ false }; + dunedaq::trigger::Latency m_latency_instance; + std::atomic m_latency_out{ 0 }; }; } // namespace trigger } // namespace dunedaq diff --git a/plugins/MLTModule.cpp b/plugins/MLTModule.cpp index 5d775254..db03cc7e 100644 --- a/plugins/MLTModule.cpp +++ b/plugins/MLTModule.cpp @@ -66,6 +66,9 @@ MLTModule::init(std::shared_ptr mcfg) m_decision_output = get_iom_sender(con->UID()); } + // Latency related + m_latency_monitoring.store( mtrg->get_configuration()->get_latency_monitoring() ); + // Now do the configuration: dummy for now m_configured_flag.store(true); } @@ -106,6 +109,21 @@ MLTModule::generate_opmon_data() td_info.set_inhibited(counts.inhibited.exchange(0)); this->publish( std::move(td_info), {{"type", name}} ); } + + // latency + if ( m_latency_monitoring.load() && m_running_flag.load() ) { + // TC in, TD out + opmon::TriggerLatency lat_info; + lat_info.set_latency_in( m_latency_instance.get_latency_in() ); + lat_info.set_latency_out( m_latency_instance.get_latency_out() ); + this->publish(std::move(lat_info)); + + // vs readout window requests + opmon::ModuleLevelTriggerRequestLatency lat_request_info; + lat_request_info.set_latency_window_start( m_latency_requests_instance.get_latency_in() ); + lat_request_info.set_latency_window_end( m_latency_requests_instance.get_latency_out() ); + this->publish(std::move(lat_request_info)); + } } void @@ -202,6 +220,7 @@ void MLTModule::trigger_decisions_callback(dfmessages::TriggerDecision& decision ) { m_td_msg_received_count++; + if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( decision.trigger_timestamp ); auto trigger_types = unpack_types(decision.trigger_type); for ( const auto t : trigger_types ) { @@ -221,6 +240,12 @@ MLTModule::trigger_decisions_callback(dfmessages::TriggerDecision& decision ) << decision.trigger_timestamp << " start " << decision.components.front().window_begin << " end " << decision.components.front().window_end << " number of links " << decision.components.size(); + // readout window latency update + if (m_latency_monitoring.load()) { + m_latency_requests_instance.update_latency_in( decision.components.front().window_begin ); + m_latency_requests_instance.update_latency_out( decision.components.front().window_end ); + } + try { m_decision_output->send(std::move(decision), std::chrono::milliseconds(1)); m_td_sent_count++; @@ -260,6 +285,7 @@ MLTModule::trigger_decisions_callback(dfmessages::TriggerDecision& decision ) } } + if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( decision.trigger_timestamp ); m_td_total_count++; } diff --git a/plugins/MLTModule.hpp b/plugins/MLTModule.hpp index 5a2dff10..e405a76a 100644 --- a/plugins/MLTModule.hpp +++ b/plugins/MLTModule.hpp @@ -17,7 +17,9 @@ #include "trigger/Issues.hpp" #include "trigger/LivetimeCounter.hpp" #include "trigger/TokenManager.hpp" +#include "trigger/Latency.hpp" #include "trigger/opmon/moduleleveltrigger_info.pb.h" +#include "trigger/opmon/latency_info.pb.h" #include "appfwk/DAQModule.hpp" @@ -251,6 +253,15 @@ class MLTModule : public dunedaq::appfwk::DAQModule return m_trigger_counters[type]; } + // Create an instance of the Latency class + std::atomic m_latency_monitoring{ false }; + dunedaq::trigger::Latency m_latency_instance; + dunedaq::trigger::Latency m_latency_requests_instance; + std::atomic m_latency_in{ 0 }; + std::atomic m_latency_out{ 0 }; + std::atomic m_latency_window_start{ 0 }; + std::atomic m_latency_window_end{ 0 }; + void print_opmon_stats(); }; } // namespace trigger diff --git a/plugins/RandomTCMakerModule.cpp b/plugins/RandomTCMakerModule.cpp index 72a42672..ea41c70c 100644 --- a/plugins/RandomTCMakerModule.cpp +++ b/plugins/RandomTCMakerModule.cpp @@ -62,6 +62,7 @@ RandomTCMakerModule::init(std::shared_ptr mcfg) m_time_sync_source = get_iom_receiver(con->UID()); } m_conf = mtrg->get_configuration(); + m_latency_monitoring.store( m_conf->get_latency_monitoring() ); } void @@ -74,6 +75,14 @@ RandomTCMakerModule::generate_opmon_data() info.set_tc_failed_sent_count( m_tc_failed_sent_count.load() ); this->publish(std::move(info)); + + if ( m_latency_monitoring.load() && m_running_flag.load() ) { + opmon::TriggerLatencyStandalone lat_info; + + lat_info.set_latency_out( m_latency_instance.get_latency_out() ); + + this->publish(std::move(lat_info)); + } } void @@ -89,6 +98,11 @@ RandomTCMakerModule::do_start(const nlohmann::json& obj) m_running_flag.store(true); + // OpMon. + m_tc_made_count.store(0); + m_tc_sent_count.store(0); + m_tc_failed_sent_count.store(0); + std::string timestamp_method = m_conf->get_timestamp_method(); if (timestamp_method == "kTimeSync") { TLOG_DEBUG(0) << "Creating TimestampEstimator"; @@ -208,11 +222,13 @@ RandomTCMakerModule::send_trigger_candidates() } next_trigger_timestamp = m_timestamp_estimator->get_timestamp_estimate(); triggeralgs::TriggerCandidate candidate = create_candidate(next_trigger_timestamp); + m_tc_made_count++; TLOG_DEBUG(1) << get_name() << " at timestamp " << m_timestamp_estimator->get_timestamp_estimate() << ", pushing a candidate with timestamp " << candidate.time_candidate; + if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( candidate.time_candidate ); try{ m_trigger_candidate_sink->send(std::move(candidate), std::chrono::milliseconds(10)); m_tc_sent_count++; diff --git a/plugins/RandomTCMakerModule.hpp b/plugins/RandomTCMakerModule.hpp index 882ce2ba..5dc76987 100644 --- a/plugins/RandomTCMakerModule.hpp +++ b/plugins/RandomTCMakerModule.hpp @@ -26,7 +26,9 @@ #include "iomanager/Sender.hpp" #include "utilities/TimestampEstimator.hpp" #include "triggeralgs/TriggerCandidate.hpp" +#include "trigger/Latency.hpp" #include "trigger/opmon/randomtcmaker_info.pb.h" +#include "trigger/opmon/latency_info.pb.h" #include #include @@ -101,6 +103,11 @@ class RandomTCMakerModule : public dunedaq::appfwk::DAQModule std::atomic m_tc_sent_count{ 0 }; std::atomic m_tc_failed_sent_count{ 0 }; void print_opmon_stats(); + + // Create an instance of the Latency class + std::atomic m_latency_monitoring{ false }; + dunedaq::trigger::Latency m_latency_instance; + std::atomic m_latency_out{ 0 }; }; } // namespace trigger } // namespace dunedaq diff --git a/plugins/TriggerDataHandlerModule.cpp b/plugins/TriggerDataHandlerModule.cpp index 52475390..407c08ea 100644 --- a/plugins/TriggerDataHandlerModule.cpp +++ b/plugins/TriggerDataHandlerModule.cpp @@ -69,10 +69,6 @@ TriggerDataHandlerModule::create_readout(const appmodel::DataHandlerModule* modc std::string raw_dt = modconf->get_module_configuration()->get_input_data_type(); TLOG() << "Choosing specializations for DataHandlingModel with data_type:" << raw_dt << ']'; - TLOG() << "modconf: " << modconf; - TLOG() << modconf->class_name(); - TLOG() << modconf->get_module_configuration(); - // IF TriggerPrimitive (TP) if (raw_dt.find("TriggerPrimitive") != std::string::npos) { TLOG(TLVL_WORK_STEPS) << "Creating readout for TriggerPrimitive"; diff --git a/schema/trigger/opmon/latency_info.proto b/schema/trigger/opmon/latency_info.proto new file mode 100644 index 00000000..9e38243e --- /dev/null +++ b/schema/trigger/opmon/latency_info.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package dunedaq.trigger.opmon; + +// Message for latency variables +// Latency represents the difference between current system (clock) time and the data time of particular (TX) data object +// Units are us +// Used by many trigger modules +message TriggerLatency { + uint32 latency_in = 1; + uint32 latency_out = 2; +} + +// Message for latency variables +// Latency represents the difference between current system (clock) time and the data time of particular (TX) data object +// Units are us +// Special case for Standalone makers +message TriggerLatencyStandalone { + uint32 latency_out = 1; +} diff --git a/schema/trigger/opmon/moduleleveltrigger_info.proto b/schema/trigger/opmon/moduleleveltrigger_info.proto index 871366b3..bff25beb 100644 --- a/schema/trigger/opmon/moduleleveltrigger_info.proto +++ b/schema/trigger/opmon/moduleleveltrigger_info.proto @@ -24,3 +24,11 @@ message TriggerDecisionInfo { uint32 paused = 4; // Number of paused (triggers are paused) uint32 inhibited = 5; // Number of inhibited (DFO is busy) } + +// Message for MLT TD requests latency vars +// Latency represents the difference between current system (clock) time and the requested TD readout window (start/end) +// Units are currently us (but use an enum and can be changed) +message ModuleLevelTriggerRequestLatency { + uint32 latency_window_start = 1; + uint32 latency_window_end = 2; +} diff --git a/src/TAProcessor.cpp b/src/TAProcessor.cpp index ec88ab33..8d712749 100644 --- a/src/TAProcessor.cpp +++ b/src/TAProcessor.cpp @@ -54,6 +54,9 @@ TAProcessor::start(const nlohmann::json& args) m_tc_made_count.store(0); m_tc_sent_count.store(0); m_tc_failed_sent_count.store(0); + + m_running_flag.store(true); + inherited::start(args); } @@ -61,6 +64,7 @@ void TAProcessor::stop(const nlohmann::json& args) { inherited::stop(args); + m_running_flag.store(false); print_opmon_stats(); } @@ -94,6 +98,7 @@ TAProcessor::conf(const appmodel::DataHandlerModule* conf) inherited::add_postprocess_task(std::bind(&TAProcessor::find_tc, this, std::placeholders::_1, maker)); m_tcms.push_back(maker); } + m_latency_monitoring.store( dp->get_latency_monitoring() ); inherited::conf(conf); } @@ -108,6 +113,15 @@ TAProcessor::generate_opmon_data() info.set_tc_failed_sent_count( m_tc_failed_sent_count.load() ); this->publish(std::move(info)); + + if ( m_latency_monitoring.load() && m_running_flag.load() ) { + opmon::TriggerLatency lat_info; + + lat_info.set_latency_in( m_latency_instance.get_latency_in() ); + lat_info.set_latency_out( m_latency_instance.get_latency_out() ); + + this->publish(std::move(lat_info)); + } } /** @@ -116,11 +130,14 @@ TAProcessor::generate_opmon_data() void TAProcessor::find_tc(const TAWrapper* ta, std::shared_ptr tca) { + //time_activity gave 0 :/ + if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( ta->activity.time_start ); m_ta_received_count++; std::vector tcs; tca->operator()(ta->activity, tcs); for (auto tc : tcs) { m_tc_made_count++; + if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( tc.time_candidate ); if(!m_tc_sink->try_send(std::move(tc), iomanager::Sender::s_no_block)) { ers::warning(TCDropped(ERS_HERE, tc.time_start, m_sourceid.id)); m_tc_failed_sent_count++; diff --git a/src/TCProcessor.cpp b/src/TCProcessor.cpp index 21ecab10..831305e0 100644 --- a/src/TCProcessor.cpp +++ b/src/TCProcessor.cpp @@ -15,7 +15,6 @@ #include "datahandlinglibs/ReadoutLogging.hpp" #include "datahandlinglibs/models/IterableQueueModel.hpp" #include "datahandlinglibs/utils/ReusableThread.hpp" - #include "trigger/TCWrapper.hpp" #include "triggeralgs/TriggerCandidate.hpp" @@ -121,7 +120,7 @@ TCProcessor::conf(const appmodel::DataHandlerModule* cfg) link->get_sid()}); } - // TODO: Group links! + // TODO: Group links! //m_group_links_data = conf->get_groups_links(); parse_group_links(m_group_links_data); print_group_links(); @@ -177,6 +176,7 @@ TCProcessor::conf(const appmodel::DataHandlerModule* cfg) set_trigger_bitwords(bitwords); print_trigger_bitwords(m_trigger_bitwords); } + m_latency_monitoring.store( dp->get_latency_monitoring() ); inherited::add_postprocess_task(std::bind(&TCProcessor::make_td, this, std::placeholders::_1)); inherited::conf(mtrg); @@ -201,6 +201,15 @@ TCProcessor::generate_opmon_data() info.set_tds_cleared_tc_count( m_tds_cleared_tc_count.load() ); this->publish(std::move(info)); + + if ( m_latency_monitoring.load() && m_running_flag.load() ) { + opmon::TriggerLatency lat_info; + + lat_info.set_latency_in( m_latency_instance.get_latency_in() ); + lat_info.set_latency_out( m_latency_instance.get_latency_out() ); + + this->publish(std::move(lat_info)); + } } /** @@ -211,6 +220,7 @@ TCProcessor::make_td(const TCWrapper* tcw) { auto tc = tcw->candidate; + if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( tc.time_start ); m_tc_received_count++; if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ) { @@ -271,7 +281,7 @@ TCProcessor::create_decision(const PendingTD& pending_td) } } else { m_TD_bitword = get_TD_bitword(pending_td); - TLOG(5) << "[MLT] TD has bitword: " << m_TD_bitword << " " + TLOG_DEBUG(5) << "[MLT] TD has bitword: " << m_TD_bitword << " " << static_cast(m_TD_bitword.to_ulong()); decision.trigger_type = static_cast(m_TD_bitword.to_ulong()); // m_trigger_type; @@ -318,7 +328,6 @@ TCProcessor::send_trigger_decisions() { m_cv.wait(lock, [this] { return !m_pending_tds.empty() || !m_running_flag; }); - auto ready_tds = get_ready_tds(m_pending_tds); TLOG_DEBUG(10) << "ready tds: " << ready_tds.size() << ", updated pending tds: " << m_pending_tds.size(); @@ -349,6 +358,7 @@ TCProcessor::call_tc_decision(const TCProcessor::PendingTD& pending_td) auto tn = decision.trigger_number; auto td_ts = decision.trigger_timestamp; + if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( pending_td.contributing_tcs.front().time_start ); if(!m_td_sink->try_send(std::move(decision), iomanager::Sender::s_no_block)) { ers::warning(TDDropped(ERS_HERE, tn, td_ts)); m_tds_dropped_count++; diff --git a/src/TPProcessor.cpp b/src/TPProcessor.cpp index 70fa7c0d..a4b2b3eb 100644 --- a/src/TPProcessor.cpp +++ b/src/TPProcessor.cpp @@ -17,7 +17,6 @@ #include "datahandlinglibs/models/IterableQueueModel.hpp" #include "datahandlinglibs/utils/ReusableThread.hpp" - #include "triggeralgs/TriggerActivity.hpp" #include "trigger/AlgorithmPlugins.hpp" @@ -53,6 +52,8 @@ TPProcessor::start(const nlohmann::json& args) m_ta_sent_count.store(0); m_ta_failed_sent_count.store(0); + m_running_flag.store(true); + inherited::start(args); } @@ -60,6 +61,7 @@ void TPProcessor::stop(const nlohmann::json& args) { inherited::stop(args); + m_running_flag.store(false); print_opmon_stats(); } @@ -97,7 +99,9 @@ TPProcessor::conf(const appmodel::DataHandlerModule* conf) inherited::add_postprocess_task(std::bind(&TPProcessor::find_ta, this, std::placeholders::_1, maker)); m_tams.push_back(maker); } + m_latency_monitoring.store( dp->get_latency_monitoring() ); inherited::conf(conf); + } void @@ -111,6 +115,15 @@ TPProcessor::generate_opmon_data() info.set_ta_failed_sent_count( m_ta_failed_sent_count.load() ); this->publish(std::move(info)); + + if ( m_latency_monitoring.load() && m_running_flag.load() ) { + opmon::TriggerLatency lat_info; + + lat_info.set_latency_in( m_latency_instance.get_latency_in() ); + lat_info.set_latency_out( m_latency_instance.get_latency_out() ); + + this->publish(std::move(lat_info)); + } } /** @@ -119,12 +132,14 @@ TPProcessor::generate_opmon_data() void TPProcessor::find_ta(const TriggerPrimitiveTypeAdapter* tp, std::shared_ptr taa) { + if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( tp->tp.time_start ); // time_start or time_peak ? m_tp_received_count++; std::vector tas; taa->operator()(tp->tp, tas); while (tas.size()) { m_ta_made_count++; + if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( tas.back().time_start ); if (!m_ta_sink->try_send(std::move(tas.back()), iomanager::Sender::s_no_block)) { ers::warning(TADropped(ERS_HERE, tp->tp.time_start, m_sourceid.id)); m_ta_failed_sent_count++; diff --git a/src/trigger/HSISourceModel.hpp b/src/trigger/HSISourceModel.hpp index c93d38dd..1a54b75e 100644 --- a/src/trigger/HSISourceModel.hpp +++ b/src/trigger/HSISourceModel.hpp @@ -14,7 +14,9 @@ #include "dfmessages/HSIEvent.hpp" #include "triggeralgs/TriggerCandidate.hpp" #include "trigger/Issues.hpp" +#include "trigger/Latency.hpp" #include "trigger/opmon/hsisourcemodel_info.pb.h" +#include "trigger/opmon/latency_info.pb.h" #include "iomanager/IOManager.hpp" #include "iomanager/Sender.hpp" @@ -24,6 +26,7 @@ #include "appmodel/DataSubscriberModule.hpp" #include "appmodel/HSI2TCTranslatorConf.hpp" #include "appmodel/HSISignalWindow.hpp" +#include "appmodel/DataProcessor.hpp" namespace dunedaq::trigger { @@ -97,11 +100,15 @@ class HSISourceModel : public datahandlinglibs::SourceConcept } m_prescale = hsi_conf->get_prescale(); + m_latency_monitoring.store( hsi_conf->get_latency_monitoring() ); + } void start() { m_data_receiver->add_callback(std::bind(&HSISourceModel::handle_payload, this, std::placeholders::_1)); + m_running_flag.store(true); + m_received_events_count.store(0); m_tcs_made_count.store(0); m_tcs_sent_count.store(0); @@ -110,12 +117,14 @@ class HSISourceModel : public datahandlinglibs::SourceConcept void stop() { m_data_receiver->remove_callback(); + m_running_flag.store(false); print_opmon_stats(); } bool handle_payload(dfmessages::HSIEvent& data) // NOLINT(build/unsigned) { m_received_events_count++; + if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( data.timestamp ); // Prescale after n-hsi received if (m_received_events_count % m_prescale != 0) { @@ -148,6 +157,7 @@ class HSISourceModel : public datahandlinglibs::SourceConcept candidate.inputs = {}; m_tcs_made_count++; + if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( candidate.time_candidate ); // Send the TC if (!m_data_sender->try_send(std::move(candidate), iomanager::Sender::s_no_block)) { m_tcs_dropped_count++; @@ -173,6 +183,15 @@ class HSISourceModel : public datahandlinglibs::SourceConcept info.set_tcs_dropped_count( m_tcs_dropped_count ); this->publish(std::move(info)); + + if ( m_latency_monitoring.load() && m_running_flag.load() ) { + opmon::TriggerLatency lat_info; + + lat_info.set_latency_in( m_latency_instance.get_latency_in() ); + lat_info.set_latency_out( m_latency_instance.get_latency_out() ); + + this->publish(std::move(lat_info)); + } } void print_opmon_stats() @@ -197,13 +216,21 @@ class HSISourceModel : public datahandlinglibs::SourceConcept std::map m_signals; //Stats - std::atomic m_received_events_count{0}; - std::atomic m_tcs_made_count{0}; - std::atomic m_tcs_sent_count{0}; - std::atomic m_tcs_dropped_count{0}; + using metric_counter_type = uint64_t; + std::atomic m_received_events_count{0}; + std::atomic m_tcs_made_count{0}; + std::atomic m_tcs_sent_count{0}; + std::atomic m_tcs_dropped_count{0}; /// @brief {rescale for the input HSIEvents, default 1 uint64_t m_prescale; + + // Create an instance of the Latency class + std::atomic m_running_flag{ false }; + std::atomic m_latency_monitoring{ false }; + dunedaq::trigger::Latency m_latency_instance; + std::atomic m_latency_in{ 0 }; + std::atomic m_latency_out{ 0 }; }; } // namespace dunedaq::trigger diff --git a/src/trigger/TAProcessor.hpp b/src/trigger/TAProcessor.hpp index 33d91cee..98df3c1e 100644 --- a/src/trigger/TAProcessor.hpp +++ b/src/trigger/TAProcessor.hpp @@ -18,9 +18,11 @@ #include "trigger/Issues.hpp" #include "trigger/TAWrapper.hpp" +#include "trigger/Latency.hpp" #include "trigger/opmon/taprocessor_info.pb.h" -#include "triggeralgs/TriggerCandidate.hpp" +#include "trigger/opmon/latency_info.pb.h" +#include "triggeralgs/TriggerCandidate.hpp" #include "triggeralgs/Types.hpp" #include "triggeralgs/TriggerCandidateMaker.hpp" @@ -67,11 +69,20 @@ class TAProcessor : public datahandlinglibs::TaskRawDataProcessorModel m_ta_received_count{ 0 }; // NOLINT(build/unsigned) - std::atomic m_tc_made_count{ 0 }; - std::atomic m_tc_sent_count{ 0 }; - std::atomic m_tc_failed_sent_count{ 0 }; + using metric_counter_type = uint64_t; + std::atomic m_ta_received_count{ 0 }; // NOLINT(build/unsigned) + std::atomic m_tc_made_count{ 0 }; + std::atomic m_tc_sent_count{ 0 }; + std::atomic m_tc_failed_sent_count{ 0 }; void print_opmon_stats(); + + // Create an instance of the Latency class + std::atomic m_running_flag{ false }; + std::atomic m_latency_monitoring{ false }; + dunedaq::trigger::Latency m_latency_instance; + std::atomic m_latency_in{ 0 }; + std::atomic m_latency_out{ 0 }; + }; } // namespace trigger diff --git a/src/trigger/TCProcessor.hpp b/src/trigger/TCProcessor.hpp index 83335b15..d92f62c3 100644 --- a/src/trigger/TCProcessor.hpp +++ b/src/trigger/TCProcessor.hpp @@ -21,7 +21,9 @@ #include "trigger/Issues.hpp" #include "trigger/TCWrapper.hpp" +#include "trigger/Latency.hpp" #include "trigger/opmon/tcprocessor_info.pb.h" +#include "trigger/opmon/latency_info.pb.h" #include "daqdataformats/SourceID.hpp" #include "dfmessages/TriggerDecision.hpp" @@ -187,6 +189,12 @@ class TCProcessor : public datahandlinglibs::TaskRawDataProcessorModel m_tds_cleared_tc_count{ 0 }; std::atomic m_tc_ignored_count{ 0 }; + // latency + std::atomic m_latency_monitoring{ false }; + dunedaq::trigger::Latency m_latency_instance; + std::atomic m_latency_in{ 0 }; + std::atomic m_latency_out{ 0 }; + void print_opmon_stats(); }; diff --git a/src/trigger/TPProcessor.hpp b/src/trigger/TPProcessor.hpp index 63f429b1..5eff3adb 100644 --- a/src/trigger/TPProcessor.hpp +++ b/src/trigger/TPProcessor.hpp @@ -16,9 +16,11 @@ //#include "triggger/Issues.hpp" #include "trigger/TriggerPrimitiveTypeAdapter.hpp" +#include "trigger/Latency.hpp" #include "trigger/opmon/tpprocessor_info.pb.h" -#include "triggeralgs/TriggerActivity.hpp" +#include "trigger/opmon/latency_info.pb.h" +#include "triggeralgs/TriggerActivity.hpp" #include "triggeralgs/Types.hpp" #include "triggeralgs/TriggerActivityMaker.hpp" @@ -67,11 +69,20 @@ class TPProcessor : public datahandlinglibs::TaskRawDataProcessorModel m_tp_received_count{ 0 }; // NOLINT(build/unsigned) - std::atomic m_ta_made_count{ 0 }; - std::atomic m_ta_sent_count{ 0 }; - std::atomic m_ta_failed_sent_count{ 0 }; + using metric_counter_type = uint64_t; + std::atomic m_tp_received_count{ 0 }; // NOLINT(build/unsigned) + std::atomic m_ta_made_count{ 0 }; + std::atomic m_ta_sent_count{ 0 }; + std::atomic m_ta_failed_sent_count{ 0 }; void print_opmon_stats(); + + // Create an instance of the Latency class + std::atomic m_running_flag{ false }; + std::atomic m_latency_monitoring{ false }; + dunedaq::trigger::Latency m_latency_instance; + std::atomic m_latency_in{ 0 }; + std::atomic m_latency_out{ 0 }; + }; } // namespace trigger