From ba8938c53449ca7b926587f4787b53f9173d5df5 Mon Sep 17 00:00:00 2001 From: William Panduro Vazquez Date: Fri, 22 Jul 2022 21:33:14 +0200 Subject: [PATCH] fix integration tests, some compiler warnings, incorrect timestamp measurement and upgrade some logging --- CMakeLists.txt | 2 +- integtest/test_pacman-raw.py | 6 +- integtest/test_pacman.py | 6 +- plugins/PacmanCardReader.cpp | 22 +- python/lbrulibs/fake_NDreadout.json | 490 ---------------------------- src/STREAMLinkConcept.hpp | 2 +- src/STREAMLinkModel.hpp | 27 +- src/ZMQLinkConcept.hpp | 2 +- src/ZMQLinkModel.hpp | 2 +- 9 files changed, 45 insertions(+), 514 deletions(-) delete mode 100644 python/lbrulibs/fake_NDreadout.json diff --git a/CMakeLists.txt b/CMakeLists.txt index 794d44d..7e3aa0f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.12) -project(lbrulibs VERSION 1.2.0) +project(lbrulibs VERSION 1.2.1) find_package(daq-cmake REQUIRED) diff --git a/integtest/test_pacman-raw.py b/integtest/test_pacman-raw.py index 04cd553..582b525 100644 --- a/integtest/test_pacman-raw.py +++ b/integtest/test_pacman-raw.py @@ -27,9 +27,9 @@ confgen_name="daqconf_multiru_gen" # The arguments to pass to the config generator, excluding the json # output directory (the test framework handles that) -confgen_arguments=[ "--host-ru", "localhost", "-o", ".", "-n", str(number_of_data_producers), "--frontend-type", "pacman", "-b", "2500000", "-a", "2500000", "-t", "1.0" ] +confgen_arguments=[ "--host-ru", "localhost", "-o", ".", "-n", str(number_of_data_producers), "--frontend-type", "pacman", "-b", "2500000", "-a", "2500000", "-t", "1.0", "--op-env", "integtest" ] # The commands to run in nanorc, as a list -nanorc_command_list="boot integration-test init conf start --resume-wait 1 101 wait ".split() + [str(run_duration)] + " stop --stop-wait 1 wait 2 scrap terminate".split() +nanorc_command_list="integtest-partition boot conf start 101 wait 1 enable_triggers wait ".split() + [str(run_duration)] + "disable_triggers wait 2 stop_run wait 2 scrap terminate".split() # The tests themselves def test_nanorc_success(run_nanorc): @@ -57,7 +57,7 @@ def test_data_file(run_nanorc): data_socket = 'tcp://127.0.0.1:5556' #data_file = '../test/example-pacman-data.h5' -data_file = '/dune/data/users/srsoleti/full_spill_larndsim/neutrino.0_1634786172.larndsim.h5' +data_file = '/nfs/home/jpanduro/dunedaq-v3.1.0-1/sourcecode/lbrulibs/test/example-pacman-data.h5' def hdf5ToPackets(datafile): print("Reading from:",datafile) diff --git a/integtest/test_pacman.py b/integtest/test_pacman.py index d7f8a95..08a6894 100644 --- a/integtest/test_pacman.py +++ b/integtest/test_pacman.py @@ -24,12 +24,12 @@ # file. They're read by the "fixtures" in conftest.py to determine how # to run the config generation and nanorc # The name of the python module for the config generation -confgen_name="daqconf__multiru_gen" +confgen_name="daqconf_multiru_gen" # The arguments to pass to the config generator, excluding the json # output directory (the test framework handles that) confgen_arguments=[ "--host-ru", "localhost", "-o", ".", "-n", str(number_of_data_producers), "--frontend-type", "pacman", "-b", "2500000", "-a", "2500000", "-t", "1.0" ] # The commands to run in nanorc, as a list -nanorc_command_list="boot integration-test init conf start --resume-wait 1 101 wait ".split() + [str(run_duration)] + " stop --stop-wait 1 wait 2 scrap terminate".split() +nanorc_command_list="integtest-partition boot conf start 101 wait 1 enable_triggers wait ".split() + [str(run_duration)] + "disable_triggers wait 2 stop_run wait 2 scrap terminate".split() # The tests themselves def test_nanorc_success(run_nanorc): @@ -57,7 +57,7 @@ def test_data_file(run_nanorc): data_socket = 'tcp://127.0.0.1:5556' #data_file = '../test/example-pacman-data.h5' -data_file = '/dune/data/users/srsoleti/full_spill_larndsim/neutrino.0_1634786172.larndsim.h5' +data_file = '/nfs/home/jpanduro/dunedaq-v3.1.0-1/sourcecode/lbrulibs/test/example-pacman-data.h5' def hdf5ToPackets(datafile): print("Reading from:",datafile) diff --git a/plugins/PacmanCardReader.cpp b/plugins/PacmanCardReader.cpp index 013756c..e8518b9 100644 --- a/plugins/PacmanCardReader.cpp +++ b/plugins/PacmanCardReader.cpp @@ -29,13 +29,13 @@ bool usePUBSUB = 0; /** * @brief TRACE debug levels used in this source file */ -enum +/*enum { TLVL_ENTER_EXIT_METHODS = 5, TLVL_WORK_STEPS = 10, TLVL_BOOKKEEPING = 15 }; - +*/ namespace dunedaq { namespace lbrulibs { @@ -66,25 +66,27 @@ void PacmanCardReader::init(const data_t& args) { auto ini = args.get(); + TLOG(TLVL_WORK_STEPS) << "ini"; for (const auto& cr : ini.conn_refs) { if (cr.dir != iomanager::connection::Direction::kOutput) { // ers::error("Only output queues are supported in this module!"); + TLOG(TLVL_WORK_STEPS) << "PacmanCardReader??? "; continue; } else { - TLOG_DEBUG(TLVL_WORK_STEPS) << "PacmanCardReader output queue is " << cr.uid; + TLOG(TLVL_WORK_STEPS) << "PacmanCardReader output queue is " << cr.uid; const char delim = '_'; std::string target = cr.uid; std::vector words; tokenize(target, delim, words); if (usePUBSUB) { - TLOG_DEBUG(TLVL_WORK_STEPS) << "Creating ZMQLinkModel for target queue: " << target; + TLOG(TLVL_WORK_STEPS) << "Creating ZMQLinkModel for target queue: " << target; m_zmqlink[0] = createZMQLinkModel(cr.uid); // FIX ME - need to resolve proper link ID rather than hard code to zero if (m_zmqlink[0] == nullptr) { ers::fatal(InitializationError(ERS_HERE, "CreateZMQLink failed to provide an appropriate model for queue!")); } m_zmqlink[0]->init(args, m_queue_capacity); } else { - TLOG_DEBUG(TLVL_WORK_STEPS) << "Creating STREAMLinkModel for target queue: " << target; + TLOG(TLVL_WORK_STEPS) << "Creating STREAMLinkModel for target queue: " << target; m_streamlink[0] = createSTREAMLinkModel(cr.uid); // FIX ME - need to resolve proper link ID rather than hard code to zero if (m_streamlink[0] == nullptr) { ers::fatal(InitializationError(ERS_HERE, "CreateSTREAMLink failed to provide an appropriate model for queue!")); @@ -93,6 +95,10 @@ PacmanCardReader::init(const data_t& args) } } } + + + m_cfg = args.get(); + } void @@ -109,13 +115,17 @@ PacmanCardReader::do_configure(const data_t& args) // // Configure components - TLOG(TLVL_WORK_STEPS) << "Configuring ZMQLinkHandler"; + TLOG(TLVL_WORK_STEPS) << "Configuring LinkHandler"; if (usePUBSUB) { + TLOG(TLVL_WORK_STEPS) << "Using ZMQ Publish/Subscribe"; m_zmqlink[0]->set_ids(m_card_id, 0); m_zmqlink[0]->conf(args); } else { + TLOG(TLVL_WORK_STEPS) << "Using Raw TCP Stream"; m_streamlink[0]->set_ids(m_card_id,0); + TLOG(TLVL_WORK_STEPS) << "apply conf"; m_streamlink[0]->conf(args); + TLOG(TLVL_WORK_STEPS) << "finish conf"; } } diff --git a/python/lbrulibs/fake_NDreadout.json b/python/lbrulibs/fake_NDreadout.json deleted file mode 100644 index bf909db..0000000 --- a/python/lbrulibs/fake_NDreadout.json +++ /dev/null @@ -1,490 +0,0 @@ -[ - { - "data": { - "modules": [ - { - "data": { - "qinfos": [ - { - "dir": "output", - "inst": "pacman_link_0", - "name": "output_0" - } - ] - }, - "inst": "fake_source", - "plugin": "PacmanCardReader" - }, - { - "data": { - "qinfos": [ - { - "dir": "input", - "inst": "pacman_link_0", - "name": "raw_input" - }, - { - "dir": "output", - "inst": "time_sync_q", - "name": "timesync" - }, - { - "dir": "input", - "inst": "data_requests_0", - "name": "data_requests_0" - }, - { - "dir": "output", - "inst": "data_fragments_q", - "name": "fragment_queue" - }, - { - "dir": "output", - "inst": "sw_tp_queue_0", - "name": "tp_out" - }, - { - "dir": "output", - "inst": "tpset_link_0", - "name": "tpset_out" - }, - { - "dir": "output", - "inst": "errored_frames_q", - "name": "errored_frames" - } - ] - }, - "inst": "datahandler_0", - "plugin": "DataLinkHandler" - }, - { - "data": { - "qinfos": [ - { - "dir": "input", - "inst": "time_sync_q", - "name": "input_queue" - } - ] - }, - "inst": "timesync_consumer", - "plugin": "TimeSyncConsumer" - }, - { - "data": { - "qinfos": [ - { - "dir": "input", - "inst": "data_fragments_q", - "name": "input_queue" - } - ] - }, - "inst": "fragment_consumer", - "plugin": "FragmentConsumer" - }, - { - "data": { - "qinfos": [ - { - "dir": "input", - "inst": "sw_tp_queue_0", - "name": "raw_input" - }, - { - "dir": "output", - "inst": "time_sync_q", - "name": "timesync" - }, - { - "dir": "input", - "inst": "tp_data_requests", - "name": "requests" - }, - { - "dir": "output", - "inst": "data_fragments_q", - "name": "fragment_queue" - } - ] - }, - "inst": "sw_tp_handler_0", - "plugin": "DataLinkHandler" - }, - { - "data": { - "qinfos": [ - { - "dir": "input", - "inst": "tpset_link_0", - "name": "input" - } - ] - }, - "inst": "tpset_publisher_0", - "plugin": "QueueToNetwork" - }, - { - "data": { - "qinfos": [ - { - "dir": "input", - "inst": "errored_frames_q", - "name": "input_queue" - } - ] - }, - "inst": "errored_frame_consumer", - "plugin": "ErroredFrameConsumer" - } - ], - "nwconnections": [ - { - "address": "tcp://127.0.0.1:5000", - "name": "tpsets_0", - "topics": [ - "foo" - ] - }, - { - "address": "tcp://127.0.0.1:6000", - "name": "timesync", - "topics": [ - "Timesync" - ] - } - ], - "queues": [ - { - "capacity": 100, - "inst": "data_fragments_q", - "kind": "FollyMPMCQueue" - }, - { - "capacity": 1000, - "inst": "data_requests_0", - "kind": "FollySPSCQueue" - }, - { - "capacity": 10000, - "inst": "errored_frames_q", - "kind": "FollyMPMCQueue" - }, - { - "capacity": 100000, - "inst": "pacman_link_0", - "kind": "FollySPSCQueue" - }, - { - "capacity": 100000, - "inst": "sw_tp_queue_0", - "kind": "FollySPSCQueue" - }, - { - "capacity": 100, - "inst": "time_sync_q", - "kind": "FollyMPMCQueue" - }, - { - "capacity": 1000, - "inst": "tp_data_requests", - "kind": "FollySPSCQueue" - }, - { - "capacity": 10000, - "inst": "tpset_link_0", - "kind": "FollySPSCQueue" - } - ] - }, - "entry_state": "NONE", - "exit_state": "INITIAL", - "id": "init" - }, - { - "data": { - "modules": [ - { - "data": { - "card_id": 0, - "link_confs": [ - { - "geoid": { - "element": 0, - "region": 0, - "system": "kNDLarTPC" - } - } - ], - "zmq_receiver_timeout": 0 - }, - "match": "fake_source" - }, - { - "data": { - "latencybufferconf": { - "element_id": 0, - "latency_buffer_alignment_size": 0, - "latency_buffer_intrinsic_allocator": false, - "latency_buffer_numa_aware": false, - "latency_buffer_numa_node": 0, - "latency_buffer_preallocation": false, - "latency_buffer_size": 50000, - "region_id": 0 - }, - "rawdataprocessorconf": { - "channel_map_felix": "", - "channel_map_name": "None", - "channel_map_rce": "", - "element_id": 0, - "emulator_mode": false, - "enable_software_tpg": false, - "error_counter_threshold": 100, - "error_reset_freq": 10000, - "postprocess_queue_sizes": 10000, - "region_id": 0, - "tp_timeout": 100000, - "tpset_window_size": 10000 - }, - "readoutmodelconf": { - "element_id": 0, - "fake_trigger_flag": 1, - "region_id": 0, - "source_queue_timeout_ms": 100, - "timesync_connection_name": "timesync", - "timesync_topic_name": "Timesync" - }, - "requesthandlerconf": { - "compression_algorithm": "None", - "element_id": 0, - "enable_raw_recording": true, - "fragment_queue_timeout_ms": 100, - "latency_buffer_size": 50000, - "num_request_handling_threads": 4, - "output_file": "output_0.out", - "pop_limit_pct": 0.8, - "pop_size_pct": 0.1, - "region_id": 0, - "retry_count": 100, - "stream_buffer_size": 8388608, - "use_o_direct": false - } - }, - "match": "datahandler_0" - }, - { - "data": { - "latencybufferconf": { - "element_id": 0, - "latency_buffer_alignment_size": 0, - "latency_buffer_intrinsic_allocator": false, - "latency_buffer_numa_aware": false, - "latency_buffer_numa_node": 0, - "latency_buffer_preallocation": false, - "latency_buffer_size": 50000, - "region_id": 0 - }, - "rawdataprocessorconf": { - "channel_map_felix": "", - "channel_map_name": "None", - "channel_map_rce": "", - "element_id": 0, - "emulator_mode": false, - "enable_software_tpg": false, - "error_counter_threshold": 100, - "error_reset_freq": 10000, - "postprocess_queue_sizes": 10000, - "region_id": 0, - "tp_timeout": 100000, - "tpset_window_size": 10000 - }, - "readoutmodelconf": { - "element_id": 0, - "fake_trigger_flag": 1, - "region_id": 0, - "source_queue_timeout_ms": 100, - "timesync_connection_name": "", - "timesync_topic_name": "Timesync" - }, - "requesthandlerconf": { - "compression_algorithm": "None", - "element_id": 0, - "enable_raw_recording": false, - "fragment_queue_timeout_ms": 100, - "latency_buffer_size": 50000, - "num_request_handling_threads": 4, - "output_file": "output_0.out", - "pop_limit_pct": 0.8, - "pop_size_pct": 0.1, - "region_id": 0, - "retry_count": 100, - "stream_buffer_size": 8388608, - "use_o_direct": false - } - }, - "match": "sw_tp_handler_0" - }, - { - "data": { - "msg_module_name": "TPSetNQ", - "msg_type": "dunedaq::trigger::TPSet", - "sender_config": { - "name": "tpsets_0", - "stype": "msgpack", - "topic": "foo" - } - }, - "match": "tpset_publisher_0" - } - ] - }, - "entry_state": "INITIAL", - "exit_state": "CONFIGURED", - "id": "conf" - }, - { - "data": { - "modules": [ - { - "data": { - "disable_data_storage": false, - "run": 333, - "trigger_interval_ticks": 64000000 - }, - "match": "datahandler_.*" - }, - { - "data": { - "disable_data_storage": false, - "run": 333, - "trigger_interval_ticks": 64000000 - }, - "match": "fake_source" - }, - { - "data": { - "disable_data_storage": false, - "run": 333, - "trigger_interval_ticks": 64000000 - }, - "match": "data_recorder_.*" - }, - { - "data": { - "disable_data_storage": false, - "run": 333, - "trigger_interval_ticks": 64000000 - }, - "match": "timesync_consumer" - }, - { - "data": { - "disable_data_storage": false, - "run": 333, - "trigger_interval_ticks": 64000000 - }, - "match": "fragment_consumer" - }, - { - "data": { - "disable_data_storage": false, - "run": 333, - "trigger_interval_ticks": 64000000 - }, - "match": "sw_tp_handler_.*" - }, - { - "data": { - "disable_data_storage": false, - "run": 333, - "trigger_interval_ticks": 64000000 - }, - "match": "raw_tp_handler_.*" - }, - { - "data": { - "disable_data_storage": false, - "run": 333, - "trigger_interval_ticks": 64000000 - }, - "match": "tpset_publisher_.*" - }, - { - "data": { - "disable_data_storage": false, - "run": 333, - "trigger_interval_ticks": 64000000 - }, - "match": "errored_frame_consumer" - } - ] - }, - "entry_state": "CONFIGURED", - "exit_state": "RUNNING", - "id": "start" - }, - { - "data": { - "modules": [ - { - "match": "fake_source" - }, - { - "match": "datahandler_.*" - }, - { - "match": "data_recorder_.*" - }, - { - "match": "timesync_consumer" - }, - { - "match": "fragment_consumer" - }, - { - "match": "sw_tp_handler_.*" - }, - { - "match": "raw_tp_handler_.*" - }, - { - "match": "tpset_publisher_.*" - }, - { - "match": "errored_frame_consumer" - } - ] - }, - "entry_state": "RUNNING", - "exit_state": "CONFIGURED", - "id": "stop" - }, - { - "data": { - "modules": [ - { - "match": "" - } - ] - }, - "entry_state": "CONFIGURED", - "exit_state": "INITIAL", - "id": "scrap" - }, - { - "data": { - "modules": [ - { - "data": { - "duration": 10 - }, - "match": "datahandler_.*" - } - ] - }, - "entry_state": "RUNNING", - "exit_state": "RUNNING", - "id": "record" - } -] \ No newline at end of file diff --git a/src/STREAMLinkConcept.hpp b/src/STREAMLinkConcept.hpp index 6330251..4e1f77d 100644 --- a/src/STREAMLinkConcept.hpp +++ b/src/STREAMLinkConcept.hpp @@ -27,7 +27,7 @@ class STREAMLinkConcept { : m_card_id(0) , m_link_tag(0) {} - ~STREAMLinkConcept() {} + virtual ~STREAMLinkConcept() {} STREAMLinkConcept(const STREAMLinkConcept&) = delete; ///< STREAMLinkConcept is not copy-constructible diff --git a/src/STREAMLinkModel.hpp b/src/STREAMLinkModel.hpp index 4bd64d9..c5ea62c 100644 --- a/src/STREAMLinkModel.hpp +++ b/src/STREAMLinkModel.hpp @@ -27,6 +27,18 @@ #include #include + +/** + * @brief TRACE debug levels used in this source file + */ +enum + { + TLVL_ENTER_EXIT_METHODS = 5, + TLVL_WORK_STEPS = 10, + TLVL_BOOKKEEPING = 15 + }; + + namespace dunedaq::lbrulibs { template @@ -48,7 +60,7 @@ class STREAMLinkModel : public STREAMLinkConcept { void set_sink(const std::string& sink_name) override { if (m_sink_is_set) { - TLOG_DEBUG(5) << "STREAMLinkModel sink is already set and initialized!"; + TLOG(TLVL_WORK_STEPS) << "STREAMLinkModel sink is already set and initialized!"; } else { m_sink_queue = get_iom_sender(sink_name); m_sink_is_set = true; @@ -65,22 +77,21 @@ class STREAMLinkModel : public STREAMLinkConcept { void conf(const data_t& args) { if (m_configured) { - TLOG_DEBUG(5) << "STREAMLinkModel is already configured!"; + TLOG(TLVL_WORK_STEPS) << "STREAMLinkModel is already configured!"; } else { m_cfg = args.get(); - + TLOG(TLVL_WORK_STEPS) << "Configuring STREAMLinkModel!"; m_queue_timeout = std::chrono::milliseconds(m_cfg.zmq_receiver_timeout); - TLOG_DEBUG(5) << "STREAMLinkModel conf: initialising subscriber!"; + TLOG(TLVL_WORK_STEPS) << "STREAMLinkModel conf: initialising subscriber!"; m_subscriber_connected = false; //m_subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); - TLOG_DEBUG(5) << "STREAMLinkModel conf: connecting subscriber!"; + TLOG(TLVL_WORK_STEPS) << "STREAMLinkModel conf: connecting subscriber!"; m_subscriber.bind(m_STREAMLink_sourceLink); m_subscriber_connected = true; - TLOG_DEBUG(5) << "STREAMLinkModel conf: enacting subscription!"; + TLOG(TLVL_WORK_STEPS) << "STREAMLinkModel conf: set parser thread name!"; //m_subscriber.setsockopt(ZMQ_SUBSCRIBE, ""); - TLOG_DEBUG(5) << "Configuring STREAMLinkModel!"; - + m_parser_thread.set_name(m_STREAMLink_sourceLink, m_link_tag); m_configured=true; } diff --git a/src/ZMQLinkConcept.hpp b/src/ZMQLinkConcept.hpp index 3bb11fa..c65f084 100644 --- a/src/ZMQLinkConcept.hpp +++ b/src/ZMQLinkConcept.hpp @@ -27,7 +27,7 @@ class ZMQLinkConcept { : m_card_id(0) , m_link_tag(0) {} - ~ZMQLinkConcept() {} + virtual ~ZMQLinkConcept() {} ZMQLinkConcept(const ZMQLinkConcept&) = delete; ///< ZMQLinkConcept is not copy-constructible diff --git a/src/ZMQLinkModel.hpp b/src/ZMQLinkModel.hpp index 5ca1751..76819de 100644 --- a/src/ZMQLinkModel.hpp +++ b/src/ZMQLinkModel.hpp @@ -205,8 +205,8 @@ class ZMQLinkModel : public ZMQLinkConcept { TLOG_DEBUG(1) << ": Pushing data into output_queue"; try { TargetPayloadType* Payload = new TargetPayloadType(); - m_timestamp = Payload->get_timestamp(); std::memcpy(static_cast(&Payload->data), msg.data(), msg.size()); + m_timestamp = Payload->get_timestamp(); m_sink_queue->send(std::move(*Payload), m_sink_timeout); m_packetsizesum += msg.size(); //sum of data from packets m_packetsize = msg.size(); //last packet size