From 9a22d2b4dd7eb20f8e70f904ee2133dafd94faaa Mon Sep 17 00:00:00 2001 From: Pam Harris Date: Wed, 23 Oct 2024 16:07:45 -0600 Subject: [PATCH] Support channel map flow message --- src/Session/Session.cc | 14 +++++++++++++- src/Session/Session.h | 3 +++ src/Session/SessionManager.cc | 8 ++++++++ src/Util/Message.cc | 7 +++++++ src/Util/Message.h | 2 ++ 5 files changed, 33 insertions(+), 1 deletion(-) diff --git a/src/Session/Session.cc b/src/Session/Session.cc index 6de431557..64e1c531e 100644 --- a/src/Session/Session.cc +++ b/src/Session/Session.cc @@ -768,7 +768,7 @@ void Session::OnSetImageChannels(const CARTA::SetImageChannels& message) { int num_channel(frame->Depth()); // Use animation limits for flow control - int max_frame_rate(15), max_gap(max_frame_rate / 3); + int max_frame_rate(15), max_channel_gap(max_frame_rate / CARTA::InitialAnimationWaitsPerSecond); std::chrono::microseconds channel_interval(int64_t(1.0e6 / max_frame_rate)); for (int chan = start_channel; chan <= end_channel; ++chan) { @@ -780,6 +780,14 @@ void Session::OnSetImageChannels(const CARTA::SetImageChannels& message) { continue; } + if (_channel_map_received_channel.find(file_id) != _channel_map_received_channel.end()) { + int received_channel = _channel_map_received_channel[file_id]; + while (chan - received_channel > max_channel_gap) { + std::this_thread::sleep_for(channel_interval); + received_channel = _channel_map_received_channel[file_id]; + } + } + auto start_time = std::chrono::high_resolution_clock::now(); spdlog::debug("Send channel {} in range {}-{}", chan, start_channel, end_channel); OnAddRequiredTiles(message.required_tiles(), chan); @@ -2580,3 +2588,7 @@ bool Session::IsInChannelMapTiles(int file_id, int tile) { // Check if tile is in current channel map tiles for file id. return _channel_map_settings && _channel_map_settings->HasTile(file_id, tile); } + +void Session::HandleChannelMapFlowControlEvt(CARTA::ChannelMapFlowControl& message) { + _channel_map_received_channel[message.file_id()] = message.received_channel(); +} diff --git a/src/Session/Session.h b/src/Session/Session.h index 370063aff..8fa911958 100644 --- a/src/Session/Session.h +++ b/src/Session/Session.h @@ -104,6 +104,8 @@ class Session { void ExecuteSetChannelEvt(std::pair request) { OnSetImageChannels(request.first); } + void HandleChannelMapFlowControlEvt(CARTA::ChannelMapFlowControl& message); + void CancelSetHistRequirements() { _histogram_context.cancel_group_execution(); } @@ -307,6 +309,7 @@ class Session { std::unordered_map _image_channel_mutexes; std::unordered_map _image_channel_task_active; std::unique_ptr _channel_map_settings; + std::unordered_map _channel_map_received_channel; // Cube histogram progress: 0.0 to 1.0 (complete) float _histogram_progress; diff --git a/src/Session/SessionManager.cc b/src/Session/SessionManager.cc index c87e453ce..cf395e1e2 100644 --- a/src/Session/SessionManager.cc +++ b/src/Session/SessionManager.cc @@ -518,6 +518,14 @@ void SessionManager::OnMessage(WSType* ws, std::string_view sv_message, uWS::OpC } break; } + case CARTA::EventType::CHANNEL_MAP_FLOW_CONTROL: { + CARTA::ChannelMapFlowControl message; + if (message.ParseFromArray(event_buf, event_length)) { + session->HandleChannelMapFlowControlEvt(message); + message_parsed = true; + } + break; + } default: { spdlog::warn("Bad event type {}!", event_type); break; diff --git a/src/Util/Message.cc b/src/Util/Message.cc index 5a9124471..93927eb43 100644 --- a/src/Util/Message.cc +++ b/src/Util/Message.cc @@ -440,6 +440,13 @@ CARTA::ScriptingRequest Message::ScriptingRequest(uint32_t scripting_request_id, return message; } +CARTA::ChannelMapFlowControl Message::ChannelMapFlowControl(int32_t file_id, int32_t received_channel) { + CARTA::ChannelMapFlowControl message; + message.set_file_id(file_id); + message.set_received_channel(received_channel); + return message; +} + CARTA::EventType Message::EventType(std::vector& message) { carta::EventHeader head = *reinterpret_cast(message.data()); return static_cast(head.type); diff --git a/src/Util/Message.h b/src/Util/Message.h index f02e2d095..a96b71996 100644 --- a/src/Util/Message.h +++ b/src/Util/Message.h @@ -8,6 +8,7 @@ #define CARTA_SRC_UTIL_MESSAGE_H_ #include +#include #include #include #include @@ -115,6 +116,7 @@ class Message { const CARTA::DoublePoint& center, double amp, const CARTA::DoublePoint& fwhm, double pa); static CARTA::ScriptingRequest ScriptingRequest(uint32_t scripting_request_id, const std::string& target, const std::string& action, const std::string& parameters, bool async, const std::string& return_path); + static CARTA::ChannelMapFlowControl ChannelMapFlowControl(int32_t file_id, int32_t received_channel); // Response messages static CARTA::SpectralProfileData SpectralProfileData(int32_t file_id, int32_t region_id, int32_t stokes, float progress,