From 8a7636724f8e89f673780294cb5c1be46fc67e21 Mon Sep 17 00:00:00 2001 From: sakno Date: Mon, 15 Apr 2024 17:23:16 +0300 Subject: [PATCH] Fixed incorrect I/O --- .../Raft/PersistentState.Partition.cs | 121 ++++++++++-------- 1 file changed, 65 insertions(+), 56 deletions(-) diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs index f861f765f..e2d14acd7 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs @@ -20,6 +20,7 @@ private protected abstract class Partition : ConcurrentStorageAccess internal readonly long FirstIndex, PartitionNumber, LastIndex; private Partition? previous, next; protected MemoryOwner entryCache; + protected int runningIndex; protected Partition(DirectoryInfo location, int offset, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize, FileAttributes attributes = FileAttributes.NotContentIndexed) : base(Path.Combine(location.FullName, partitionNumber.ToString(InvariantCulture)), offset, bufferSize, manager.BufferAllocator, readersCount, writeMode, initialSize, attributes) @@ -142,14 +143,16 @@ internal ValueTask PersistCachedEntryAsync(long absoluteIndex, bool removeFromMe ? ValueTask.CompletedTask : removeFromMemory ? PersistAndDeleteAsync(cachedEntry.Content.Memory, index, offset) - : PersistAsync(cachedEntry.Content.Memory, offset); + : PersistAsync(cachedEntry.Content.Memory, index, offset); } [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - private async ValueTask PersistAsync(ReadOnlyMemory content, long offset) + private async ValueTask PersistAsync(ReadOnlyMemory content, int index, long offset) { await SetWritePositionAsync(offset).ConfigureAwait(false); await writer.WriteAsync(content).ConfigureAwait(false); + + runningIndex = index; } [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] @@ -165,6 +168,8 @@ private async ValueTask PersistAndDeleteAsync(ReadOnlyMemory content, int { entryCache[index].Dispose(); } + + runningIndex = index; } internal long GetTerm(long absoluteIndex) @@ -213,13 +218,13 @@ internal ValueTask WriteAsync(TEntry entry, long absoluteIndex, Cancella // Perf: we can skip FileWriter internal buffer and write cached log entry directly to the disk switch (cachedEntry.PersistenceMode) { - case CachedLogEntryPersistenceMode.CopyToBuffer: - goto exit; - case CachedLogEntryPersistenceMode.SkipBuffer: + case CachedLogEntryPersistenceMode.SkipBuffer when !writer.HasBufferedData: return WriteThroughAsync(cachedEntry, relativeIndex, token); - default: + case CachedLogEntryPersistenceMode.None: OnCached(in cachedEntry, relativeIndex); return ValueTask.CompletedTask; + default: + goto exit; } } @@ -331,6 +336,8 @@ protected override async ValueTask PersistAsync(TEntry entry, int index, [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int index, CancellationToken token) { + Debug.Assert(writer.HasBufferedData is false); + if (entry.Length > maxLogEntrySize) throw new InvalidOperationException(ExceptionMessages.LogEntryPayloadTooLarge); @@ -341,6 +348,8 @@ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int i await RandomAccess.WriteAsync(Handle, this, GetMetadataOffset(index), token).ConfigureAwait(false); payloadBuffer = default; metadataTable[index].Metadata = metadata; + + writer.FilePosition = metadata.End; } ReadOnlyMemory IReadOnlyList>.this[int index] => index switch @@ -507,32 +516,14 @@ protected override async ValueTask PersistAsync(TEntry entry, int index, } metadata.Format(GetMetadataSpan(index)); - - if (index == LastIndex) - { - // write footer with metadata table - await RandomAccess.WriteAsync(Handle, footer.Memory, metadata.End, token).ConfigureAwait(false); - RandomAccess.SetLength(Handle, metadata.End + footer.Length); - - // seal the partition - IsSealed = true; - } - else if (IsSealed) - { - // unseal - IsSealed = false; - } - else - { - return; - } - - await RandomAccess.WriteAsync(Handle, header.Memory, fileOffset: 0L, token).ConfigureAwait(false); + runningIndex = index; } [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int index, CancellationToken token) { + Debug.Assert(writer.HasBufferedData is false); + var writeAddress = GetWriteAddress(index); var startPos = writeAddress + LogEntryMetadata.Size; var metadata = LogEntryMetadata.Create(entry, startPos, entry.Length); @@ -543,27 +534,9 @@ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int i payloadBuffer = default; metadata.Format(GetMetadataSpan(index)); + runningIndex = index; - if (index == LastIndex) - { - // write footer with metadata table - await RandomAccess.WriteAsync(Handle, footer.Memory, metadata.End, token).ConfigureAwait(false); - RandomAccess.SetLength(Handle, metadata.End + footer.Length); - - // seal the partition - IsSealed = true; - } - else if (IsSealed) - { - // unseal - IsSealed = false; - } - else - { - return; - } - - await RandomAccess.WriteAsync(Handle, header.Memory, fileOffset: 0L, token).ConfigureAwait(false); + writer.FilePosition = metadata.End; } protected override void OnCached(in CachedLogEntry cachedEntry, int index) @@ -573,6 +546,45 @@ protected override void OnCached(in CachedLogEntry cachedEntry, int index) metadata.Format(GetMetadataSpan(index)); } + public override ValueTask FlushAsync(CancellationToken token = default) + { + return runningIndex == LastIndex + ? FlushAndSealAsync(token) + : IsSealed + ? FlushAndUnsealAsync(token) + : base.FlushAsync(token); + } + + private async ValueTask FlushAndSealAsync(CancellationToken token) + { + await writer.WriteAsync(token).ConfigureAwait(false); + + // write footer with metadata table + await WriteFooterAsync(token).ConfigureAwait(false); + RandomAccess.SetLength(Handle, writer.FilePosition + footer.Length); + + IsSealed = true; + await WriteHeaderAsync(token).ConfigureAwait(false); + + writer.FlushToDisk(); + } + + private async ValueTask FlushAndUnsealAsync(CancellationToken token) + { + await writer.WriteAsync(token).ConfigureAwait(false); + + IsSealed = false; + await WriteHeaderAsync(token).ConfigureAwait(false); + + writer.FlushToDisk(); + } + + private ValueTask WriteHeaderAsync(CancellationToken token) + => RandomAccess.WriteAsync(Handle, header.Memory, fileOffset: 0L, token); + + private ValueTask WriteFooterAsync(CancellationToken token) + => RandomAccess.WriteAsync(Handle, footer.Memory, writer.FilePosition, token); + ReadOnlyMemory IReadOnlyList>.this[int index] => index switch { 0 => metadataBuffer.Memory, @@ -655,7 +667,7 @@ private void GetOrCreatePartition(long recordIndex, [NotNull] ref Partition? par Debug.Assert(FirstPartition is null); Debug.Assert(partition is null); FirstPartition = LastPartition = partition = CreatePartition(partitionNumber); - goto exit; + return; } Debug.Assert(FirstPartition is not null); @@ -669,14 +681,14 @@ private void GetOrCreatePartition(long recordIndex, [NotNull] ref Partition? par if (previous < 0) { partition = Append(partitionNumber, partition); - goto exit; + return; } // nothing on the right side, create new tail if (partition.IsLast) { LastPartition = partition = Append(partitionNumber, partition); - goto exit; + return; } partition = partition.Next; @@ -685,28 +697,25 @@ private void GetOrCreatePartition(long recordIndex, [NotNull] ref Partition? par if (previous > 0) { partition = Prepend(partitionNumber, partition); - goto exit; + return; } // nothing on the left side, create new head if (partition.IsFirst) { FirstPartition = partition = Prepend(partitionNumber, partition); - goto exit; + return; } partition = partition.Previous; break; default: - goto exit; + return; } Debug.Assert(partition is not null); } - exit: - return; - Partition Prepend(long partitionNumber, Partition partition) { var tmp = CreatePartition(partitionNumber);