Skip to content

Commit

Permalink
Fixed #244
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Jul 1, 2024
1 parent 9207eb6 commit 2edc792
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1067,8 +1067,8 @@ public static async Task RegressionIssue244()
{
NotEmpty(entries);
Null(snapshotIndex);
Equal(1L, entries[0].Term);
Equal(1L, entries[1].Term);
Equal(1L, entries[2].Term);
return ValueTask.FromResult(Missing.Value);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public abstract partial class DiskBasedStateMachine : PersistentState
/// <param name="configuration">The configuration of the persistent audit trail.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="recordsPerPartition"/> is less than 2.</exception>
protected DiskBasedStateMachine(DirectoryInfo path, int recordsPerPartition, Options? configuration = null)
: base(path, recordsPerPartition, configuration ?? new())
: base(path, recordsPerPartition, configuration ??= new())
{
}

Expand Down Expand Up @@ -64,27 +64,22 @@ protected DiskBasedStateMachine(string path, int recordsPerPartition, Options? c

for (Partition? partition = null; startIndex <= commitIndex; LastAppliedEntryIndex = startIndex++, token.ThrowIfCancellationRequested())
{
if (TryGetPartition(startIndex, ref partition))
if (TryGetPartition(startIndex, ref partition, out var switched))
{
if (switched)
{
await partition.FlushAsync(token).ConfigureAwait(false);
}

var entry = partition.Read(sessionId, startIndex);
var snapshotLength = await ApplyCoreAsync(entry).ConfigureAwait(false);
Volatile.Write(ref lastTerm, entry.Term);
partition.ClearContext(startIndex);

// Remove log entry from the cache according to eviction policy
var completedPartition = startIndex == partition.LastIndex;
if (!entry.IsPersisted)
{
await partition.PersistCachedEntryAsync(startIndex, snapshotLength.HasValue).ConfigureAwait(false);

// Flush partition if we are finished or at the last entry in it
if (startIndex == commitIndex | completedPartition)
{
await partition.FlushAsync(token).ConfigureAwait(false);
}
}
if (bufferManager.IsCachingEnabled)
partition.Evict(startIndex);

if (completedPartition)
if (startIndex == commitIndex || startIndex == partition.LastIndex)
{
partition.ClearContext();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ protected MemoryBasedStateMachine(DirectoryInfo path, int recordsPerPartition, O
snapshotBufferSize = configuration.SnapshotBufferSize;

// with concurrent compaction, we will release cached log entries according to partition lifetime
evictOnCommit = compaction is not CompactionMode.Incremental && configuration.CacheEvictionPolicy is LogEntryCacheEvictionPolicy.OnCommit;
snapshot = new(path, snapshotBufferSize, in bufferManager, concurrentReads, configuration.WriteMode, initialSize: configuration.InitialPartitionSize);
evictOnCommit = compaction is not CompactionMode.Incremental
&& configuration.CacheEvictionPolicy is LogEntryCacheEvictionPolicy.OnCommit
&& configuration.UseCaching;
snapshot = new(path, snapshotBufferSize, in bufferManager, concurrentReads, configuration.WriteMode,
initialSize: configuration.InitialPartitionSize);
}

/// <summary>
Expand Down Expand Up @@ -93,7 +96,7 @@ private async ValueTask BuildSnapshotAsync(int sessionId, long upperBoundIndex,
{
// Calculate the term of the snapshot
Partition? current = LastPartition;
builder.Term = TryGetPartition(upperBoundIndex, ref current)
builder.Term = TryGetPartition(upperBoundIndex, ref current, out _)
? current.GetTerm(upperBoundIndex)
: throw new MissingPartitionException(upperBoundIndex);

Expand All @@ -113,7 +116,8 @@ private async ValueTask BuildSnapshotAsync(int sessionId, long upperBoundIndex,
private bool TryGetPartition(SnapshotBuilder builder, long startIndex, long endIndex, ref long currentIndex, [NotNullWhen(true)] ref Partition? partition)
{
builder.AdjustIndex(startIndex, endIndex, ref currentIndex);
return currentIndex.IsBetween(startIndex.Enclosed(), endIndex.Enclosed()) && TryGetPartition(currentIndex, ref partition);
return currentIndex.IsBetween(startIndex.Enclosed(), endIndex.Enclosed())
&& TryGetPartition(currentIndex, ref partition, out _);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -687,27 +691,24 @@ private async ValueTask ApplyAsync(int sessionId, long startIndex, CancellationT
var commitIndex = LastCommittedEntryIndex;
for (Partition? partition = null; startIndex <= commitIndex; LastAppliedEntryIndex = startIndex++, token.ThrowIfCancellationRequested())
{
if (TryGetPartition(startIndex, ref partition))
if (TryGetPartition(startIndex, ref partition, out var switched))
{
if (switched)
{
await partition.FlushAsync(token).ConfigureAwait(false);
}

var entry = partition.Read(sessionId, startIndex);
await ApplyCoreAsync(entry).ConfigureAwait(false);
Volatile.Write(ref lastTerm, entry.Term);
partition.ClearContext(startIndex);

// Remove log entry from the cache according to eviction policy
var completedPartition = startIndex == partition.LastIndex;
if (!entry.IsPersisted)
if (evictOnCommit && bufferManager.IsCachingEnabled)
{
await partition.PersistCachedEntryAsync(startIndex, evictOnCommit).ConfigureAwait(false);

// Flush partition if we are finished or at the last entry in it
if (startIndex == commitIndex | completedPartition)
{
await partition.FlushAsync(token).ConfigureAwait(false);
}
partition.Evict(startIndex);
}

if (completedPartition)
if (startIndex == commitIndex || startIndex == partition.LastIndex)
{
partition.ClearContext();
}
Expand Down Expand Up @@ -758,7 +759,7 @@ public async Task ReplayAsync(CancellationToken token = default)
if (compaction is CompactionMode.Incremental)
{
incrementalBuilder = await InitializeLongLivingSnapshotBuilderAsync(session).ConfigureAwait(false);
for (Partition? partition = FirstPartition; TryGetPartition(startIndex, ref partition) && partition is not null && startIndex <= LastCommittedEntryIndex; startIndex++)
for (var partition = FirstPartition; TryGetPartition(startIndex, ref partition, out _) && startIndex <= LastCommittedEntryIndex; startIndex++)
{
entry = partition.Read(session, startIndex);
incrementalBuilder.Builder.Term = entry.Term;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ public void Dispose()

internal enum CachedLogEntryPersistenceMode : byte
{
None = 0,
CopyToBuffer,
CopyToBuffer = 0,
SkipBuffer,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,30 +127,18 @@ public void Format(Span<byte> output)
// fast path without any overhead for LE byte order
ref var ptr = ref MemoryMarshal.GetReference(output);

if (!BitConverter.IsLittleEndian)
{
// BE case
FormatSlow(output);
}
else if (IntPtr.Size is sizeof(long))
if (BitConverter.IsLittleEndian)
{
// 64-bit LE case, the pointer is always aligned to 8 bytes
Debug.Assert(Intrinsics.AddressOf(in ptr) % IntPtr.Size is 0);
Unsafe.As<byte, LogEntryMetadata>(ref ptr) = this;
// 32-bit LE case, the pointer may not be aligned to 8 bytes
Unsafe.WriteUnaligned(ref ptr, this);
}
else
{
// 32-bit LE case, the pointer may not be aligned to 8 bytes
Unsafe.WriteUnaligned(ref ptr, this);
// BE case
FormatSlow(output);
}
}

internal static long GetTerm(ReadOnlySpan<byte> input)
=> BinaryPrimitives.ReadInt64LittleEndian(input);

internal static long GetOffset(ReadOnlySpan<byte> input)
=> BinaryPrimitives.ReadInt64LittleEndian(input.Slice(0, sizeof(long) + sizeof(long) + sizeof(long))); // skip Term, Timestamp, Length

internal long End => Length + Offset;

internal static long GetEndOfLogEntry(ReadOnlySpan<byte> input)
Expand Down Expand Up @@ -388,7 +376,7 @@ internal LogEntryList(PersistentState state, int sessionId, long startIndex, lon
StartIndex = startIndex;
EndIndex = endIndex;
this.metadataOnly = metadataOnly;
if(!state.TryGetPartition(startIndex, ref head))
if (!state.TryGetPartition(startIndex, ref head, out _))
head = state.FirstPartition;

cache = head;
Expand Down Expand Up @@ -428,7 +416,7 @@ public LogEntry this[int index]

Debug.Assert(absoluteIndex <= EndIndex);

return state.TryGetPartition(absoluteIndex, ref cache)
return state.TryGetPartition(absoluteIndex, ref cache, out _)
? cache.Read(SessionId, absoluteIndex, metadataOnly)
: throw new MissingPartitionException(absoluteIndex);
}
Expand All @@ -449,7 +437,7 @@ public readonly IEnumerator<LogEntry> GetEnumerator()
runningIndex += 1L;
}

for (Partition? partition = head; runningIndex <= EndIndex && state.TryGetPartition(runningIndex, ref partition); runningIndex++)
for (var partition = head; runningIndex <= EndIndex && state.TryGetPartition(runningIndex, ref partition, out _); runningIndex++)
yield return partition.Read(SessionId, runningIndex, metadataOnly);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@ internal LogEntry(in SnapshotMetadata metadata)
index = -metadata.Index;
}

internal bool IsPersisted
{
get;
init;
}

/// <summary>
/// Gets or sets context associated with this log entry.
/// </summary>
Expand Down
Loading

0 comments on commit 2edc792

Please sign in to comment.