-
Notifications
You must be signed in to change notification settings - Fork 96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Log compaction for hybrid state machines #440
Comments
Thinking about use cases and motivations other than saving space. For example message priorities can be implemented like this:
Or Delayed messages:
To generalize Index (Metadata) is in Ra State, Messages in Log (as they are now). Returning all live can be costly then. Maybe returning dead is also an option? Thoughts? |
Also wonder if TTL handling in QQs can be improved? Like index kept and messages removed proactively: |
For what it's worth moving data around in rabbit_msg_store worked very well for CQs. It moves data from the end toward the beginning of the file, in holes created by no longer needed data. It then takes special care to not truncate before the existing readers are done reading from the end of the file. This approach allows rabbit_msg_store to have lock-free readers while still compacting when appropriate. Not sure how that can apply to QQs / ra but the cache invalidation was not an issue compared to the lock the code used before. |
Log compaction for hybrid state machines
Overview
A hybrid state machine is a state machine that keeps some of its state in memory
and some of its state, typically payload data, in the Raft log. RabbitMQ quorum
queues is an example of a hybrid state machine that relies on fifo-ish consumption
patterns in order to use standard (ish) snapshotting and log truncation.
Combined with the checkpoint feature which allows the state machine to store
a snapshot of its (typically light) state on disk without truncating the log
at that index we can consider additional options for reducing the data stored
by the log without a full truncation. Checkpoints allow hybrid state machines to
recover quickly after a restart as they do not need to restore the state from
the last snapshot, only from the last checkpoint.
This means that the log between last snapshot and the latest checkpoint will,
during normal operation, only be read on demand to read a
payload value. As we will never need every single entry in this section of
the log to deterministically recover the log this part of the log can thus be
compacted towards the state where it only contain indexes with payload data
that is currently referenced by the latest checkpoint.
A new
ra_machine
callback that returns a map of all live indexes will be usedfor compaction calculations.
The log before the last checkpoint will be referred to as the "compacting area"
and the log after the last snapshot/checkpoint will be referred to as the "active
log". The compacting area of the log is effectively a k/v store where dead
entries eventually will be deleted and space reclaimed.
Another way to look at it is that the snapshot is the checkpoint + all live
entries in the compacting area. 🤯
Traditional snapshotting (that includes the full state of the state machine)
can be achieved by writing a checkpoint and return no
live indexes from the new
ra_machine
callback.With these generalisations in mind there will be no checkpoints,
at least not multiple ones as in the current implementation. Instead there will
be a new definition of a snapshot as a checkpoint + live entries in the compacting
area of the log.
A compacting log would not take "traditional" snapshots by emitting release_cursors,
only checkpoints would be written.
Checkpoints are taken by the Ra process based on entries, time and log size,
not initiated by the
ra_machine
. Once a checkpoint is written and fsyncedprior checkpoints will no longer be needed (as there is no promotion to snapshot
anymore) and thus will also be deleted.
Follower snapshot replication
This new definition of a snapshot as a checkpoint + live entries in the compacting
area of the log complicates the snapshot replication / installation part of the
code as it is no longer just necessary to replicate the latest checkpoint but also
ensure that the follower has all live entires in the compacting area.
The case where a new follower is added and / or
an existing follower will need a full log sync can be extended as follows:
The snapshot sender process that
is spawned could after replicating the checkpoint then continue to replicate all
segments whole to complete the new snapshot state.
After that point normal log replication could take over.
However, a follower that just ended up a bit behind may already have many of the
live entries and only need some replicated. In this case we will need to negotiate
which entries in the compacting area the follower needs. This could be done
by first replicating the checkpoint then the follower replying with the indexes
it needs (by comparing the indexes in the checkpoint with its own local log state)
and then having these replicated on a per entry basis.
To avoid concurrent segment replication and compaction we can move from using
transient processes for this work to a single companion worker process that every
Ra member has that does all compaction and snapshot replication work. Testing
will need to be done to see if this will provide sufficient parallelism.
Impact on segment writer process
The segment writer process is responsible for flushing entries in the mem tables
to segments.
Currently it is designed to skip any entries with indexes lower than the last written
snapshot. As checkpoints can be (and should to reduce write amplifiction) written
before any segments have been written with the new approach it will need to
inspect the latest checkpoint and use the set of live indexes to ensure it
includes all relevant entries but avoid those that are already dead.
Some coordination with the compaction process will no doubt also be needed to
avoid compaction and segment writing occurring at the same time.
Compaction
Compaction should be triggered every time a new checkpoint is written.
It is performed in phases in order of efficiency (most efficient first) until
some configured ratio of live / vs dead entries has been achieved.
This ratio should ideally be byte based but this may require expensive scanning
to work out the total size of all live entries or require the state machine to
somehow track the approximate size of all live entries in addition to their indexes
which may not be practical in all cases. At a first attempt we should a simpler
entry based approach where we allow ratio of dead vs live entries, e.g for
every 100 live entries we would allow 30 dead entries to remain in the log.
Compaction Phases:
contain no live indexes and delete these. This is cheap and effective but may
leave many dead entries in the log. For checkpoints that return no live entries
in the compacting area this becomes a simple cutoff job where all segments that
only contain entries with a lower index than the last checkoint will simple be
deleted.
Because all data portions of a segment file is kept at the end of the file
dead data at the end of the file could be truncated. This is also relatively
cheap as it only requires a truncate system call to reclaim the space.
This option both has high read and write amplification and will transiently consume
more disk space and thus should only be used when necessary. It also requires
coordination with any log readers (such as the main Ra process) to avoid deleting
a segment where all entries have been written to another segment but that the log
readers may want to read.
There is also a 2b option where data within a segment could possibly be moved around
to provide more compaction but moving data is always tricky as it would
invalidate any in memory caches of the segment index that the Ra process may currently
have if it recently read an entry from this segment.
The text was updated successfully, but these errors were encountered: