Skip to content

Commit

Permalink
Redesign test for the case when compression writer sets threads priority
Browse files Browse the repository at this point in the history
Co-authored-by: Janosch Machowinski <[email protected]>
Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
2 people authored and Janosch Machowinski committed Sep 22, 2023
1 parent bd859c0 commit c8cdcba
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 65 deletions.
3 changes: 2 additions & 1 deletion rosbag2_compression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ if(BUILD_TESTING)
)

ament_add_gmock(test_sequential_compression_writer
test/rosbag2_compression/test_sequential_compression_writer.cpp)
test/rosbag2_compression/test_sequential_compression_writer.cpp
test/rosbag2_compression/fake_compressor.cpp)
target_link_libraries(test_sequential_compression_writer
${PROJECT_NAME}
rosbag2_cpp::rosbag2_cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_
#define ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_

#include <memory>
#include <string>

#include "rosbag2_compression/compression_factory.hpp"
#include "fake_compressor.hpp"

class FakeCompressionFactory
: public rosbag2_compression::CompressionFactory
{
public:
FakeCompressionFactory() = delete;

~FakeCompressionFactory() override = default;

explicit FakeCompressionFactory(int & detected_thread_priority)
: detected_thread_priority_(detected_thread_priority) {}

std::shared_ptr<rosbag2_compression::BaseCompressorInterface>
create_compressor(const std::string & /*compression_format*/) override
{
return std::make_shared<FakeCompressor>(detected_thread_priority_);
}

private:
int & detected_thread_priority_;
};

#endif // ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_
18 changes: 17 additions & 1 deletion rosbag2_compression/test/rosbag2_compression/fake_compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,27 @@
// limitations under the License.

#include <string>
#ifdef _WIN32
#include <windows.h>
#else
#include <sys/resource.h>
#endif

#include "pluginlib/class_list_macros.hpp"

#include "fake_compressor.hpp"

FakeCompressor::FakeCompressor(int & detected_thread_priority)
{
#ifndef _WIN32
int cur_nice_value = getpriority(PRIO_PROCESS, 0);
if (cur_nice_value != -1 && errno == 0) {
detected_thread_priority = cur_nice_value;
}
#else
detected_thread_priority = GetThreadPriority(GetCurrentThread());
#endif
}

std::string FakeCompressor::compress_uri(const std::string & uri)
{
return uri + "." + get_compression_identifier();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class FakeCompressor : public rosbag2_compression::BaseCompressorInterface
public:
FakeCompressor() = default;

explicit FakeCompressor(int & detected_thread_priority);

std::string compress_uri(const std::string & uri) override;

void compress_serialized_bag_message(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@
#include "mock_storage_factory.hpp"

#include "mock_compression_factory.hpp"
#include "fake_compression_factory.hpp"

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <sys/resource.h>
#endif

using namespace testing; // NOLINT
Expand Down Expand Up @@ -401,82 +399,37 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz
EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
}


TEST_P(SequentialCompressionWriterTest, writer_sets_nice_value)
TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority)
{
const std::string test_topic_name = "test_topic";
const std::string test_topic_type = "test_msgs/BasicTypes";
const uint64_t kCompressionQueueSize = GetParam();
const int wanted_nice_value = 10;

class TestCompressor : public rosbag2_compression::BaseCompressorInterface
{
int & detected_nice_value;

public:
TestCompressor(int & detected_nice_value)
: detected_nice_value(detected_nice_value) {}
virtual std::string compress_uri(const std::string & uri)
{
return uri;
}

virtual void compress_serialized_bag_message(
const rosbag2_storage::SerializedBagMessage * bag_message,
rosbag2_storage::SerializedBagMessage * compressed_message)
{
#ifdef _WIN32
int cur_nice_value = getpriority(PRIO_PROCESS, 0);
if (cur_nice_value != -1 && errno == 0) {

detected_nice_value = cur_nice_value;
}
#ifndef _WIN32
const int wanted_thread_priority = 10;
#else
//FIXME implement windows version
const int wanted_thread_priority = THREAD_MODE_BACKGROUND_BEGIN;
#endif

*compressed_message = *bag_message;
}

/**
* Get the identifier of the compression algorithm.
* This is appended to the extension of the compressed file.
*/
virtual std::string get_compression_identifier() const
{
return "niceTest";
}
};

class FakeFactory : public rosbag2_compression::CompressionFactory
{
int & detected_nice_value;

public:
FakeFactory(int & detected_nice_value)
: detected_nice_value(detected_nice_value) {}

virtual std::shared_ptr<rosbag2_compression::BaseCompressorInterface>
create_compressor(const std::string & /*compression_format*/)
{
return std::make_shared<TestCompressor>(detected_nice_value);
}
};

// queue size should be 0 or at least the number of remaining messages to prevent message loss
rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
kCompressionQueueSize,
kDefaultCompressionQueueThreads,
wanted_nice_value
wanted_thread_priority
};

#ifndef _WIN32
// nice values are in the range from -20 to +19, so this value will never be read
int detected_nice_value = 100;
int detected_thread_priority = 100;
#else
int detected_thread_priority = THREAD_PRIORITY_ERROR_RETURN;
#endif

initializeFakeFileStorage();
initializeWriter(compression_options, std::make_unique<FakeFactory>(detected_nice_value));
initializeWriter(
compression_options,
std::make_unique<FakeCompressionFactory>(detected_thread_priority));

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({test_topic_name, test_topic_type, "", "", ""});
Expand All @@ -490,8 +443,7 @@ TEST_P(SequentialCompressionWriterTest, writer_sets_nice_value)
}
writer_.reset(); // reset will call writer destructor

EXPECT_EQ(detected_nice_value, *compression_options.thread_priority);

EXPECT_EQ(detected_thread_priority, *compression_options.thread_priority);
EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
}

Expand Down

0 comments on commit c8cdcba

Please sign in to comment.