From 2edc792f69537154b96fdc3ea3b466646d3ea018 Mon Sep 17 00:00:00 2001 From: sakno Date: Mon, 1 Jul 2024 18:48:18 +0300 Subject: [PATCH] Fixed #244 --- .../Raft/MemoryBasedStateMachineTests.cs | 2 +- .../Consensus/Raft/DiskBasedStateMachine.cs | 25 ++--- .../Consensus/Raft/MemoryBasedStateMachine.cs | 35 +++---- .../Consensus/Raft/PersistentState.Cache.cs | 3 +- .../Raft/PersistentState.Internal.cs | 28 ++---- .../Raft/PersistentState.LogEntry.cs | 6 -- .../Raft/PersistentState.Partition.cs | 92 +++---------------- 7 files changed, 50 insertions(+), 141 deletions(-) diff --git a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs index bafcca9c0..41ea7660a 100644 --- a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs +++ b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs @@ -1067,8 +1067,8 @@ public static async Task RegressionIssue244() { NotEmpty(entries); Null(snapshotIndex); + Equal(1L, entries[0].Term); Equal(1L, entries[1].Term); - Equal(1L, entries[2].Term); return ValueTask.FromResult(Missing.Value); }; diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs index 420fd2645..b1ec41da9 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs @@ -27,7 +27,7 @@ public abstract partial class DiskBasedStateMachine : PersistentState /// The configuration of the persistent audit trail. /// is less than 2. protected DiskBasedStateMachine(DirectoryInfo path, int recordsPerPartition, Options? configuration = null) - : base(path, recordsPerPartition, configuration ?? new()) + : base(path, recordsPerPartition, configuration ??= new()) { } @@ -64,27 +64,22 @@ protected DiskBasedStateMachine(string path, int recordsPerPartition, Options? c for (Partition? partition = null; startIndex <= commitIndex; LastAppliedEntryIndex = startIndex++, token.ThrowIfCancellationRequested()) { - if (TryGetPartition(startIndex, ref partition)) + if (TryGetPartition(startIndex, ref partition, out var switched)) { + if (switched) + { + await partition.FlushAsync(token).ConfigureAwait(false); + } + var entry = partition.Read(sessionId, startIndex); var snapshotLength = await ApplyCoreAsync(entry).ConfigureAwait(false); Volatile.Write(ref lastTerm, entry.Term); partition.ClearContext(startIndex); - // Remove log entry from the cache according to eviction policy - var completedPartition = startIndex == partition.LastIndex; - if (!entry.IsPersisted) - { - await partition.PersistCachedEntryAsync(startIndex, snapshotLength.HasValue).ConfigureAwait(false); - - // Flush partition if we are finished or at the last entry in it - if (startIndex == commitIndex | completedPartition) - { - await partition.FlushAsync(token).ConfigureAwait(false); - } - } + if (bufferManager.IsCachingEnabled) + partition.Evict(startIndex); - if (completedPartition) + if (startIndex == commitIndex || startIndex == partition.LastIndex) { partition.ClearContext(); } diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs index 26aa05e30..e1016af31 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs @@ -63,8 +63,11 @@ protected MemoryBasedStateMachine(DirectoryInfo path, int recordsPerPartition, O snapshotBufferSize = configuration.SnapshotBufferSize; // with concurrent compaction, we will release cached log entries according to partition lifetime - evictOnCommit = compaction is not CompactionMode.Incremental && configuration.CacheEvictionPolicy is LogEntryCacheEvictionPolicy.OnCommit; - snapshot = new(path, snapshotBufferSize, in bufferManager, concurrentReads, configuration.WriteMode, initialSize: configuration.InitialPartitionSize); + evictOnCommit = compaction is not CompactionMode.Incremental + && configuration.CacheEvictionPolicy is LogEntryCacheEvictionPolicy.OnCommit + && configuration.UseCaching; + snapshot = new(path, snapshotBufferSize, in bufferManager, concurrentReads, configuration.WriteMode, + initialSize: configuration.InitialPartitionSize); } /// @@ -93,7 +96,7 @@ private async ValueTask BuildSnapshotAsync(int sessionId, long upperBoundIndex, { // Calculate the term of the snapshot Partition? current = LastPartition; - builder.Term = TryGetPartition(upperBoundIndex, ref current) + builder.Term = TryGetPartition(upperBoundIndex, ref current, out _) ? current.GetTerm(upperBoundIndex) : throw new MissingPartitionException(upperBoundIndex); @@ -113,7 +116,8 @@ private async ValueTask BuildSnapshotAsync(int sessionId, long upperBoundIndex, private bool TryGetPartition(SnapshotBuilder builder, long startIndex, long endIndex, ref long currentIndex, [NotNullWhen(true)] ref Partition? partition) { builder.AdjustIndex(startIndex, endIndex, ref currentIndex); - return currentIndex.IsBetween(startIndex.Enclosed(), endIndex.Enclosed()) && TryGetPartition(currentIndex, ref partition); + return currentIndex.IsBetween(startIndex.Enclosed(), endIndex.Enclosed()) + && TryGetPartition(currentIndex, ref partition, out _); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -687,27 +691,24 @@ private async ValueTask ApplyAsync(int sessionId, long startIndex, CancellationT var commitIndex = LastCommittedEntryIndex; for (Partition? partition = null; startIndex <= commitIndex; LastAppliedEntryIndex = startIndex++, token.ThrowIfCancellationRequested()) { - if (TryGetPartition(startIndex, ref partition)) + if (TryGetPartition(startIndex, ref partition, out var switched)) { + if (switched) + { + await partition.FlushAsync(token).ConfigureAwait(false); + } + var entry = partition.Read(sessionId, startIndex); await ApplyCoreAsync(entry).ConfigureAwait(false); Volatile.Write(ref lastTerm, entry.Term); partition.ClearContext(startIndex); - // Remove log entry from the cache according to eviction policy - var completedPartition = startIndex == partition.LastIndex; - if (!entry.IsPersisted) + if (evictOnCommit && bufferManager.IsCachingEnabled) { - await partition.PersistCachedEntryAsync(startIndex, evictOnCommit).ConfigureAwait(false); - - // Flush partition if we are finished or at the last entry in it - if (startIndex == commitIndex | completedPartition) - { - await partition.FlushAsync(token).ConfigureAwait(false); - } + partition.Evict(startIndex); } - if (completedPartition) + if (startIndex == commitIndex || startIndex == partition.LastIndex) { partition.ClearContext(); } @@ -758,7 +759,7 @@ public async Task ReplayAsync(CancellationToken token = default) if (compaction is CompactionMode.Incremental) { incrementalBuilder = await InitializeLongLivingSnapshotBuilderAsync(session).ConfigureAwait(false); - for (Partition? partition = FirstPartition; TryGetPartition(startIndex, ref partition) && partition is not null && startIndex <= LastCommittedEntryIndex; startIndex++) + for (var partition = FirstPartition; TryGetPartition(startIndex, ref partition, out _) && startIndex <= LastCommittedEntryIndex; startIndex++) { entry = partition.Read(session, startIndex); incrementalBuilder.Builder.Term = entry.Term; diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Cache.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Cache.cs index 992963b03..c67822543 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Cache.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Cache.cs @@ -24,8 +24,7 @@ public void Dispose() internal enum CachedLogEntryPersistenceMode : byte { - None = 0, - CopyToBuffer, + CopyToBuffer = 0, SkipBuffer, } diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs index b2c9bfe22..4020e3688 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs @@ -127,30 +127,18 @@ public void Format(Span output) // fast path without any overhead for LE byte order ref var ptr = ref MemoryMarshal.GetReference(output); - if (!BitConverter.IsLittleEndian) - { - // BE case - FormatSlow(output); - } - else if (IntPtr.Size is sizeof(long)) + if (BitConverter.IsLittleEndian) { - // 64-bit LE case, the pointer is always aligned to 8 bytes - Debug.Assert(Intrinsics.AddressOf(in ptr) % IntPtr.Size is 0); - Unsafe.As(ref ptr) = this; + // 32-bit LE case, the pointer may not be aligned to 8 bytes + Unsafe.WriteUnaligned(ref ptr, this); } else { - // 32-bit LE case, the pointer may not be aligned to 8 bytes - Unsafe.WriteUnaligned(ref ptr, this); + // BE case + FormatSlow(output); } } - internal static long GetTerm(ReadOnlySpan input) - => BinaryPrimitives.ReadInt64LittleEndian(input); - - internal static long GetOffset(ReadOnlySpan input) - => BinaryPrimitives.ReadInt64LittleEndian(input.Slice(0, sizeof(long) + sizeof(long) + sizeof(long))); // skip Term, Timestamp, Length - internal long End => Length + Offset; internal static long GetEndOfLogEntry(ReadOnlySpan input) @@ -388,7 +376,7 @@ internal LogEntryList(PersistentState state, int sessionId, long startIndex, lon StartIndex = startIndex; EndIndex = endIndex; this.metadataOnly = metadataOnly; - if(!state.TryGetPartition(startIndex, ref head)) + if (!state.TryGetPartition(startIndex, ref head, out _)) head = state.FirstPartition; cache = head; @@ -428,7 +416,7 @@ public LogEntry this[int index] Debug.Assert(absoluteIndex <= EndIndex); - return state.TryGetPartition(absoluteIndex, ref cache) + return state.TryGetPartition(absoluteIndex, ref cache, out _) ? cache.Read(SessionId, absoluteIndex, metadataOnly) : throw new MissingPartitionException(absoluteIndex); } @@ -449,7 +437,7 @@ public readonly IEnumerator GetEnumerator() runningIndex += 1L; } - for (Partition? partition = head; runningIndex <= EndIndex && state.TryGetPartition(runningIndex, ref partition); runningIndex++) + for (var partition = head; runningIndex <= EndIndex && state.TryGetPartition(runningIndex, ref partition, out _); runningIndex++) yield return partition.Read(SessionId, runningIndex, metadataOnly); } diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs index 153ebf583..eea8333b1 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs @@ -44,12 +44,6 @@ internal LogEntry(in SnapshotMetadata metadata) index = -metadata.Index; } - internal bool IsPersisted - { - get; - init; - } - /// /// Gets or sets context associated with this log entry. /// diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs index ff135637c..33d687cf8 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs @@ -110,6 +110,8 @@ internal void ClearContext(long absoluteIndex) internal void ClearContext() => context = null; + internal void Evict(long absoluteIndex) => entryCache[ToRelativeIndex(absoluteIndex)].Dispose(); + internal LogEntry Read(int sessionId, long absoluteIndex, bool metadataOnly = false) { Debug.Assert(absoluteIndex >= FirstIndex && absoluteIndex <= LastIndex, $"Invalid index value {absoluteIndex}, offset {FirstIndex}"); @@ -130,7 +132,6 @@ internal LogEntry Read(int sessionId, long absoluteIndex, bool metadataOnly = fa return new(in metadata, absoluteIndex) { ContentReader = GetSessionReader(sessionId), - IsPersisted = true, Context = GetContext(relativeIndex), }; } @@ -139,7 +140,6 @@ internal LogEntry Read(int sessionId, long absoluteIndex, bool metadataOnly = fa return new(in metadata, absoluteIndex) { ContentBuffer = cachedContent.Content.Memory, - IsPersisted = cachedContent.PersistenceMode is not CachedLogEntryPersistenceMode.None, Context = cachedContent.Context, }; @@ -153,51 +153,6 @@ internal LogEntry Read(int sessionId, long absoluteIndex, bool metadataOnly = fa } } - internal ValueTask PersistCachedEntryAsync(long absoluteIndex, bool removeFromMemory) - { - Debug.Assert(entryCache.IsEmpty is false); - - var index = ToRelativeIndex(absoluteIndex); - Debug.Assert((uint)index < (uint)entryCache.Length); - - ref var cachedEntry = ref entryCache[index]; - Debug.Assert(cachedEntry.PersistenceMode is CachedLogEntryPersistenceMode.None); - cachedEntry.PersistenceMode = CachedLogEntryPersistenceMode.CopyToBuffer; - var offset = GetMetadata(index).Offset; - - return cachedEntry.Content.IsEmpty - ? ValueTask.CompletedTask - : removeFromMemory - ? PersistAndDeleteAsync(cachedEntry.Content.Memory, index, offset) - : PersistAsync(cachedEntry.Content.Memory, index, offset); - } - - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - private async ValueTask PersistAsync(ReadOnlyMemory content, int index, long offset) - { - await SetWritePositionAsync(offset).ConfigureAwait(false); - await writer.WriteAsync(content).ConfigureAwait(false); - - runningIndex = index; - } - - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - private async ValueTask PersistAndDeleteAsync(ReadOnlyMemory content, int index, long offset) - { - try - { - // manually inlined body of PersistAsync method - await SetWritePositionAsync(offset).ConfigureAwait(false); - await writer.WriteAsync(content).ConfigureAwait(false); - } - finally - { - entryCache[index].Dispose(); - } - - runningIndex = index; - } - internal long GetTerm(long absoluteIndex) { Debug.Assert(absoluteIndex >= FirstIndex && absoluteIndex <= LastIndex, $"Invalid index value {absoluteIndex}, offset {FirstIndex}"); @@ -222,8 +177,6 @@ protected abstract ValueTask PersistAsync(TEntry entry, int index, Cance protected abstract ValueTask WriteThroughAsync(CachedLogEntry entry, int index, CancellationToken token); - protected abstract void OnCached(in CachedLogEntry cachedEntry, int index); - internal ValueTask WriteAsync(TEntry entry, long absoluteIndex, CancellationToken token) where TEntry : notnull, IRaftLogEntry { @@ -239,18 +192,12 @@ internal ValueTask WriteAsync(TEntry entry, long absoluteIndex, Cancella UpdateCache(in cachedEntry, relativeIndex); // Perf: we can skip FileWriter internal buffer and write cached log entry directly to the disk - switch (cachedEntry.PersistenceMode) - { - case CachedLogEntryPersistenceMode.SkipBuffer when !writer.HasBufferedData: - return WriteThroughAsync(cachedEntry, relativeIndex, token); - case CachedLogEntryPersistenceMode.None: - OnCached(in cachedEntry, relativeIndex); - return ValueTask.CompletedTask; - default: - goto exit; - } + return cachedEntry.PersistenceMode is CachedLogEntryPersistenceMode.SkipBuffer && !writer.HasBufferedData + ? WriteThroughAsync(cachedEntry, relativeIndex, token) + : PersistAsync(cachedEntry, relativeIndex, token); } - else if (entry is IInputLogEntry && ((IInputLogEntry)entry).Context is { } context) + + if (entry is IInputLogEntry && ((IInputLogEntry)entry).Context is { } context) { SetContext(relativeIndex, context); } @@ -259,7 +206,6 @@ internal ValueTask WriteAsync(TEntry entry, long absoluteIndex, Cancella if (!entryCache.IsEmpty) entryCache[relativeIndex].Dispose(); - exit: return PersistAsync(entry, relativeIndex, token); void SetContext(int relativeIndex, object context) @@ -329,9 +275,6 @@ protected override LogEntryMetadata GetMetadata(int index) return entry.Metadata; } - protected override void OnCached(in CachedLogEntry cachedEntry, int index) - => metadataTable[index].Metadata = LogEntryMetadata.Create(cachedEntry, GetMetadataOffset(index) + LogEntryMetadata.Size); - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] protected override async ValueTask PersistAsync(TEntry entry, int index, CancellationToken token) { @@ -607,13 +550,6 @@ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int i writer.FilePosition = metadata.End; } - protected override void OnCached(in CachedLogEntry cachedEntry, int index) - { - var startPos = GetWriteAddress(index) + LogEntryMetadata.Size; - var metadata = LogEntryMetadata.Create(in cachedEntry, startPos); - metadata.Format(GetMetadataBuffer(index).Span); - } - private ValueTask UnsealIfNeededAsync(long truncatePosition, CancellationToken token) { ValueTask task; @@ -841,12 +777,6 @@ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int i writeAddress += entry.Length; } - protected override void OnCached(in CachedLogEntry cachedEntry, int index) - { - WriteMetadata(index, LogEntryMetadata.Create(cachedEntry, writeAddress, cachedEntry.Length)); - writeAddress += cachedEntry.Length; - } - protected override void Dispose(bool disposing) { metadata.Dispose(); @@ -896,7 +826,7 @@ private protected Partition? LastPartition private partial Partition CreatePartition(long partitionNumber); - // during insertion the index is growing monothonically so + // during insertion the index is growing monotonically so // this method is optimized for forward lookup in sorted list of partitions private void GetOrCreatePartition(long recordIndex, [NotNull] ref Partition? partition) { @@ -1010,9 +940,10 @@ Partition Append(long partitionNumber, Partition partition) return result; } - // during reads the index is growing monothonically - private protected bool TryGetPartition(long recordIndex, [NotNullWhen(true)] ref Partition? partition) + // during reads the index is growing monotonically + private protected bool TryGetPartition(long recordIndex, [NotNullWhen(true)] ref Partition? partition, out bool switched) { + switched = false; if (partition?.Contains(recordIndex) ?? false) goto success; @@ -1028,6 +959,7 @@ private protected bool TryGetPartition(long recordIndex, [NotNullWhen(true)] ref var partitionNumber = PartitionOf(recordIndex); + switched = true; for (int previous = 0, current; ; previous = current) { switch (current = partitionNumber.CompareTo(partition.PartitionNumber))