diff --git a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs index fc9e3f636..4632a0bba 100644 --- a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs +++ b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs @@ -333,6 +333,34 @@ public static async Task Overwrite(long? maxLogEntrySize) } } + [Fact] + public static async Task OverwriteUnsealed() + { + var entry1 = new TestLogEntry("SET A = 0 SET B=1 SET C=2 SET D=3 SET E=4 SET F=5") { Term = 42L }; + var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L }; + Func, long?, CancellationToken, ValueTask> checker; + var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()); + using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { UseCaching = false })) + { + Equal(1L, await state.AppendAsync(entry1)); + await state.AppendAsync(entry2, 1L); + } + + //read again + using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { UseCaching = false })) + { + checker = async (entries, snapshotIndex, token) => + { + Null(snapshotIndex); + Single(entries); + False(entries[0].IsSnapshot); + Equal(entry2.Content, await entries[0].ToStringAsync(Encoding.UTF8)); + return Missing.Value; + }; + await state.As().ReadAsync(new LogEntryConsumer(checker), 1L, CancellationToken.None); + } + } + [Obsolete] [Fact] public static async Task LegacyOverwrite() diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs index 526b86783..3ccd30728 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs @@ -505,11 +505,12 @@ internal override void Initialize() if (RandomAccess.Read(Handle, header.Span, fileOffset: 0L) < HeaderSize) { header.Span.Clear(); + writer.FilePosition = HeaderSize; } else if (IsSealed) { // partition is completed, read table - fileOffset = RandomAccess.GetLength(Handle); + writer.FilePosition = fileOffset = RandomAccess.GetLength(Handle); if (fileOffset < footer.Length + HeaderSize) throw new IntegrityException(ExceptionMessages.InvalidPartitionFormat); @@ -533,7 +534,7 @@ internal override void Initialize() fileOffset = HeaderSize; } - for (Span metadataBuffer = stackalloc byte[LogEntryMetadata.Size], metadataTable = footer.Span; ; footerOffset += LogEntryMetadata.Size) + for (Span metadataBuffer = stackalloc byte[LogEntryMetadata.Size], metadataTable = footer.Span; footerOffset < footer.Length; footerOffset += LogEntryMetadata.Size) { var count = RandomAccess.Read(Handle, metadataBuffer, fileOffset); if (count < LogEntryMetadata.Size) @@ -543,6 +544,7 @@ internal override void Initialize() if (fileOffset <= 0L) break; + writer.FilePosition = fileOffset; metadataBuffer.CopyTo(metadataTable.Slice(footerOffset, LogEntryMetadata.Size)); } } @@ -562,6 +564,7 @@ private long GetWriteAddress(int index) protected override async ValueTask PersistAsync(TEntry entry, int index, CancellationToken token) { var writeAddress = GetWriteAddress(index); + await UnsealIfNeededAsync(writeAddress, token).ConfigureAwait(false); LogEntryMetadata metadata; Memory metadataBuffer; @@ -601,6 +604,8 @@ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int i Debug.Assert(writer.HasBufferedData is false); var writeAddress = GetWriteAddress(index); + await UnsealIfNeededAsync(writeAddress, token).ConfigureAwait(false); + var startPos = writeAddress + LogEntryMetadata.Size; var metadata = LogEntryMetadata.Create(entry, startPos, entry.Length); var metadataBuffer = GetMetadataBuffer(index); @@ -621,12 +626,59 @@ protected override void OnCached(in CachedLogEntry cachedEntry, int index) metadata.Format(GetMetadataBuffer(index).Span); } + private ValueTask UnsealIfNeededAsync(long truncatePosition, CancellationToken token) + { + ValueTask task; + if (IsSealed) + { + task = UnsealAsync(truncatePosition, token); + } + else if (token.IsCancellationRequested) + { + task = ValueTask.FromCanceled(token); + } + else if (truncatePosition < writer.FilePosition) + { + task = new(); + try + { + // The caller is trying to rewrite the log entry. + // For a correctness of Initialize() method for unsealed partitions, we + // need to adjust file size. This is expensive syscall which can lead to file fragmentation. + // However, this is acceptable because rare. + RandomAccess.SetLength(Handle, truncatePosition); + } + catch (Exception e) + { + task = ValueTask.FromException(e); + } + } + else + { + task = new(); + } + + return task; + } + + private async ValueTask UnsealAsync(long truncatePosition, CancellationToken token) + { + // This is expensive operation in terms of I/O. However, it is needed only when + // the consumer decided to rewrite the existing log entry, which is rare. + IsSealed = false; + await WriteHeaderAsync(token).ConfigureAwait(false); + RandomAccess.FlushToDisk(Handle); + + // destroy all entries in the tail of partition + RandomAccess.SetLength(Handle, truncatePosition); + } + public override ValueTask FlushAsync(CancellationToken token = default) { - return runningIndex == LastIndex + return IsSealed + ? ValueTask.CompletedTask + : runningIndex == LastIndex ? FlushAndSealAsync(token) - : IsSealed - ? FlushAndUnsealAsync(token) : base.FlushAsync(token); } @@ -651,41 +703,7 @@ private async ValueTask FlushAndSealAsync(CancellationToken token) IsSealed = true; await WriteHeaderAsync(token).ConfigureAwait(false); - writer.FlushToDisk(); - } - - private async ValueTask FlushAndUnsealAsync(CancellationToken token) - { - await FlushAndEraseNextEntryAsync(token).ConfigureAwait(false); - - IsSealed = false; - await WriteHeaderAsync(token).ConfigureAwait(false); - - writer.FlushToDisk(); - } - - private ValueTask FlushAndEraseNextEntryAsync(CancellationToken token) - { - ValueTask task; - - // write the rest of the entry, - // then cleanup next entry header to indicate that the current entry is the last entry - if (!writer.HasBufferedData) - { - task = RandomAccess.WriteAsync(Handle, EmptyMetadata, writer.FilePosition, token); - } - else if (writer.Buffer is { Length: >= LogEntryMetadata.Size } emptyMetadataStub) - { - emptyMetadataStub.Span.Slice(0, LogEntryMetadata.Size).Clear(); - writer.Produce(LogEntryMetadata.Size); - task = writer.WriteAsync(token); - } - else - { - task = writer.WriteAsync(EmptyMetadata, token); - } - - return task; + RandomAccess.FlushToDisk(Handle); } private ValueTask WriteHeaderAsync(CancellationToken token)