Skip to content

Commit

Permalink
feat: Added compression_threads_nice_value option
Browse files Browse the repository at this point in the history
Signed-off-by: Janosch Machowinski <[email protected]>
  • Loading branch information
Janosch Machowinski authored and Janosch Machowinski committed Sep 14, 2023
1 parent a710e87 commit 5227edf
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,15 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>

const uint64_t kDefaultCompressionQueueSize = 1;
const uint64_t kDefaultCompressionQueueThreads = 4;
const std::optional<int8_t> kDefaultCompressionQueueThreadsNiceValue = std::nullopt;
};

TEST_F(SequentialCompressionWriterTest, open_throws_on_empty_storage_options_uri)
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsNiceValue};
initializeWriter(compression_options);

EXPECT_THROW(
Expand All @@ -163,7 +165,8 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_bad_compression_format)
{
rosbag2_compression::CompressionOptions compression_options{
"bad_format", rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsNiceValue};
initializeWriter(compression_options);

EXPECT_THROW(
Expand All @@ -175,7 +178,8 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_invalid_splitting_size)
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsNiceValue};

// Set minimum file size greater than max bagfile size option
const uint64_t min_split_file_size = 10;
Expand All @@ -196,7 +200,8 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsNiceValue};
initializeWriter(compression_options);

auto tmp_dir = rcpputils::fs::temp_directory_path() / "path_not_empty";
Expand All @@ -211,7 +216,8 @@ TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor)
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsNiceValue};
auto compression_factory = std::make_unique<StrictMock<MockCompressionFactory>>();
EXPECT_CALL(*compression_factory, create_compressor(_)).Times(1);

Expand All @@ -235,7 +241,8 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize,
kDefaultCompressionQueueThreads
kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsNiceValue
};

initializeFakeFileStorage();
Expand Down Expand Up @@ -277,7 +284,8 @@ TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_open_and_
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
kDefaultCompressionQueueThreads
kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsNiceValue
};

initializeFakeFileStorage();
Expand Down Expand Up @@ -315,7 +323,8 @@ TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_bag_split
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
kDefaultCompressionQueueThreads
kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsNiceValue
};

initializeFakeFileStorage();
Expand Down Expand Up @@ -363,7 +372,8 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
kCompressionQueueSize,
kDefaultCompressionQueueThreads
kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsNiceValue
};

initializeFakeFileStorage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct RecordOptions
std::string compression_format = "";
uint64_t compression_queue_size = 1;
uint64_t compression_threads = 0;
int8_t compression_threads_nice_value = 0;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides{};
bool include_hidden_topics = false;
bool include_unpublished_topics = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ std::unique_ptr<rosbag2_cpp::Writer> ReaderWriterFactory::make_writer(
record_options.compression_format,
rosbag2_compression::compression_mode_from_string(record_options.compression_mode),
record_options.compression_queue_size,
record_options.compression_threads
record_options.compression_threads,
record_options.compression_threads_nice_value,
};
if (compression_options.compression_threads < 1) {
compression_options.compression_threads = std::thread::hardware_concurrency();
Expand Down
4 changes: 4 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/record_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Node convert<rosbag2_transport::RecordOptions>::encode(
node["compression_format"] = record_options.compression_format;
node["compression_queue_size"] = record_options.compression_queue_size;
node["compression_threads"] = record_options.compression_threads;
node["compression_threads_nice_value"] = record_options.compression_threads_nice_value;
std::map<std::string, rosbag2_transport::Rosbag2QoS> qos_overrides(
record_options.topic_qos_profile_overrides.begin(),
record_options.topic_qos_profile_overrides.end());
Expand All @@ -80,6 +81,9 @@ bool convert<rosbag2_transport::RecordOptions>::decode(
optional_assign<std::string>(node, "compression_format", record_options.compression_format);
optional_assign<uint64_t>(node, "compression_queue_size", record_options.compression_queue_size);
optional_assign<uint64_t>(node, "compression_threads", record_options.compression_threads);
optional_assign<int8_t>(
node, "compression_threads_nice_value",
record_options.compression_threads_nice_value);

// yaml-cpp doesn't implement unordered_map
std::map<std::string, rosbag2_transport::Rosbag2QoS> qos_overrides;
Expand Down

0 comments on commit 5227edf

Please sign in to comment.