From 0c5a27b65c52fef8a667a243332e02d6f05787e6 Mon Sep 17 00:00:00 2001 From: Odysseas Georgoudis Date: Sat, 14 Sep 2024 16:14:07 +0100 Subject: [PATCH] simplify TransitEventbuffer --- CHANGELOG.md | 2 + include/quill/backend/BackendWorker.h | 12 +- include/quill/backend/TransitEventBuffer.h | 223 +++++++-------------- include/quill/core/ThreadContextManager.h | 4 +- test/unit_tests/TransitEventBufferTest.cpp | 138 +------------ 5 files changed, 81 insertions(+), 298 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 61180ddb..a3536480 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -82,6 +82,8 @@ - Introduced support for custom buffer sizes in file streams for `FileSink` and `RotatingFileSink`. The buffer size can now be set using `write_buffer_size` in `FileSinkConfig`, with a default of 64 KB. With the new default value the backend thread has increased throughput around 5% +- Simplified the `TransitEventBuffer` in the backend worker thread, resulting in a minor throughput improvement of + approximately 1%. - Added an optional fsync interval to control the minimum time between consecutive fsync calls, reducing disk wear from frequent fsync operations. This option is only applicable when fsync is enabled. ([#557](https://github.com/odygrd/quill/issues/557)) diff --git a/include/quill/backend/BackendWorker.h b/include/quill/backend/BackendWorker.h index cc29353c..7b37265b 100644 --- a/include/quill/backend/BackendWorker.h +++ b/include/quill/backend/BackendWorker.h @@ -241,7 +241,6 @@ class BackendWorker // load all contexts locally _update_active_thread_contexts_cache(); - // Phase 1: // Read all frontend queues and cache the log statements and the metadata as TransitEvents size_t const cached_transit_events_count = _populate_transit_events_from_frontend_queues(); @@ -350,7 +349,7 @@ class BackendWorker break; } - size_t const cached_transit_events_count = _populate_transit_events_from_frontend_queues(); + uint64_t const cached_transit_events_count = _populate_transit_events_from_frontend_queues(); if (cached_transit_events_count > 0) { while (!has_pending_events_for_caching_when_transit_event_buffer_empty() && @@ -406,8 +405,8 @@ class BackendWorker * @return size of the transit_event_buffer */ template - QUILL_ATTRIBUTE_HOT uint32_t _read_and_decode_frontend_queue(TFrontendQueue& frontend_queue, - ThreadContext* thread_context, uint64_t ts_now) + QUILL_ATTRIBUTE_HOT size_t _read_and_decode_frontend_queue(TFrontendQueue& frontend_queue, + ThreadContext* thread_context, uint64_t ts_now) { // Note: The producer commits only complete messages to the queue. // Therefore, if even a single byte is present in the queue, it signifies a full message. @@ -676,7 +675,7 @@ class BackendWorker { // Get the lowest timestamp uint64_t min_ts{std::numeric_limits::max()}; - UnboundedTransitEventBuffer* transit_buffer{nullptr}; + TransitEventBuffer* transit_buffer{nullptr}; for (ThreadContext* thread_context : _active_thread_contexts_cache) { @@ -693,7 +692,6 @@ class BackendWorker if (!transit_buffer) { // all buffers are empty - // return false, meaning we processed a message return false; } @@ -1160,7 +1158,7 @@ class BackendWorker { // Lazy initialise the _transit_event_buffer for this thread_context thread_context->_transit_event_buffer = - std::make_shared(_options.transit_event_buffer_initial_capacity); + std::make_shared(_options.transit_event_buffer_initial_capacity); } // We do not skip invalidated && empty queue thread contexts as this is very rare, diff --git a/include/quill/backend/TransitEventBuffer.h b/include/quill/backend/TransitEventBuffer.h index ca3582fe..f1a19d75 100644 --- a/include/quill/backend/TransitEventBuffer.h +++ b/include/quill/backend/TransitEventBuffer.h @@ -7,209 +7,128 @@ #pragma once #include "quill/backend/TransitEvent.h" +#include "quill/bundled/fmt/format.h" // for assert_fail #include "quill/core/Attributes.h" #include "quill/core/MathUtils.h" #include #include -#include #include QUILL_BEGIN_NAMESPACE namespace detail { -template -class BoundedTransitEventBufferImpl + +class TransitEventBuffer { public: - using integer_type = T; + explicit TransitEventBuffer(size_t initial_capacity) + : _capacity(next_power_of_two(initial_capacity)), + _storage(std::make_unique(_capacity)), + _mask(_capacity - 1u) + { + } + + TransitEventBuffer(TransitEventBuffer const&) = delete; + TransitEventBuffer& operator=(TransitEventBuffer const&) = delete; - /***/ - explicit BoundedTransitEventBufferImpl(integer_type capacity) - : _capacity(next_power_of_two(capacity)), _mask(static_cast(_capacity - 1u)) + // Move constructor + TransitEventBuffer(TransitEventBuffer&& other) noexcept + : _capacity(other._capacity), + _storage(std::move(other._storage)), + _mask(other._mask), + _reader_pos(other._reader_pos), + _writer_pos(other._writer_pos) { - _storage.resize(_capacity); + other._capacity = 0; + other._mask = 0; + other._reader_pos = 0; + other._writer_pos = 0; } - /***/ - BoundedTransitEventBufferImpl(BoundedTransitEventBufferImpl const&) = delete; - BoundedTransitEventBufferImpl& operator=(BoundedTransitEventBufferImpl const&) = delete; + // Move assignment operator + TransitEventBuffer& operator=(TransitEventBuffer&& other) noexcept + { + if (this != &other) + { + _capacity = other._capacity; + _storage = std::move(other._storage); + _mask = other._mask; + _reader_pos = other._reader_pos; + _writer_pos = other._writer_pos; + + other._capacity = 0; + other._mask = 0; + other._reader_pos = 0; + other._writer_pos = 0; + } + return *this; + } - /***/ QUILL_NODISCARD QUILL_ATTRIBUTE_HOT TransitEvent* front() noexcept { - if (_writer_pos == _reader_pos) + if (_reader_pos == _writer_pos) { - // empty return nullptr; } - - return &_storage[static_cast(_reader_pos & _mask)]; + return &_storage[_reader_pos & _mask]; } - /***/ QUILL_ATTRIBUTE_HOT void pop_front() noexcept { ++_reader_pos; } - /***/ QUILL_NODISCARD QUILL_ATTRIBUTE_HOT TransitEvent* back() noexcept { - if (_capacity - size() == 0) + if (_capacity == size()) { - // is full - return nullptr; + // Buffer is full, need to expand + _expand(); } - - return &_storage[static_cast(_writer_pos & _mask)]; + return &_storage[_writer_pos & _mask]; } - /***/ QUILL_ATTRIBUTE_HOT void push_back() noexcept { ++_writer_pos; } - /***/ - QUILL_NODISCARD QUILL_ATTRIBUTE_HOT integer_type size() const noexcept - { - return static_cast(_writer_pos - _reader_pos); - } - - /***/ - QUILL_NODISCARD integer_type capacity() const noexcept - { - return static_cast(_capacity); - } - -private: - integer_type const _capacity; - integer_type const _mask; - integer_type _reader_pos{0}; - integer_type _writer_pos{0}; - std::vector _storage; -}; - -using BoundedTransitEventBuffer = BoundedTransitEventBufferImpl; - -class UnboundedTransitEventBuffer -{ -public: - /***/ - struct Node - { - /** - * Constructor - * @param transit_buffer_capacity the capacity of the fixed buffer - */ - explicit Node(uint32_t transit_buffer_capacity) : transit_buffer(transit_buffer_capacity) {} - - /** members */ - Node* next{nullptr}; - BoundedTransitEventBuffer transit_buffer; - }; - - /***/ - explicit UnboundedTransitEventBuffer(uint32_t initial_transit_buffer_capacity) - : _writer(new Node(initial_transit_buffer_capacity)), _reader(_writer) - { - } - - /***/ - UnboundedTransitEventBuffer(UnboundedTransitEventBuffer const&) = delete; - UnboundedTransitEventBuffer& operator=(UnboundedTransitEventBuffer const&) = delete; - - /***/ - ~UnboundedTransitEventBuffer() + QUILL_NODISCARD QUILL_ATTRIBUTE_HOT size_t size() const noexcept { - Node const* reader_node = _reader; - - // Look for extra nodes to delete - while (reader_node) - { - auto const to_delete = reader_node; - reader_node = reader_node->next; - delete to_delete; - } + return _writer_pos - _reader_pos; } - /***/ - QUILL_NODISCARD QUILL_ATTRIBUTE_HOT TransitEvent* front() noexcept - { - TransitEvent* next_event = _reader->transit_buffer.front(); - - if (!next_event) - { - // the buffer is empty check if another buffer exists - if (QUILL_UNLIKELY(_reader->next != nullptr)) - { - // a new buffer was added by the producer, this happens only when we have allocated a new queue - - // switch to the new buffer, existing one is deleted - Node* next_node = _reader->next; - delete _reader; - _reader = next_node; - next_event = _reader->transit_buffer.front(); - } - } - - return next_event; - } + QUILL_NODISCARD QUILL_ATTRIBUTE_HOT size_t capacity() const noexcept { return _capacity; } - /***/ - QUILL_ATTRIBUTE_HOT void pop_front() noexcept { _reader->transit_buffer.pop_front(); } - - /***/ - QUILL_NODISCARD QUILL_ATTRIBUTE_HOT TransitEvent* back() noexcept + QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept { - // Try to reserve the bounded queue - TransitEvent* write_event = _writer->transit_buffer.back(); - - if (QUILL_UNLIKELY(write_event == nullptr)) - { - // buffer doesn't have enough space - uint64_t capacity = static_cast(_writer->transit_buffer.capacity()) * 2ull; - uint64_t constexpr max_bounded_queue_capacity = - (std::numeric_limits::max() >> 1) + 1; - - if (QUILL_UNLIKELY(capacity > max_bounded_queue_capacity)) - { - capacity = max_bounded_queue_capacity; - } - - auto const new_node = new Node{static_cast(capacity)}; - _writer->next = new_node; - _writer = _writer->next; - write_event = _writer->transit_buffer.back(); - } - - assert(write_event && "Write event is always true"); - - return write_event; + return _reader_pos == _writer_pos; } - /***/ - QUILL_ATTRIBUTE_HOT void push_back() noexcept { _writer->transit_buffer.push_back(); } - - /***/ - QUILL_NODISCARD QUILL_ATTRIBUTE_HOT uint32_t size() const noexcept +private: + void _expand() { - Node const* reader = _reader; + size_t const new_capacity = _capacity * 2; - uint32_t size = reader->transit_buffer.size(); + auto new_storage = std::make_unique(new_capacity); - while (reader->next) + // Copy existing elements to the new storage + size_t const current_size = size(); + for (size_t i = 0; i < current_size; ++i) { - reader = reader->next; - size += reader->transit_buffer.size(); + new_storage[i] = std::move(_storage[(_reader_pos + i) & _mask]); } - return size; + _storage = std::move(new_storage); + _capacity = new_capacity; + _mask = _capacity - 1; + _writer_pos = current_size; + _reader_pos = 0; } - /***/ - QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() noexcept { return front() ? false : true; } - -private: - Node* _writer{nullptr}; - Node* _reader{nullptr}; + size_t _capacity; + std::unique_ptr _storage; + size_t _mask; + size_t _reader_pos{0}; + size_t _writer_pos{0}; }; + } // namespace detail QUILL_END_NAMESPACE \ No newline at end of file diff --git a/include/quill/core/ThreadContextManager.h b/include/quill/core/ThreadContextManager.h index 01519389..1968d946 100644 --- a/include/quill/core/ThreadContextManager.h +++ b/include/quill/core/ThreadContextManager.h @@ -30,7 +30,7 @@ QUILL_BEGIN_NAMESPACE namespace detail { /** Forward Declarations **/ -class UnboundedTransitEventBuffer; +class TransitEventBuffer; class BackendWorker; class ThreadContext @@ -202,7 +202,7 @@ class ThreadContext SizeCacheVector _conditional_arg_size_cache; /**< cache for storing sizes needed for specific operations, such as when calling `strn` functions or when a loop is required e.g. caching the size of a type */ std::string _thread_id = std::to_string(get_thread_id()); /**< cached thread pid */ std::string _thread_name = get_thread_name(); /**< cached thread name */ - std::shared_ptr _transit_event_buffer; /**< backend thread buffer. this could be unique_ptr but it is shared_ptr because of the forward declaration */ + std::shared_ptr _transit_event_buffer; /**< backend thread buffer. this could be unique_ptr but it is shared_ptr because of the forward declaration */ QueueType _queue_type; std::atomic _valid{true}; /**< is this context valid, set by the frontend, read by the backend thread */ alignas(CACHE_LINE_ALIGNED) std::atomic _failure_counter{0}; diff --git a/test/unit_tests/TransitEventBufferTest.cpp b/test/unit_tests/TransitEventBufferTest.cpp index 6e86b076..e34e2610 100644 --- a/test/unit_tests/TransitEventBufferTest.cpp +++ b/test/unit_tests/TransitEventBufferTest.cpp @@ -8,146 +8,10 @@ TEST_SUITE_BEGIN("TransitEventBuffer"); using namespace quill; using namespace quill::detail; -/***/ -TEST_CASE("transit_event_bounded_buffer") -{ - BoundedTransitEventBuffer bte{4}; - REQUIRE_EQ(bte.capacity(), 4); - - for (size_t i = 0; i < 12; ++i) - { - REQUIRE_FALSE(bte.front()); - REQUIRE_EQ(bte.size(), 0); - - { - TransitEvent* te1 = bte.back(); - REQUIRE(te1); - - te1->named_args = std::make_unique>>(); - te1->named_args->clear(); - te1->named_args->emplace_back(std::string{"test1"} + std::to_string(i), std::string{}); - bte.push_back(); - } - - REQUIRE_EQ(bte.size(), 1); - - { - TransitEvent* te2 = bte.back(); - REQUIRE(te2); - - te2->named_args = std::make_unique>>(); - te2->named_args->clear(); - te2->named_args->emplace_back(std::string{"test2"} + std::to_string(i), std::string{}); - bte.push_back(); - } - - REQUIRE_EQ(bte.size(), 2); - - { - TransitEvent* te3 = bte.back(); - REQUIRE(te3); - - te3->named_args = std::make_unique>>(); - te3->named_args->clear(); - te3->named_args->emplace_back(std::string{"test3"} + std::to_string(i), std::string{}); - bte.push_back(); - } - - REQUIRE_EQ(bte.size(), 3); - - { - TransitEvent* te4 = bte.back(); - REQUIRE(te4); - - te4->named_args = std::make_unique>>(); - te4->named_args->clear(); - te4->named_args->emplace_back(std::string{"test4"} + std::to_string(i), std::string{}); - bte.push_back(); - } - - REQUIRE_EQ(bte.size(), 4); - - // read - { - TransitEvent* te1 = bte.front(); - REQUIRE(te1); - std::string const expected = std::string{"test1"} + std::to_string(i); - REQUIRE_STREQ((*te1->named_args)[0].first.data(), expected.data()); - bte.pop_front(); - } - - REQUIRE_EQ(bte.size(), 3); - - { - TransitEvent* te2 = bte.front(); - REQUIRE(te2); - std::string const expected = std::string{"test2"} + std::to_string(i); - REQUIRE_STREQ((*te2->named_args)[0].first.data(), expected.data()); - bte.pop_front(); - } - - REQUIRE_EQ(bte.size(), 2); - - { - TransitEvent* te3 = bte.front(); - REQUIRE(te3); - std::string const expected = std::string{"test3"} + std::to_string(i); - REQUIRE_STREQ((*te3->named_args)[0].first.data(), expected.data()); - bte.pop_front(); - } - - REQUIRE_EQ(bte.size(), 1); - - { - TransitEvent* te4 = bte.front(); - REQUIRE(te4); - std::string const expected = std::string{"test4"} + std::to_string(i); - REQUIRE_STREQ((*te4->named_args)[0].first.data(), expected.data()); - bte.pop_front(); - } - - REQUIRE_EQ(bte.size(), 0); - REQUIRE_FALSE(bte.front()); - } -} - -/***/ -TEST_CASE("transit_event_bounded_buffer_integer_overflow") -{ - BoundedTransitEventBufferImpl bte{128}; - size_t constexpr iterations = static_cast(std::numeric_limits::max()) * 8ull; - - for (size_t i = 0; i < iterations; ++i) - { - REQUIRE_EQ(bte.size(), 0); - - { - TransitEvent* te = bte.back(); - REQUIRE(te); - - te->named_args = std::make_unique>>(); - te->named_args->clear(); - te->named_args->emplace_back(std::string{"test"} + std::to_string(i), std::string{}); - bte.push_back(); - } - - REQUIRE_EQ(bte.size(), 1); - - // read - { - TransitEvent* te = bte.front(); - REQUIRE(te); - std::string const expected = std::string{"test"} + std::to_string(i); - REQUIRE_STREQ((*te->named_args)[0].first.data(), expected.data()); - bte.pop_front(); - } - } -} - /***/ TEST_CASE("transit_event_unbounded_buffer") { - UnboundedTransitEventBuffer bte{4}; + TransitEventBuffer bte{4}; REQUIRE_FALSE(bte.front()); REQUIRE(bte.empty());