From 292ea4142d21547a2d28df877bf1f078630ca275 Mon Sep 17 00:00:00 2001 From: sakno Date: Fri, 5 Jul 2024 10:10:10 +0300 Subject: [PATCH] Align sequential compaction scope to the partition border (#244) --- .../Raft/MemoryBasedStateMachineTests.cs | 24 +++++++++++------ .../Consensus/Raft/MemoryBasedStateMachine.cs | 26 +++++++++++++++++-- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs index 06021eb7b..3e24fe258 100644 --- a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs +++ b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs @@ -670,12 +670,20 @@ public static async Task SequentialCompaction(bool useCaching) Equal(entries.Length + 41L, state.Value); checker = static (readResult, snapshotIndex, token) => { - Single(readResult); - Equal(9, snapshotIndex); - True(readResult[0].IsSnapshot); + Equal(8, snapshotIndex); + True(Single(readResult).IsSnapshot); return default; }; await state.As().ReadAsync(new LogEntryConsumer(checker), 1, 6, CancellationToken.None); + + checker = static (readResult, snapshotIndex, token) => + { + NotEmpty(readResult); + Equal(8, snapshotIndex); + True(readResult[0].IsSnapshot); + False(readResult[1].IsSnapshot); + return default; + }; await state.As().ReadAsync(new LogEntryConsumer(checker), 1, CancellationToken.None); } @@ -692,8 +700,8 @@ public static async Task SequentialCompaction(bool useCaching) Equal(0L, state.Value); checker = static (readResult, snapshotIndex, token) => { - Single(readResult); - Equal(9, snapshotIndex); + NotEmpty(readResult); + Equal(8, snapshotIndex); return default; }; await state.As().ReadAsync(new LogEntryConsumer(checker), 1, CancellationToken.None); @@ -1103,7 +1111,7 @@ public static async Task RegressionIssue244() static async (entries, snapshotIndex, token) => { NotNull(snapshotIndex); - Equal(snapshotIndex, 5L); + Equal(snapshotIndex, 3L); var entry = entries[0]; var snapshot = await entry.ToByteArrayAsync(token: token); @@ -1121,9 +1129,9 @@ static async (entries, snapshotIndex, token) => { // install snapshot await state.AppendAsync(snapshot, snapshotIndex); - Equal(5L, state.LastCommittedEntryIndex); + Equal(3L, state.LastCommittedEntryIndex); - await state.AppendAsync(new Int64LogEntry { Content = 10L, Term = 20L }, 6L); + await state.AppendAsync(new Int64LogEntry { Content = 10L, Term = 20L }, 4L); } } } \ No newline at end of file diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs index 65010aae2..272e22b36 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs @@ -41,6 +41,7 @@ public abstract partial class MemoryBasedStateMachine : PersistentState private readonly CompactionMode compaction; private readonly bool replayOnInitialize, evictOnCommit; private readonly int snapshotBufferSize; + private readonly unsafe delegate* getAlignedIndex; private long lastTerm; // term of last committed entry, volatile @@ -68,6 +69,20 @@ protected MemoryBasedStateMachine(DirectoryInfo path, int recordsPerPartition, O && configuration.UseCaching; snapshot = new(path, snapshotBufferSize, in bufferManager, concurrentReads, configuration.WriteMode, initialSize: configuration.InitialPartitionSize); + + unsafe + { + getAlignedIndex = int.IsPow2(recordsPerPartition) + ? &GetAlignedIndexFast + : &GetAlignedIndexSlow; + } + + [MethodImpl(MethodImplOptions.AggressiveOptimization)] + static long GetAlignedIndexSlow(long absoluteIndex, int recordsPerPartition) + => absoluteIndex / (uint)recordsPerPartition * (uint)recordsPerPartition; + + static long GetAlignedIndexFast(long absoluteIndex, int recordsPerPartition) + => absoluteIndex & -recordsPerPartition; } /// @@ -126,7 +141,7 @@ private static ValueTask ApplyIfNotEmptyAsync(SnapshotBuilder builder, LogEntry [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool IsCompactionRequired(long upperBoundIndex) - => upperBoundIndex - SnapshotInfo.Index > recordsPerPartition; + => upperBoundIndex - SnapshotInfo.Index >= recordsPerPartition; // In case of background compaction we need to have 1 fully committed partition as a divider // between partitions produced during writes and partitions to be compacted. @@ -314,8 +329,9 @@ private protected sealed override async ValueTask AppendAndCommitAsync))] private async Task CommitAndCompactSequentiallyAsync(int session, long commitIndex, CancellationToken token) { - Partition? removedHead; await ApplyAsync(session, token).ConfigureAwait(false); + + Partition? removedHead; if (IsCompactionRequired(commitIndex)) { await ForceSequentialCompactionAsync(session, commitIndex, token).ConfigureAwait(false); @@ -407,6 +423,7 @@ private async ValueTask CommitAndCompactSequentiallyAsync(long? endIndex, LastCommittedEntryIndex = commitIndex; await ApplyAsync(session, token).ConfigureAwait(false); + InternalStateScope scope; if (IsCompactionRequired(commitIndex)) { @@ -552,6 +569,11 @@ private async ValueTask CommitAndCompactIncrementallyAsync(long? endIndex, private async ValueTask ForceSequentialCompactionAsync(int sessionId, long upperBoundIndex, CancellationToken token) { + unsafe + { + upperBoundIndex = getAlignedIndex(upperBoundIndex, recordsPerPartition) - 1L; + } + using var builder = CreateSnapshotBuilder(); await BuildSnapshotAsync(sessionId, upperBoundIndex, builder, token).ConfigureAwait(false);