Skip to content

Commit

Permalink
Determine valid channel map tiles for accurate RasterTileSync tile count
Browse files Browse the repository at this point in the history
  • Loading branch information
pford committed Dec 5, 2024
1 parent 5fca92d commit 4109231
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 55 deletions.
16 changes: 8 additions & 8 deletions src/Session/ChannelMapSettings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,20 @@ bool ChannelMapSettings::HasRequiredTiles(int file_id, const CARTA::AddRequiredT
return false;
}

bool ChannelMapSettings::HasTile(int file_id, int tile) {
bool ChannelMapSettings::GetValidTiles(int file_id, const CARTA::AddRequiredTiles& required_tiles, std::vector<int>& valid_tiles) {
// Return valid tiles which are in current tiles for file id.
std::unique_lock<std::mutex> lock(_file_mutexes[file_id]);
if (_required_tiles.find(file_id) == _required_tiles.end()) {
return false;
}
return _required_tiles[file_id].HasTile(tile);
}

bool ChannelMapSettings::HasTiles(int file_id, const std::vector<int>& tiles) {
std::unique_lock<std::mutex> lock(_file_mutexes[file_id]);
if (_required_tiles.find(file_id) == _required_tiles.end()) {
return false;
for (int i = 0; i < required_tiles.tiles_size(); ++i) {
int tile = required_tiles.tiles(i);
if (_required_tiles[file_id].HasTile(tile)) {
valid_tiles.push_back(tile);
}
}
return _required_tiles[file_id].HasTiles(tiles);
return !valid_tiles.empty();
}

void ChannelMapSettings::RemoveFile(int file_id) {
Expand Down
3 changes: 1 addition & 2 deletions src/Session/ChannelMapSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ class ChannelMapSettings {
// Checks to support channel map cancel.
bool IsInChannelRange(int file_id, int channel);
bool HasRequiredTiles(int file_id, const CARTA::AddRequiredTiles& required_tiles);
bool HasTile(int file_id, int tile);
bool HasTiles(int file_id, const std::vector<int>& tiles);
bool GetValidTiles(int file_id, const CARTA::AddRequiredTiles& required_tiles, std::vector<int>& valid_tiles);

// Remove a file or all files from channel maps when closed in Session.
void RemoveFile(int file_id);
Expand Down
71 changes: 31 additions & 40 deletions src/Session/Session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -721,25 +721,10 @@ void Session::OnAddRequiredTiles(const CARTA::AddRequiredTiles& message, int z,
for (int i = j; i < num_tiles; i += stride) {
const auto& encoded_coordinate = message.tiles(i);
auto tile = Tile::Decode(encoded_coordinate);

if (!is_current_z && !IsInChannelMapTiles(file_id, encoded_coordinate)) {
// Check if tile still valid for channel map before creating tile
spdlog::warn(
"Discarding stale tile request for channel={}, tile=({}, {}, {})", requested_z, tile.x, tile.y, tile.layer);
continue;
}

auto raster_tile_data = Message::RasterTileData(file_id, sync_id, animation_id);

if (_frames.count(file_id) && _frames.at(file_id)->FillRasterTileData(raster_tile_data, tile, requested_z, stokes,
compression_type, compression_quality, is_current_z)) {
if (!is_current_z && !IsValidChannelMapTile(file_id, requested_z, encoded_coordinate)) {
// Check if channel and tile still valid for channel map before sending
spdlog::warn(
"Discarding stale tile request for channel={}, tile=({}, {}, {})", requested_z, tile.x, tile.y, tile.layer);
continue;
}

// Only use deflate on outgoing message if the raster image compression type is NONE
SendFileEvent(
file_id, CARTA::EventType::RASTER_TILE_DATA, 0, raster_tile_data, compression_type == CARTA::CompressionType::NONE);
Expand Down Expand Up @@ -769,7 +754,7 @@ void Session::OnSetImageChannels(const CARTA::SetImageChannels& message) {
int start_channel(message.channel_range().min());
int end_channel(message.channel_range().max());
int num_channel(frame->Depth());
std::vector<int> required_tiles = {message.required_tiles().tiles().begin(), message.required_tiles().tiles().end()};
auto required_tiles = message.required_tiles();
bool skipped_channel(false);

// Use animation limits for flow control
Expand All @@ -786,7 +771,7 @@ void Session::OnSetImageChannels(const CARTA::SetImageChannels& message) {
continue;
}

if (!ChannelMapTilesValid(file_id, required_tiles)) {
if (!HasValidChannelMapTiles(file_id, required_tiles)) {
spdlog::debug(
"Cancel starting at channel {} in range {}-{}, tiles not in current tiles", chan, start_channel, end_channel);
break;
Expand All @@ -808,23 +793,32 @@ void Session::OnSetImageChannels(const CARTA::SetImageChannels& message) {
spdlog::debug("Skip channel {} in range {}-{}, not in current range", chan, start_channel, end_channel);
continue;
}
if (!ChannelMapTilesValid(file_id, required_tiles)) {
if (!HasValidChannelMapTiles(file_id, required_tiles)) {
spdlog::debug(
"Cancel starting at channel {} in range {}-{}, tiles not in current tiles", chan, start_channel, end_channel);
"Cancel starting at channel {} in range {}-{}, no tiles in current tiles", chan, start_channel, end_channel);
break;
}
}

auto start_time = std::chrono::high_resolution_clock::now();
spdlog::debug("Send channel {} in range {}-{}", chan, start_channel, end_channel);
OnAddRequiredTiles(message.required_tiles(), chan);
skipped_channel = false;

if (chan < end_channel) {
// Wait until interval elapsed to execute next channel.
auto wait_us = std::chrono::duration_cast<std::chrono::microseconds>(
start_time + channel_interval - std::chrono::high_resolution_clock::now());
std::this_thread::sleep_for(wait_us);
std::vector<int> valid_tiles;
if (GetValidChannelMapTiles(file_id, required_tiles, valid_tiles)) {
auto interval_start_time = std::chrono::high_resolution_clock::now();
auto current_required_tiles = Message::AddRequiredTiles(
file_id, required_tiles.compression_type(), required_tiles.compression_quality(), valid_tiles);
spdlog::debug("Send channel {} in range {}-{}", chan, start_channel, end_channel);
OnAddRequiredTiles(current_required_tiles, chan);
skipped_channel = false;

if (chan < end_channel) {
// Wait until interval elapsed to execute next channel.
auto wait_us = std::chrono::duration_cast<std::chrono::microseconds>(
interval_start_time + channel_interval - std::chrono::high_resolution_clock::now());
std::this_thread::sleep_for(wait_us);
}
} else {
spdlog::debug(
"Cancel starting at channel {} in range {}-{}, no tiles in current tiles", chan, start_channel, end_channel);
break;
}
}
} else {
Expand Down Expand Up @@ -2610,25 +2604,22 @@ void Session::AddToSetChannelQueue(CARTA::SetImageChannels message, uint32_t req
}
}

bool Session::IsValidChannelMapTile(int file_id, int channel, int tile) {
// Check if channel is in channel range and tile is in required tiles for file id.
return IsInChannelMapRange(file_id, channel) && IsInChannelMapTiles(file_id, tile);
}

bool Session::IsInChannelMapRange(int file_id, int channel) {
// Check if channel is in current channel map range for file id.
return _channel_map_settings && _channel_map_settings->IsInChannelRange(file_id, channel);
}

bool Session::IsInChannelMapTiles(int file_id, int tile) {
// Check if tile is in current channel map tiles for file id.
return _channel_map_settings && _channel_map_settings->HasTile(file_id, tile);
bool Session::HasValidChannelMapTiles(int file_id, const CARTA::AddRequiredTiles& required_tiles) {
// Check if any tiles are in current channel map tiles for file id.
return _channel_map_settings && _channel_map_settings->HasRequiredTiles(file_id, required_tiles);
}

bool Session::ChannelMapTilesValid(int file_id, const std::vector<int>& tiles) {
// Check if any tiles are in current channel map tiles for file id.
return _channel_map_settings && _channel_map_settings->HasTiles(file_id, tiles);
bool Session::GetValidChannelMapTiles(int file_id, const CARTA::AddRequiredTiles& required_tiles, std::vector<int>& valid_tiles) {
// Return requested tiles which exist in current channel map tiles for file id.
// Returns whether any tiles are valid.
return _channel_map_settings && _channel_map_settings->GetValidTiles(file_id, required_tiles, valid_tiles);
}

void Session::HandleChannelMapFlowControlEvt(CARTA::ChannelMapFlowControl& message) {
_channel_map_received_channel[message.file_id()] = message.received_channel();
}
5 changes: 2 additions & 3 deletions src/Session/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,9 @@ class Session {
void SendLogEvent(const std::string& message, std::vector<std::string> tags, CARTA::ErrorSeverity severity);

// Channel map cancellation
bool IsValidChannelMapTile(int file_id, int channel, int tile);
bool IsInChannelMapRange(int file_id, int channel);
bool IsInChannelMapTiles(int file_id, int tile);
bool ChannelMapTilesValid(int file_id, const std::vector<int>& tiles);
bool HasValidChannelMapTiles(int file_id, const CARTA::AddRequiredTiles& required_tiles);
bool GetValidChannelMapTiles(int file_id, const CARTA::AddRequiredTiles& required_tiles, std::vector<int>& valid_tiles);

// uWebSockets
uWS::WebSocket<false, true, PerSocketData>* _socket;
Expand Down
2 changes: 1 addition & 1 deletion src/Util/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ CARTA::SetHistogramRequirements Message::SetHistogramRequirements(int32_t file_i
}

CARTA::AddRequiredTiles Message::AddRequiredTiles(
int32_t file_id, CARTA::CompressionType compression_type, float compression_quality, const std::vector<float>& tiles) {
int32_t file_id, CARTA::CompressionType compression_type, float compression_quality, const std::vector<int32_t>& tiles) {
CARTA::AddRequiredTiles add_required_tiles;
add_required_tiles.set_file_id(file_id);
add_required_tiles.set_compression_type(compression_type);
Expand Down
2 changes: 1 addition & 1 deletion src/Util/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class Message {
static CARTA::SetHistogramRequirements SetHistogramRequirements(
int32_t file_id, int32_t region_id, int32_t channel = CURRENT_Z, int32_t num_bins = AUTO_BIN_SIZE);
static CARTA::AddRequiredTiles AddRequiredTiles(
int32_t file_id, CARTA::CompressionType compression_type, float compression_quality, const std::vector<float>& tiles);
int32_t file_id, CARTA::CompressionType compression_type, float compression_quality, const std::vector<int32_t>& tiles);
static CARTA::Point Point(float x, float y);
static CARTA::Point Point(const casacore::Vector<casacore::Double>& input, int x_index = 0, int y_index = 1);
static CARTA::Point Point(const std::vector<casacore::Quantity>& input, int x_index = 0, int y_index = 1);
Expand Down

0 comments on commit 4109231

Please sign in to comment.