diff --git a/src/TCProcessor.cpp b/src/TCProcessor.cpp
index 4ef96e41..21ecab10 100644
--- a/src/TCProcessor.cpp
+++ b/src/TCProcessor.cpp
@@ -68,6 +68,14 @@ TCProcessor::stop(const nlohmann::json& args)
 {
   inherited::stop(args);
   m_running_flag.store(false);
+
+  // Wake the TD-sending thread so it observes the flipped running flag
+  {
+    std::lock_guard lock(m_td_vector_mutex);
+    m_cv.notify_all();
+  }
+
+  // Wait for the TD-sending thread to stop
   m_send_trigger_decisions_thread.join();
 
   // Drop all TDs in vectors at run stage change. Have to do this
@@ -113,7 +121,6 @@ TCProcessor::conf(const appmodel::DataHandlerModule* cfg)
                                            link->get_sid()});
   }
 
-  // TODO: Group links!
   //m_group_links_data = conf->get_groups_links();
   parse_group_links(m_group_links_data);
 
@@ -233,6 +240,7 @@ TCProcessor::make_td(const TCWrapper* tcw)
   else {
     std::lock_guard lock(m_td_vector_mutex);
     add_tc(tc);
+    m_cv.notify_one(); // wake the TD-sending thread: a new TD is pending
     TLOG_DEBUG(10) << "pending tds size: " << m_pending_tds.size();
   }
   return;
@@ -302,9 +310,15 @@ TCProcessor::create_decision(const PendingTD& pending_td)
 
 void
 TCProcessor::send_trigger_decisions()
 {
+  // unique_lock (not lock_guard) so the condition_variable can release it while waiting
+  std::unique_lock lock(m_td_vector_mutex);
   while (m_running_flag) {
-    std::lock_guard lock(m_td_vector_mutex);
+    // Sleep until there are pending TDs or the run has stopped
+    m_cv.wait(lock, [this] {
+      return !m_pending_tds.empty() || !m_running_flag;
+    });
+
     auto ready_tds = get_ready_tds(m_pending_tds);
     TLOG_DEBUG(10) << "ready tds: " << ready_tds.size()
                    << ", updated pending tds: " << m_pending_tds.size();
diff --git a/src/trigger/TCProcessor.hpp b/src/trigger/TCProcessor.hpp
index b2975eae..83335b15 100644
--- a/src/trigger/TCProcessor.hpp
+++ b/src/trigger/TCProcessor.hpp
@@ -119,6 +119,7 @@ class TCProcessor : public datahandlinglibs::TaskRawDataProcessorModel<TCWrapper>
 
   std::vector<PendingTD> m_pending_tds;
   std::mutex m_td_vector_mutex;
+  std::condition_variable m_cv;
 
   void add_tc(const triggeralgs::TriggerCandidate tc);
   void add_tc_ignored(const triggeralgs::TriggerCandidate tc);
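The patch replaces a busy-polling loop (re-taking a `lock_guard` every iteration) with a `condition_variable` wait: producers notify under `m_td_vector_mutex` after queueing work, and `stop()` notifies under the same mutex after flipping the running flag. Below is a minimal, self-contained sketch of that wait/notify pattern; it is not code from this repository, and the `Worker`, `push()`, and `consume()` names are invented for illustration.

```cpp
// Minimal sketch of the producer/consumer wait/notify pattern this
// patch introduces. Names (Worker, push, consume) are hypothetical.
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

class Worker
{
public:
  void start()
  {
    m_running.store(true);
    m_thread = std::thread([this] { consume(); });
  }

  void push(int item)
  {
    {
      std::lock_guard lock(m_mutex); // same CTAD style as the patch
      m_pending.push_back(item);
    }
    m_cv.notify_one(); // wake the consumer: new work is pending
  }

  void stop()
  {
    m_running.store(false);
    {
      // Notify while holding the mutex so the consumer cannot miss
      // the flag flip between checking its predicate and sleeping.
      std::lock_guard lock(m_mutex);
      m_cv.notify_all();
    }
    m_thread.join();
  }

private:
  void consume()
  {
    std::unique_lock lock(m_mutex); // unique_lock: the CV must be able to release it
    while (m_running) {
      // Sleep until there is work or the run has stopped; the
      // predicate also guards against spurious wakeups.
      m_cv.wait(lock, [this] { return !m_pending.empty() || !m_running; });
      for (int item : m_pending)
        std::cout << "processed " << item << '\n';
      m_pending.clear();
    }
  }

  std::atomic<bool> m_running{ false };
  std::vector<int> m_pending;
  std::mutex m_mutex;
  std::condition_variable m_cv;
  std::thread m_thread;
};

int main()
{
  Worker w;
  w.start();
  for (int i = 0; i < 3; ++i)
    w.push(i);
  w.stop();
}
```

The design point behind `stop()` taking the lock before `notify_all()`: the waiter evaluates its predicate while holding the mutex, and `wait()` releases the mutex and blocks atomically. A notifier that changes the condition and notifies without the mutex could do both in the window between the predicate check and the sleep, and the wakeup would be lost; acquiring the mutex first makes that interleaving impossible. Note also that the shown hunks add a `std::condition_variable` member but no `#include <condition_variable>`, so TCProcessor.hpp may need that include if it is not already pulled in transitively.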