From 38e83958fd200d60d67537afeadbf936c8b3d948 Mon Sep 17 00:00:00 2001 From: sakno Date: Fri, 5 Jul 2024 10:31:57 +0300 Subject: [PATCH] Fixed #244 --- .../Raft/MemoryBasedStateMachineTests.cs | 6 +++--- .../Raft/PersistentState.Partition.cs | 20 ++++++++++++------- .../Cluster/Consensus/Raft/PersistentState.cs | 12 +++++++---- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs index 3e24fe258..ed79a457a 100644 --- a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs +++ b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs @@ -670,7 +670,7 @@ public static async Task SequentialCompaction(bool useCaching) Equal(entries.Length + 41L, state.Value); checker = static (readResult, snapshotIndex, token) => { - Equal(8, snapshotIndex); + Equal(7, snapshotIndex); True(Single(readResult).IsSnapshot); return default; }; @@ -679,7 +679,7 @@ public static async Task SequentialCompaction(bool useCaching) checker = static (readResult, snapshotIndex, token) => { NotEmpty(readResult); - Equal(8, snapshotIndex); + Equal(7, snapshotIndex); True(readResult[0].IsSnapshot); False(readResult[1].IsSnapshot); return default; @@ -701,7 +701,7 @@ public static async Task SequentialCompaction(bool useCaching) checker = static (readResult, snapshotIndex, token) => { NotEmpty(readResult); - Equal(8, snapshotIndex); + Equal(7, snapshotIndex); return default; }; await state.As().ReadAsync(new LogEntryConsumer(checker), 1, CancellationToken.None); diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs index f7aa29e2f..4ab08dfb8 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs @@ -394,22 +394,28 @@ private sealed class Table : Partition, IReadOnlyList> private MemoryOwner header, footer; private (ReadOnlyMemory, ReadOnlyMemory) bufferTuple; - internal Table(DirectoryInfo location, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize) + internal Table(DirectoryInfo location, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize, ReadOnlySpan initialFooter) : base(location, HeaderSize, bufferSize, recordsPerPartition, partitionNumber, in manager, readersCount, writeMode, initialSize) { footer = manager.BufferAllocator.AllocateExactly(recordsPerPartition * LogEntryMetadata.Size); -#if DEBUG - footer.Span.Clear(); -#endif + initialFooter.CopyTo(footer.Span); header = manager.BufferAllocator.AllocateExactly(HeaderSize); header.Span.Clear(); + } + + internal static MemoryOwner AllocateInitializedFooter(MemoryAllocator allocator, int recordsPerPartition) + { + var footer = allocator.AllocateExactly(recordsPerPartition * LogEntryMetadata.Size); - // init ephemeral 0 entry - if (PartitionNumber is 0L) + for (var index = 0; index < recordsPerPartition; index++) { - LogEntryMetadata.Create(LogEntry.Initial, HeaderSize + LogEntryMetadata.Size, length: 0L).Format(footer.Span); + var writeAddress = index * LogEntryMetadata.Size; + var metadata = new LogEntryMetadata(default, 0L, writeAddress + HeaderSize + LogEntryMetadata.Size, 0L); + metadata.Format(footer.Span.Slice(writeAddress)); } + + return footer; } private bool IsSealed diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs index 761466d49..044978113 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs @@ -30,6 +30,7 @@ public abstract partial class PersistentState : Disposable, IPersistentState private protected readonly WriteMode writeMode; private readonly bool parallelIO; private readonly long maxLogEntrySize; // 0 - modern partition, > 0 - sparse partition, < 0 - legacy partition + private MemoryOwner initializedFooter; static PersistentState() { @@ -86,6 +87,7 @@ private protected PersistentState(DirectoryInfo path, int recordsPerPartition, O maxLogEntrySize); break; case 0L: + initializedFooter = Table.AllocateInitializedFooter(bufferManager.BufferAllocator, recordsPerPartition); CreateTables( partitionTable, path, @@ -95,7 +97,8 @@ private protected PersistentState(DirectoryInfo path, int recordsPerPartition, O concurrentReads, writeMode, initialSize, - state.LastIndex); + state.LastIndex, + initializedFooter.Span); break; case < 0L: #pragma warning disable CS0618,CS0612 @@ -133,13 +136,13 @@ private protected PersistentState(DirectoryInfo path, int recordsPerPartition, O static int ComparePartitions(Partition x, Partition y) => x.PartitionNumber.CompareTo(y.PartitionNumber); - static void CreateTables(SortedSet partitionTable, DirectoryInfo path, int bufferSize, int recordsPerPartition, in BufferManager manager, int concurrentReads, WriteMode writeMode, long initialSize, long lastIndex) + static void CreateTables(SortedSet partitionTable, DirectoryInfo path, int bufferSize, int recordsPerPartition, in BufferManager manager, int concurrentReads, WriteMode writeMode, long initialSize, long lastIndex, ReadOnlySpan initializedFooter) { foreach (var file in path.EnumerateFiles()) { if (long.TryParse(file.Name, out var partitionNumber)) { - var partition = new Table(file.Directory!, bufferSize, recordsPerPartition, partitionNumber, in manager, concurrentReads, writeMode, initialSize); + var partition = new Table(file.Directory!, bufferSize, recordsPerPartition, partitionNumber, in manager, concurrentReads, writeMode, initialSize, initializedFooter); partition.Initialize(lastIndex); partitionTable.Add(partition); } @@ -187,7 +190,7 @@ static void CreateLegacyPartitions(SortedSet partitionTable, Director private partial Partition CreatePartition(long partitionNumber) => maxLogEntrySize switch { > 0L => new SparsePartition(Location, bufferSize, recordsPerPartition, partitionNumber, in bufferManager, concurrentReads, writeMode, initialSize, maxLogEntrySize), - 0L => new Table(Location, bufferSize, recordsPerPartition, partitionNumber, in bufferManager, concurrentReads, writeMode, initialSize), + 0L => new Table(Location, bufferSize, recordsPerPartition, partitionNumber, in bufferManager, concurrentReads, writeMode, initialSize, initializedFooter.Span), #pragma warning disable CS0618,CS0612 < 0L => new LegacyPartition(Location, bufferSize, recordsPerPartition, partitionNumber, in bufferManager, concurrentReads, writeMode, initialSize), #pragma warning restore CS0618,CS0612 @@ -1031,6 +1034,7 @@ protected override void Dispose(bool disposing) commitEvent.Dispose(); syncRoot.Dispose(); bufferingConsumer?.Clear(); + initializedFooter.Dispose(); } base.Dispose(disposing);