diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index d1e8a5fdb..9abc159c4 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -194,6 +194,17 @@ def add_recorder_arguments(parser: ArgumentParser) -> None: '--compression-threads', type=int, default=0, help='Number of files or messages that may be compressed in parallel. ' 'Default is %(default)d, which will be interpreted as the number of CPU cores.') + parser.add_argument( + '--compression-threads-priority', type=int, default=0, + help='Compression threads scheduling priority. \nFor Windows the valid values are:' + ' THREAD_PRIORITY_LOWEST=-2, THREAD_PRIORITY_BELOW_NORMAL=-1 and' + ' THREAD_PRIORITY_NORMAL=0. Please refer to' + ' https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority' # noqa E501 + ' for details.\n' + 'For POSIX compatible OSes this is the "nice" value. The nice value range is' + ' -20 to +19 where -20 is highest, 0 default and +19 is lowest.' + ' Please refer to https://man7.org/linux/man-pages/man2/nice.2.html for details.\n' + 'Default is %(default)d.') parser.add_argument( '--compression-mode', type=str, default='none', choices=['none', 'file', 'message'], @@ -336,6 +347,7 @@ def main(self, *, args): # noqa: D102 record_options.compression_format = args.compression_format record_options.compression_queue_size = args.compression_queue_size record_options.compression_threads = args.compression_threads + record_options.compression_threads_priority = args.compression_threads_priority record_options.topic_qos_profile_overrides = qos_profile_overrides record_options.include_hidden_topics = args.include_hidden_topics record_options.include_unpublished_topics = args.include_unpublished_topics diff --git a/ros2bag/test/test_record_with_compression_thread_priority.py b/ros2bag/test/test_record_with_compression_thread_priority.py new file mode 100644 index 000000000..a6e58086c --- /dev/null +++ b/ros2bag/test/test_record_with_compression_thread_priority.py @@ -0,0 +1,55 @@ +# Copyright 2024 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +import tempfile + +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess + +import launch_testing +import launch_testing.actions + +import pytest + + +@pytest.mark.launch_test +def generate_test_description(): + tmp_dir_name = tempfile.mkdtemp() + output_path = Path(tmp_dir_name) / 'ros2bag_test_record_with_compression_thread_priority' + record_process = ExecuteProcess( + cmd=['ros2', 'bag', 'record', '-a', '--output', output_path.as_posix(), + '--log-level', 'debug', '--compression-threads-priority', '1', + '--compression-mode', 'file', '--compression-format', 'zstd'], + name='ros2bag-cli', + output='screen', + ) + + return LaunchDescription([ + record_process, + launch_testing.actions.ReadyToTest() + ]), locals() + + +class TestRecordWithCompressionThreadPriority(unittest.TestCase): + + def test_priority_propagated_into_compression_thread( + self, record_process, proc_output): + proc_output.assertWaitFor( + 'Setting compression thread priority to 1', + timeout=45, + process=record_process + ) diff --git a/rosbag2_compression/include/rosbag2_compression/compression_options.hpp b/rosbag2_compression/include/rosbag2_compression/compression_options.hpp index 3b8445de0..b40ca4aef 100644 --- a/rosbag2_compression/include/rosbag2_compression/compression_options.hpp +++ b/rosbag2_compression/include/rosbag2_compression/compression_options.hpp @@ -64,8 +64,9 @@ struct CompressionOptions /// \brief // The number of compression threads uint64_t compression_threads{0}; /// \brief If set, the compression thread(s) will try to set the given priority for itself - /// For Windows the valid values are: THREAD_PRIORITY_LOWEST, THREAD_PRIORITY_BELOW_NORMAL and - /// THREAD_PRIORITY_NORMAL. For POSIX compatible OSes this is the "nice" value. + /// For Windows the valid values are: THREAD_PRIORITY_LOWEST=-2, THREAD_PRIORITY_BELOW_NORMAL=-1 + /// and THREAD_PRIORITY_NORMAL=0. For POSIX compatible OSes this is the "nice" value. + /// The nice value range is -20 to +19 where -20 is highest, 0 default and +19 is lowest. std::optional thread_priority; }; diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 8f5a89c61..d4674e2eb 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -71,6 +71,9 @@ SequentialCompressionWriter::~SequentialCompressionWriter() void SequentialCompressionWriter::compression_thread_fn() { if (compression_options_.thread_priority) { + ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM( + "Setting compression thread priority to " + << *compression_options_.thread_priority); #ifdef _WIN32 // This must match THREAD_PRIORITY_IDLE, THREAD_PRIORITY_LOWEST... int wanted_thread_priority = *compression_options_.thread_priority; diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 25f913756..2b5071952 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -154,7 +154,7 @@ class SequentialCompressionWriterTest : public TestWithParam const uint64_t kDefaultCompressionQueueSize = 1; const uint64_t kDefaultCompressionQueueThreads = 4; - const std::optional kDefaultCompressionQueueThreadsPriority = std::nullopt; + const int32_t kDefaultCompressionQueueThreadsPriority = 0; }; TEST_F(SequentialCompressionWriterTest, open_throws_on_empty_storage_options_uri) diff --git a/rosbag2_py/rosbag2_py/_transport.pyi b/rosbag2_py/rosbag2_py/_transport.pyi index 76c6531a6..fdd8f0780 100644 --- a/rosbag2_py/rosbag2_py/_transport.pyi +++ b/rosbag2_py/rosbag2_py/_transport.pyi @@ -48,6 +48,7 @@ class RecordOptions: compression_mode: str compression_queue_size: int compression_threads: int + compression_threads_priority: int disable_keyboard_controls: bool exclude_regex: str exclude_service_events: List[str] diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index a92dd3684..9d15e82b1 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -573,6 +573,7 @@ PYBIND11_MODULE(_transport, m) { .def_readwrite("compression_format", &RecordOptions::compression_format) .def_readwrite("compression_queue_size", &RecordOptions::compression_queue_size) .def_readwrite("compression_threads", &RecordOptions::compression_threads) + .def_readwrite("compression_threads_priority", &RecordOptions::compression_threads_priority) .def_property( "topic_qos_profile_overrides", &RecordOptions::getTopicQoSProfileOverrides, diff --git a/rosbag2_transport/include/rosbag2_transport/record_options.hpp b/rosbag2_transport/include/rosbag2_transport/record_options.hpp index 695dc531b..5d6d22bb3 100644 --- a/rosbag2_transport/include/rosbag2_transport/record_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/record_options.hpp @@ -47,7 +47,12 @@ struct RecordOptions std::string compression_mode = ""; std::string compression_format = ""; uint64_t compression_queue_size = 1; + /// \brief // The number of compression threads uint64_t compression_threads = 0; + /// \brief Compression threads scheduling priority. + /// For Windows the valid values are: THREAD_PRIORITY_LOWEST=-2, THREAD_PRIORITY_BELOW_NORMAL=-1 + /// and THREAD_PRIORITY_NORMAL=0. For POSIX compatible OSes this is the "nice" value. + /// The nice value range is -20 to +19 where -20 is highest, 0 default and +19 is lowest. int32_t compression_threads_priority = 0; std::unordered_map topic_qos_profile_overrides{}; bool include_hidden_topics = false;