Skip to content

Commit

Permalink
Support checking tile for channel map
Browse files Browse the repository at this point in the history
  • Loading branch information
pford committed Oct 23, 2024
1 parent 75373e7 commit 5562666
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 107 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ set(SOURCE_FILES
src/Region/RegionConverter.cc
src/Region/RegionHandler.cc
src/Region/RegionImportExport.cc
src/Session/ChannelMap.cc
src/Session/ChannelMapSettings.cc
src/Session/CursorSettings.cc
src/Session/OnMessageTask.cc
src/Session/Session.cc
Expand Down
22 changes: 11 additions & 11 deletions src/Session/ChannelMapSettings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,24 @@
SPDX-License-Identifier: GPL-3.0-or-later
*/

// # ChannelMap.h: Parameters related to channel map view
// # ChannelMapSettings.h: Parameters related to channel map view

#include "ChannelMap.h"
#include "ChannelMapSettings.h"

#include "Util/File.h" // ALL_FILES
#include "Util/File.h" // ALL_FILES def

namespace carta {

ChannelMap::ChannelMap(const CARTA::SetImageChannels& message) {
ChannelMapSettings::ChannelMapSettings(const CARTA::SetImageChannels& message) {
SetChannelMapParams(message);
}

bool ChannelMap::SetChannelMap(const CARTA::SetImageChannels& message) {
bool ChannelMapSettings::SetChannelMap(const CARTA::SetImageChannels& message) {
// Returns true if it is an entirely new channel view, with new channel range and tiles.
return SetChannelMapParams(message);
}

bool ChannelMap::IsInChannelRange(int file_id, int channel) {
bool ChannelMapSettings::IsInChannelRange(int file_id, int channel) {
// Returns true if input channel is in current channel range.
std::unique_lock<std::mutex> lock(_file_mutexes[file_id]);
if (_channel_ranges.find(file_id) == _channel_ranges.end()) {
Expand All @@ -30,7 +30,7 @@ bool ChannelMap::IsInChannelRange(int file_id, int channel) {
return _channel_ranges[file_id].is_in_range(channel);
}

bool ChannelMap::HasRequiredTiles(int file_id, const CARTA::AddRequiredTiles& required_tiles) {
bool ChannelMapSettings::HasRequiredTiles(int file_id, const CARTA::AddRequiredTiles& required_tiles) {
// Returns true if any input tiles are in current tiles, and compression is same.
std::unique_lock<std::mutex> lock(_file_mutexes[file_id]);
if (_required_tiles.find(file_id) == _required_tiles.end()) {
Expand All @@ -42,23 +42,23 @@ bool ChannelMap::HasRequiredTiles(int file_id, const CARTA::AddRequiredTiles& re
return false;
}

for (auto tile : new_tiles.encoded_tiles) {
for (auto tile : new_tiles.tiles) {
if (_required_tiles[file_id].HasTile(tile)) {
return true;
}
}
return false;
}

bool ChannelMap::HasTile(int file_id, int32_t tile) {
bool ChannelMapSettings::HasTile(int file_id, int tile) {
std::unique_lock<std::mutex> lock(_file_mutexes[file_id]);
if (_required_tiles.find(file_id) == _required_tiles.end()) {
return false;
}
return _required_tiles[file_id].HasTile(tile);
}

void ChannelMap::RemoveFile(int file_id) {
void ChannelMapSettings::RemoveFile(int file_id) {
if (file_id == ALL_FILES) {
_file_mutexes.clear();
_channel_ranges.clear();
Expand All @@ -70,7 +70,7 @@ void ChannelMap::RemoveFile(int file_id) {
}
}

bool ChannelMap::SetChannelMapParams(const CARTA::SetImageChannels& message) {
bool ChannelMapSettings::SetChannelMapParams(const CARTA::SetImageChannels& message) {
// Set new channel range and required tiles.
// Returns true if new params (for cancel).
AxisRange new_range;
Expand Down
29 changes: 17 additions & 12 deletions src/Session/ChannelMapSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

// # ChannelMap.h: Parameters related to channel map view

#ifndef CARTA_SRC_SESSION_CHANNELMAP_H_
#define CARTA_SRC_SESSION_CHANNELMAP_H_
#ifndef CARTA_SRC_SESSION_CHANNELMAPSETTINGS_H_
#define CARTA_SRC_SESSION_CHANNELMAPSETTINGS_H_

#include <vector>

Expand All @@ -18,14 +18,20 @@
namespace carta {

struct RequiredTiles {
std::vector<int32_t> encoded_tiles;
// Settings in AddRequiredTiles
std::vector<int> tiles;
CARTA::CompressionType compression_type;
float compression_quality;
std::vector<int> current_tiles;

RequiredTiles() {}
RequiredTiles(const CARTA::AddRequiredTiles& required_tiles) {
if (required_tiles.tiles_size() > 0) {
encoded_tiles = {required_tiles.tiles().begin(), required_tiles.tiles().end()};
tiles = {required_tiles.tiles().begin(), required_tiles.tiles().end()};
}

if (required_tiles.current_tiles_size() > 0) {
current_tiles = {required_tiles.current_tiles().begin(), required_tiles.current_tiles().end()};
}
compression_type = required_tiles.compression_type();
compression_quality = required_tiles.compression_quality();
Expand All @@ -35,25 +41,24 @@ struct RequiredTiles {
return compression_type == other.compression_type && compression_quality == other.compression_quality;
}

bool HasTile(int32_t tile) {
return std::find(encoded_tiles.begin(), encoded_tiles.end(), tile) != encoded_tiles.end();
bool HasTile(int tile) {
return std::find(current_tiles.begin(), current_tiles.end(), tile) != current_tiles.end();
}
};

class ChannelMap {
class ChannelMapSettings {
public:
ChannelMap(const CARTA::SetImageChannels& message);
~ChannelMap() = default;
ChannelMapSettings(const CARTA::SetImageChannels& message);
~ChannelMapSettings() = default;

bool SetChannelMap(const CARTA::SetImageChannels& message);

// Checks to support channel map cancel.
bool IsInChannelRange(int file_id, int channel);
bool HasRequiredTiles(int file_id, const CARTA::AddRequiredTiles& required_tiles);
bool HasTile(int file_id, int32_t tile);
bool HasTile(int file_id, int tile);

// Remove a file or all files from channel maps when closed in Session.
// This cannot happen during a channel map loop due to Session frame mutex.
void RemoveFile(int file_id);

private:
Expand All @@ -67,4 +72,4 @@ class ChannelMap {

} // namespace carta

#endif // CARTA_SRC_SESSION_CHANNELMAP_H_
#endif // CARTA_SRC_SESSION_CHANNELMAPSETTINGS_H_
139 changes: 97 additions & 42 deletions src/Session/Session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Session::Session(uWS::WebSocket<false, true, PerSocketData>* ws, uWS::Loop* loop
_animation_object(nullptr),
_animation_active(false),
_stokes_files_connector(nullptr),
_channel_map(nullptr),
_channel_map_settings(nullptr),
_histogram_progress(1.0),
_ref_count(0),
_sync_id(0),
Expand Down Expand Up @@ -666,8 +666,8 @@ void Session::DeleteFrame(int file_id) {
if (_region_handler) {
_region_handler->RemoveFrame(file_id);
}
if (_channel_map) {
_channel_map->RemoveFile(file_id);
if (_channel_map_settings) {
_channel_map_settings->RemoveFile(file_id);
}
}

Expand Down Expand Up @@ -717,17 +717,21 @@ void Session::OnAddRequiredTiles(const CARTA::AddRequiredTiles& message, int z,
for (int j = 0; j < stride; j++) {
for (int i = j; i < num_tiles; i += stride) {
const auto& encoded_coordinate = message.tiles(i);
// Check if channel and tile still valid for channel map
if (!is_current_z && !IsValidChannelMapTile(file_id, requested_z, encoded_coordinate)) {
auto tile = Tile::Decode(encoded_coordinate);

if (!is_current_z && !IsInChannelMapTiles(file_id, encoded_coordinate)) {
// Check if tile still valid for channel map before creating tile
spdlog::warn(
"Discarding stale tile request for channel={}, tile=({}, {}, {})", requested_z, tile.x, tile.y, tile.layer);
continue;
}

auto raster_tile_data = Message::RasterTileData(file_id, sync_id, animation_id);
auto tile = Tile::Decode(encoded_coordinate);

if (_frames.count(file_id) && _frames.at(file_id)->FillRasterTileData(raster_tile_data, tile, requested_z, stokes,
compression_type, compression_quality, is_current_z)) {
// Check if channel and tile still valid for channel map
if (!is_current_z && !IsValidChannelMapTile(file_id, requested_z, encoded_coordinate)) {
// Check if channel and tile still valid for channel map before sending
spdlog::warn(
"Discarding stale tile request for channel={}, tile=({}, {}, {})", requested_z, tile.x, tile.y, tile.layer);
continue;
Expand Down Expand Up @@ -1555,6 +1559,48 @@ void Session::OnSetVectorOverlayParameters(const CARTA::SetVectorOverlayParamete
}
}

void Session::OnRemoteFileRequest(const CARTA::RemoteFileRequest& message, uint32_t request_id) {
auto file_id(message.file_id());

CARTA::RemoteFileResponse response;
std::string url, err_message;
bool success = GenerateUrlFromRequest(message, url, err_message);
if (success) {
spdlog::info("Fetching remote file from url {}", url);
auto loader = _loaders.Get(url, "");

CARTA::OpenFileAck ack;
try {
loader->OpenFile("0");

std::string remote_file_name;
auto index = ++_remote_file_index;
if (index > 0) {
remote_file_name = fmt::format("remote_file{}.fits", index);
} else {
remote_file_name = "remote_file.fits";
}
spdlog::info("Opening remote file: {}", remote_file_name);

auto image = loader->GetImage();

success = OnOpenFile(file_id, remote_file_name, image, response.mutable_open_file_ack());
if (success) {
response.set_message("File opened successfully");
}
} catch (const casacore::AipsError& err) {
err_message = err.getMesg();
response.set_message(err_message);
success = false;
}
} else {
response.set_message(err_message);
}
response.set_success(success);

SendEvent(CARTA::REMOTE_FILE_RESPONSE, request_id, response);
}

// ******** SEND DATA STREAMS *********

bool Session::CalculateCubeHistogram(int file_id, CARTA::RegionHistogramData& cube_histogram_message) {
Expand Down Expand Up @@ -2054,7 +2100,6 @@ void Session::SendEvent(CARTA::EventType event_type, uint32_t event_id, const go
// Skip compression on files smaller than 1 kB
msg_vs_compress.second = compress && required_size > 1024;
_out_msgs.push(msg_vs_compress);
// spdlog::debug("***** Queued message type={} queue size={}", event_type, _out_msgs.size());

// uWS::Loop::defer(function) is the only thread-safe function.
// Use it to defer the calling of a function to the thread that runs the Loop.
Expand Down Expand Up @@ -2418,6 +2463,9 @@ void Session::CancelExistingAnimation() {
}
}

// *********************************************************************************
// SCRIPTING

void Session::SendScriptingRequest(
CARTA::ScriptingRequest& message, ScriptingResponseCallback callback, ScriptingSessionClosedCallback session_closed_callback) {
int scripting_request_id(message.scripting_request_id());
Expand Down Expand Up @@ -2454,6 +2502,9 @@ void Session::CloseAllScriptingRequests() {
_scripting_callbacks.clear();
}

// *********************************************************************************
// Session Management

void Session::StopImageFileList() {
if (_file_list_handler) {
_file_list_handler->StopGettingFileList();
Expand Down Expand Up @@ -2484,44 +2535,48 @@ void Session::CloseCachedImage(const std::string& directory, const std::string&
}
}
}
void Session::OnRemoteFileRequest(const CARTA::RemoteFileRequest& message, uint32_t request_id) {
auto file_id(message.file_id());

CARTA::RemoteFileResponse response;
std::string url, err_message;
bool success = GenerateUrlFromRequest(message, url, err_message);
if (success) {
spdlog::info("Fetching remote file from url {}", url);
auto loader = _loaders.Get(url, "");
// *********************************************************************************
// Image Channel and Channel Map

void Session::AddToSetChannelQueue(CARTA::SetImageChannels message, uint32_t request_id) {
// Image channel mutex has been locked by SessionManager.
// Set current channel or channel range, clear queue if new channel/range.
bool clear_queue(true);
if (message.has_current_range()) {
if (!_channel_map_settings) {
_channel_map_settings = std::unique_ptr<ChannelMapSettings>(new ChannelMapSettings(message));
} else {
clear_queue = _channel_map_settings->SetChannelMap(message);
}
} else {
if (_channel_map_settings) {
_channel_map_settings->SetChannelMap(message);
}
}

CARTA::OpenFileAck ack;
try {
loader->OpenFile("0");
if (clear_queue) {
std::pair<CARTA::SetImageChannels, uint32_t> rp;
while (_set_channel_queues[message.file_id()].try_pop(rp)) {
}
}

std::string remote_file_name;
auto index = ++_remote_file_index;
if (index > 0) {
remote_file_name = fmt::format("remote_file{}.fits", index);
} else {
remote_file_name = "remote_file.fits";
}
spdlog::info("Opening remote file: {}", remote_file_name);
if (message.has_required_tiles()) {
_set_channel_queues[message.file_id()].push(std::make_pair(message, request_id));
}
}

auto image = loader->GetImage();
bool Session::IsValidChannelMapTile(int file_id, int channel, int tile) {
// Check if channel is in channel range and tile is in required tiles for file id.
return IsInChannelMapRange(file_id, channel) && IsInChannelMapTiles(file_id, tile);
}

success = OnOpenFile(file_id, remote_file_name, image, response.mutable_open_file_ack());
if (success) {
response.set_message("File opened successfully");
}
} catch (const casacore::AipsError& err) {
err_message = err.getMesg();
response.set_message(err_message);
success = false;
}
} else {
response.set_message(err_message);
}
response.set_success(success);
bool Session::IsInChannelMapRange(int file_id, int channel) {
// Check if channel is in current channel map range for file id.
return _channel_map_settings && _channel_map_settings->IsInChannelRange(file_id, channel);
}

SendEvent(CARTA::REMOTE_FILE_RESPONSE, request_id, response);
bool Session::IsInChannelMapTiles(int file_id, int tile) {
// Check if tile is in current channel map tiles for file id.
return _channel_map_settings && _channel_map_settings->HasTile(file_id, tile);
}
Loading

0 comments on commit 5562666

Please sign in to comment.