From 5227edf9d81bb59da5330c16bd558850c5113aad Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Tue, 12 Sep 2023 09:21:42 +0000 Subject: [PATCH] feat: Added compression_threads_nice_value option Signed-off-by: Janosch Machowinski --- .../test_sequential_compression_writer.cpp | 28 +++++++++++++------ .../rosbag2_transport/record_options.hpp | 1 + .../reader_writer_factory.cpp | 3 +- .../src/rosbag2_transport/record_options.cpp | 4 +++ 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 169afc3056..30c0cef5eb 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -143,13 +143,15 @@ class SequentialCompressionWriterTest : public TestWithParam const uint64_t kDefaultCompressionQueueSize = 1; const uint64_t kDefaultCompressionQueueThreads = 4; + const std::optional kDefaultCompressionQueueThreadsNiceValue = std::nullopt; }; TEST_F(SequentialCompressionWriterTest, open_throws_on_empty_storage_options_uri) { rosbag2_compression::CompressionOptions compression_options{ DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsNiceValue}; initializeWriter(compression_options); EXPECT_THROW( @@ -163,7 +165,8 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_bad_compression_format) { rosbag2_compression::CompressionOptions compression_options{ "bad_format", rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsNiceValue}; initializeWriter(compression_options); EXPECT_THROW( @@ -175,7 +178,8 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_invalid_splitting_size) { rosbag2_compression::CompressionOptions compression_options{ DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsNiceValue}; // Set minimum file size greater than max bagfile size option const uint64_t min_split_file_size = 10; @@ -196,7 +200,8 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f { rosbag2_compression::CompressionOptions compression_options{ DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsNiceValue}; initializeWriter(compression_options); auto tmp_dir = rcpputils::fs::temp_directory_path() / "path_not_empty"; @@ -211,7 +216,8 @@ TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor) { rosbag2_compression::CompressionOptions compression_options{ DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsNiceValue}; auto compression_factory = std::make_unique>(); EXPECT_CALL(*compression_factory, create_compressor(_)).Times(1); @@ -235,7 +241,8 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, kDefaultCompressionQueueSize, - kDefaultCompressionQueueThreads + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsNiceValue }; initializeFakeFileStorage(); @@ -277,7 +284,8 @@ TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_open_and_ DefaultTestCompressor, rosbag2_compression::CompressionMode::MESSAGE, 0, - kDefaultCompressionQueueThreads + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsNiceValue }; initializeFakeFileStorage(); @@ -315,7 +323,8 @@ TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_bag_split DefaultTestCompressor, rosbag2_compression::CompressionMode::MESSAGE, 0, - kDefaultCompressionQueueThreads + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsNiceValue }; initializeFakeFileStorage(); @@ -363,7 +372,8 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz DefaultTestCompressor, rosbag2_compression::CompressionMode::MESSAGE, kCompressionQueueSize, - kDefaultCompressionQueueThreads + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsNiceValue }; initializeFakeFileStorage(); diff --git a/rosbag2_transport/include/rosbag2_transport/record_options.hpp b/rosbag2_transport/include/rosbag2_transport/record_options.hpp index a387ab0998..07f9e9e5e0 100644 --- a/rosbag2_transport/include/rosbag2_transport/record_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/record_options.hpp @@ -42,6 +42,7 @@ struct RecordOptions std::string compression_format = ""; uint64_t compression_queue_size = 1; uint64_t compression_threads = 0; + int8_t compression_threads_nice_value = 0; std::unordered_map topic_qos_profile_overrides{}; bool include_hidden_topics = false; bool include_unpublished_topics = false; diff --git a/rosbag2_transport/src/rosbag2_transport/reader_writer_factory.cpp b/rosbag2_transport/src/rosbag2_transport/reader_writer_factory.cpp index b1abeae33c..8123565fb9 100644 --- a/rosbag2_transport/src/rosbag2_transport/reader_writer_factory.cpp +++ b/rosbag2_transport/src/rosbag2_transport/reader_writer_factory.cpp @@ -53,7 +53,8 @@ std::unique_ptr ReaderWriterFactory::make_writer( record_options.compression_format, rosbag2_compression::compression_mode_from_string(record_options.compression_mode), record_options.compression_queue_size, - record_options.compression_threads + record_options.compression_threads, + record_options.compression_threads_nice_value, }; if (compression_options.compression_threads < 1) { compression_options.compression_threads = std::thread::hardware_concurrency(); diff --git a/rosbag2_transport/src/rosbag2_transport/record_options.cpp b/rosbag2_transport/src/rosbag2_transport/record_options.cpp index 9f94ac68a3..b863da080b 100644 --- a/rosbag2_transport/src/rosbag2_transport/record_options.cpp +++ b/rosbag2_transport/src/rosbag2_transport/record_options.cpp @@ -54,6 +54,7 @@ Node convert::encode( node["compression_format"] = record_options.compression_format; node["compression_queue_size"] = record_options.compression_queue_size; node["compression_threads"] = record_options.compression_threads; + node["compression_threads_nice_value"] = record_options.compression_threads_nice_value; std::map qos_overrides( record_options.topic_qos_profile_overrides.begin(), record_options.topic_qos_profile_overrides.end()); @@ -80,6 +81,9 @@ bool convert::decode( optional_assign(node, "compression_format", record_options.compression_format); optional_assign(node, "compression_queue_size", record_options.compression_queue_size); optional_assign(node, "compression_threads", record_options.compression_threads); + optional_assign( + node, "compression_threads_nice_value", + record_options.compression_threads_nice_value); // yaml-cpp doesn't implement unordered_map std::map qos_overrides;