Skip to content

Commit

Permalink
simplify TransitEventbuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
odygrd authored Sep 14, 2024
1 parent 38b102a commit 0c5a27b
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 298 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@
- Introduced support for custom buffer sizes in file streams for `FileSink` and `RotatingFileSink`. The buffer size can
now be set using `write_buffer_size` in `FileSinkConfig`, with a default of 64 KB. With the new default value the
backend thread has increased throughput around 5%
- Simplified the `TransitEventBuffer` in the backend worker thread, resulting in a minor throughput improvement of
approximately 1%.
- Added an optional fsync interval to control the minimum time between consecutive fsync calls, reducing disk wear from
frequent fsync operations. This option is only applicable when fsync is
enabled. ([#557](https://github.com/odygrd/quill/issues/557))
Expand Down
12 changes: 5 additions & 7 deletions include/quill/backend/BackendWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ class BackendWorker
// load all contexts locally
_update_active_thread_contexts_cache();

// Phase 1:
// Read all frontend queues and cache the log statements and the metadata as TransitEvents
size_t const cached_transit_events_count = _populate_transit_events_from_frontend_queues();

Expand Down Expand Up @@ -350,7 +349,7 @@ class BackendWorker
break;
}

size_t const cached_transit_events_count = _populate_transit_events_from_frontend_queues();
uint64_t const cached_transit_events_count = _populate_transit_events_from_frontend_queues();
if (cached_transit_events_count > 0)
{
while (!has_pending_events_for_caching_when_transit_event_buffer_empty() &&
Expand Down Expand Up @@ -406,8 +405,8 @@ class BackendWorker
* @return size of the transit_event_buffer
*/
template <typename TFrontendQueue>
QUILL_ATTRIBUTE_HOT uint32_t _read_and_decode_frontend_queue(TFrontendQueue& frontend_queue,
ThreadContext* thread_context, uint64_t ts_now)
QUILL_ATTRIBUTE_HOT size_t _read_and_decode_frontend_queue(TFrontendQueue& frontend_queue,
ThreadContext* thread_context, uint64_t ts_now)
{
// Note: The producer commits only complete messages to the queue.
// Therefore, if even a single byte is present in the queue, it signifies a full message.
Expand Down Expand Up @@ -676,7 +675,7 @@ class BackendWorker
{
// Get the lowest timestamp
uint64_t min_ts{std::numeric_limits<uint64_t>::max()};
UnboundedTransitEventBuffer* transit_buffer{nullptr};
TransitEventBuffer* transit_buffer{nullptr};

for (ThreadContext* thread_context : _active_thread_contexts_cache)
{
Expand All @@ -693,7 +692,6 @@ class BackendWorker
if (!transit_buffer)
{
// all buffers are empty
// return false, meaning we processed a message
return false;
}

Expand Down Expand Up @@ -1160,7 +1158,7 @@ class BackendWorker
{
// Lazy initialise the _transit_event_buffer for this thread_context
thread_context->_transit_event_buffer =
std::make_shared<UnboundedTransitEventBuffer>(_options.transit_event_buffer_initial_capacity);
std::make_shared<TransitEventBuffer>(_options.transit_event_buffer_initial_capacity);
}

// We do not skip invalidated && empty queue thread contexts as this is very rare,
Expand Down
223 changes: 71 additions & 152 deletions include/quill/backend/TransitEventBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,209 +7,128 @@
#pragma once

#include "quill/backend/TransitEvent.h"
#include "quill/bundled/fmt/format.h" // for assert_fail
#include "quill/core/Attributes.h"
#include "quill/core/MathUtils.h"

#include <cassert>
#include <cstdint>
#include <limits>
#include <vector>

QUILL_BEGIN_NAMESPACE

namespace detail
{
template <typename T>
class BoundedTransitEventBufferImpl

class TransitEventBuffer
{
public:
using integer_type = T;
explicit TransitEventBuffer(size_t initial_capacity)
: _capacity(next_power_of_two(initial_capacity)),
_storage(std::make_unique<TransitEvent[]>(_capacity)),
_mask(_capacity - 1u)
{
}

TransitEventBuffer(TransitEventBuffer const&) = delete;
TransitEventBuffer& operator=(TransitEventBuffer const&) = delete;

/***/
explicit BoundedTransitEventBufferImpl(integer_type capacity)
: _capacity(next_power_of_two(capacity)), _mask(static_cast<integer_type>(_capacity - 1u))
// Move constructor
TransitEventBuffer(TransitEventBuffer&& other) noexcept
: _capacity(other._capacity),
_storage(std::move(other._storage)),
_mask(other._mask),
_reader_pos(other._reader_pos),
_writer_pos(other._writer_pos)
{
_storage.resize(_capacity);
other._capacity = 0;
other._mask = 0;
other._reader_pos = 0;
other._writer_pos = 0;
}

/***/
BoundedTransitEventBufferImpl(BoundedTransitEventBufferImpl const&) = delete;
BoundedTransitEventBufferImpl& operator=(BoundedTransitEventBufferImpl const&) = delete;
// Move assignment operator
TransitEventBuffer& operator=(TransitEventBuffer&& other) noexcept
{
if (this != &other)
{
_capacity = other._capacity;
_storage = std::move(other._storage);
_mask = other._mask;
_reader_pos = other._reader_pos;
_writer_pos = other._writer_pos;

other._capacity = 0;
other._mask = 0;
other._reader_pos = 0;
other._writer_pos = 0;
}
return *this;
}

/***/
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT TransitEvent* front() noexcept
{
if (_writer_pos == _reader_pos)
if (_reader_pos == _writer_pos)
{
// empty
return nullptr;
}

return &_storage[static_cast<uint32_t>(_reader_pos & _mask)];
return &_storage[_reader_pos & _mask];
}

/***/
QUILL_ATTRIBUTE_HOT void pop_front() noexcept { ++_reader_pos; }

/***/
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT TransitEvent* back() noexcept
{
if (_capacity - size() == 0)
if (_capacity == size())
{
// is full
return nullptr;
// Buffer is full, need to expand
_expand();
}

return &_storage[static_cast<uint32_t>(_writer_pos & _mask)];
return &_storage[_writer_pos & _mask];
}

/***/
QUILL_ATTRIBUTE_HOT void push_back() noexcept { ++_writer_pos; }

/***/
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT integer_type size() const noexcept
{
return static_cast<integer_type>(_writer_pos - _reader_pos);
}

/***/
QUILL_NODISCARD integer_type capacity() const noexcept
{
return static_cast<integer_type>(_capacity);
}

private:
integer_type const _capacity;
integer_type const _mask;
integer_type _reader_pos{0};
integer_type _writer_pos{0};
std::vector<TransitEvent> _storage;
};

using BoundedTransitEventBuffer = BoundedTransitEventBufferImpl<uint32_t>;

class UnboundedTransitEventBuffer
{
public:
/***/
struct Node
{
/**
* Constructor
* @param transit_buffer_capacity the capacity of the fixed buffer
*/
explicit Node(uint32_t transit_buffer_capacity) : transit_buffer(transit_buffer_capacity) {}

/** members */
Node* next{nullptr};
BoundedTransitEventBuffer transit_buffer;
};

/***/
explicit UnboundedTransitEventBuffer(uint32_t initial_transit_buffer_capacity)
: _writer(new Node(initial_transit_buffer_capacity)), _reader(_writer)
{
}

/***/
UnboundedTransitEventBuffer(UnboundedTransitEventBuffer const&) = delete;
UnboundedTransitEventBuffer& operator=(UnboundedTransitEventBuffer const&) = delete;

/***/
~UnboundedTransitEventBuffer()
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT size_t size() const noexcept
{
Node const* reader_node = _reader;

// Look for extra nodes to delete
while (reader_node)
{
auto const to_delete = reader_node;
reader_node = reader_node->next;
delete to_delete;
}
return _writer_pos - _reader_pos;
}

/***/
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT TransitEvent* front() noexcept
{
TransitEvent* next_event = _reader->transit_buffer.front();

if (!next_event)
{
// the buffer is empty check if another buffer exists
if (QUILL_UNLIKELY(_reader->next != nullptr))
{
// a new buffer was added by the producer, this happens only when we have allocated a new queue

// switch to the new buffer, existing one is deleted
Node* next_node = _reader->next;
delete _reader;
_reader = next_node;
next_event = _reader->transit_buffer.front();
}
}

return next_event;
}
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT size_t capacity() const noexcept { return _capacity; }

/***/
QUILL_ATTRIBUTE_HOT void pop_front() noexcept { _reader->transit_buffer.pop_front(); }

/***/
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT TransitEvent* back() noexcept
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
{
// Try to reserve the bounded queue
TransitEvent* write_event = _writer->transit_buffer.back();

if (QUILL_UNLIKELY(write_event == nullptr))
{
// buffer doesn't have enough space
uint64_t capacity = static_cast<uint64_t>(_writer->transit_buffer.capacity()) * 2ull;
uint64_t constexpr max_bounded_queue_capacity =
(std::numeric_limits<BoundedTransitEventBuffer::integer_type>::max() >> 1) + 1;

if (QUILL_UNLIKELY(capacity > max_bounded_queue_capacity))
{
capacity = max_bounded_queue_capacity;
}

auto const new_node = new Node{static_cast<uint32_t>(capacity)};
_writer->next = new_node;
_writer = _writer->next;
write_event = _writer->transit_buffer.back();
}

assert(write_event && "Write event is always true");

return write_event;
return _reader_pos == _writer_pos;
}

/***/
QUILL_ATTRIBUTE_HOT void push_back() noexcept { _writer->transit_buffer.push_back(); }

/***/
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT uint32_t size() const noexcept
private:
void _expand()
{
Node const* reader = _reader;
size_t const new_capacity = _capacity * 2;

uint32_t size = reader->transit_buffer.size();
auto new_storage = std::make_unique<TransitEvent[]>(new_capacity);

while (reader->next)
// Copy existing elements to the new storage
size_t const current_size = size();
for (size_t i = 0; i < current_size; ++i)
{
reader = reader->next;
size += reader->transit_buffer.size();
new_storage[i] = std::move(_storage[(_reader_pos + i) & _mask]);
}

return size;
_storage = std::move(new_storage);
_capacity = new_capacity;
_mask = _capacity - 1;
_writer_pos = current_size;
_reader_pos = 0;
}

/***/
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() noexcept { return front() ? false : true; }

private:
Node* _writer{nullptr};
Node* _reader{nullptr};
size_t _capacity;
std::unique_ptr<TransitEvent[]> _storage;
size_t _mask;
size_t _reader_pos{0};
size_t _writer_pos{0};
};

} // namespace detail

QUILL_END_NAMESPACE
4 changes: 2 additions & 2 deletions include/quill/core/ThreadContextManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ QUILL_BEGIN_NAMESPACE
namespace detail
{
/** Forward Declarations **/
class UnboundedTransitEventBuffer;
class TransitEventBuffer;
class BackendWorker;

class ThreadContext
Expand Down Expand Up @@ -202,7 +202,7 @@ class ThreadContext
SizeCacheVector _conditional_arg_size_cache; /**< cache for storing sizes needed for specific operations, such as when calling `strn` functions or when a loop is required e.g. caching the size of a type */
std::string _thread_id = std::to_string(get_thread_id()); /**< cached thread pid */
std::string _thread_name = get_thread_name(); /**< cached thread name */
std::shared_ptr<UnboundedTransitEventBuffer> _transit_event_buffer; /**< backend thread buffer. this could be unique_ptr but it is shared_ptr because of the forward declaration */
std::shared_ptr<TransitEventBuffer> _transit_event_buffer; /**< backend thread buffer. this could be unique_ptr but it is shared_ptr because of the forward declaration */
QueueType _queue_type;
std::atomic<bool> _valid{true}; /**< is this context valid, set by the frontend, read by the backend thread */
alignas(CACHE_LINE_ALIGNED) std::atomic<size_t> _failure_counter{0};
Expand Down
Loading

0 comments on commit 0c5a27b

Please sign in to comment.