Skip to content

Commit

Permalink
Align sequential compaction scope to the partition border (#244)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Jul 5, 2024
1 parent d1ff53e commit 292ea41
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -670,12 +670,20 @@ public static async Task SequentialCompaction(bool useCaching)
Equal(entries.Length + 41L, state.Value);
checker = static (readResult, snapshotIndex, token) =>
{
Single(readResult);
Equal(9, snapshotIndex);
True(readResult[0].IsSnapshot);
Equal(8, snapshotIndex);
True(Single(readResult).IsSnapshot);
return default;
};
await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1, 6, CancellationToken.None);

checker = static (readResult, snapshotIndex, token) =>
{
NotEmpty(readResult);
Equal(8, snapshotIndex);
True(readResult[0].IsSnapshot);
False(readResult[1].IsSnapshot);
return default;
};
await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1, CancellationToken.None);
}

Expand All @@ -692,8 +700,8 @@ public static async Task SequentialCompaction(bool useCaching)
Equal(0L, state.Value);
checker = static (readResult, snapshotIndex, token) =>
{
Single(readResult);
Equal(9, snapshotIndex);
NotEmpty(readResult);
Equal(8, snapshotIndex);
return default;
};
await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1, CancellationToken.None);
Expand Down Expand Up @@ -1103,7 +1111,7 @@ public static async Task RegressionIssue244()
static async (entries, snapshotIndex, token) =>
{
NotNull(snapshotIndex);
Equal(snapshotIndex, 5L);
Equal(snapshotIndex, 3L);

var entry = entries[0];
var snapshot = await entry.ToByteArrayAsync(token: token);
Expand All @@ -1121,9 +1129,9 @@ static async (entries, snapshotIndex, token) =>
{
// install snapshot
await state.AppendAsync(snapshot, snapshotIndex);
Equal(5L, state.LastCommittedEntryIndex);
Equal(3L, state.LastCommittedEntryIndex);

await state.AppendAsync(new Int64LogEntry { Content = 10L, Term = 20L }, 6L);
await state.AppendAsync(new Int64LogEntry { Content = 10L, Term = 20L }, 4L);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public abstract partial class MemoryBasedStateMachine : PersistentState
private readonly CompactionMode compaction;
private readonly bool replayOnInitialize, evictOnCommit;
private readonly int snapshotBufferSize;
private readonly unsafe delegate*<long, int, long> getAlignedIndex;

private long lastTerm; // term of last committed entry, volatile

Expand Down Expand Up @@ -68,6 +69,20 @@ protected MemoryBasedStateMachine(DirectoryInfo path, int recordsPerPartition, O
&& configuration.UseCaching;
snapshot = new(path, snapshotBufferSize, in bufferManager, concurrentReads, configuration.WriteMode,
initialSize: configuration.InitialPartitionSize);

unsafe
{
getAlignedIndex = int.IsPow2(recordsPerPartition)
? &GetAlignedIndexFast
: &GetAlignedIndexSlow;
}

[MethodImpl(MethodImplOptions.AggressiveOptimization)]
static long GetAlignedIndexSlow(long absoluteIndex, int recordsPerPartition)
=> absoluteIndex / (uint)recordsPerPartition * (uint)recordsPerPartition;

static long GetAlignedIndexFast(long absoluteIndex, int recordsPerPartition)
=> absoluteIndex & -recordsPerPartition;
}

/// <summary>
Expand Down Expand Up @@ -126,7 +141,7 @@ private static ValueTask ApplyIfNotEmptyAsync(SnapshotBuilder builder, LogEntry

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool IsCompactionRequired(long upperBoundIndex)
=> upperBoundIndex - SnapshotInfo.Index > recordsPerPartition;
=> upperBoundIndex - SnapshotInfo.Index >= recordsPerPartition;

// In case of background compaction we need to have 1 fully committed partition as a divider
// between partitions produced during writes and partitions to be compacted.
Expand Down Expand Up @@ -314,8 +329,9 @@ private protected sealed override async ValueTask<long> AppendAndCommitAsync<TEn
[AsyncMethodBuilder(typeof(SpawningAsyncTaskMethodBuilder<>))]
private async Task<Partition?> CommitAndCompactSequentiallyAsync(int session, long commitIndex, CancellationToken token)
{
Partition? removedHead;
await ApplyAsync(session, token).ConfigureAwait(false);

Partition? removedHead;
if (IsCompactionRequired(commitIndex))
{
await ForceSequentialCompactionAsync(session, commitIndex, token).ConfigureAwait(false);
Expand Down Expand Up @@ -407,6 +423,7 @@ private async ValueTask<long> CommitAndCompactSequentiallyAsync(long? endIndex,

LastCommittedEntryIndex = commitIndex;
await ApplyAsync(session, token).ConfigureAwait(false);

InternalStateScope scope;
if (IsCompactionRequired(commitIndex))
{
Expand Down Expand Up @@ -552,6 +569,11 @@ private async ValueTask<long> CommitAndCompactIncrementallyAsync(long? endIndex,

private async ValueTask ForceSequentialCompactionAsync(int sessionId, long upperBoundIndex, CancellationToken token)
{
unsafe
{
upperBoundIndex = getAlignedIndex(upperBoundIndex, recordsPerPartition) - 1L;
}

using var builder = CreateSnapshotBuilder();
await BuildSnapshotAsync(sessionId, upperBoundIndex, builder, token).ConfigureAwait(false);

Expand Down

0 comments on commit 292ea41

Please sign in to comment.