From b3adac3fcd1c1bdfc3aa1a880d34326bc217e50d Mon Sep 17 00:00:00 2001 From: sakno Date: Mon, 15 Apr 2024 21:49:14 +0300 Subject: [PATCH] Fixed detection of ephemeral entry --- .../Raft/PersistentState.Partition.cs | 63 ++++++++++++++++--- 1 file changed, 55 insertions(+), 8 deletions(-) diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs index e2d14acd7..c65e22106 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs @@ -412,6 +412,19 @@ private sealed class Table : Partition, IReadOnlyList> { private const int HeaderSize = 512; + private static readonly ReadOnlyMemory EmptyMetadata; + private static readonly ReadOnlyMemory EphemeralMetadata; + + static Table() + { + EmptyMetadata = new byte[LogEntryMetadata.Size]; // all zeroes + + var ephemeral = LogEntryMetadata.Create(LogEntry.Initial, HeaderSize + LogEntryMetadata.Size, length: 0L); + var buffer = new byte[LogEntryMetadata.Size]; + ephemeral.Format(buffer); + EphemeralMetadata = buffer; + } + // metadata management private MemoryOwner header, footer, metadataBuffer; private ReadOnlyMemory payloadBuffer; @@ -429,8 +442,7 @@ internal Table(DirectoryInfo location, int bufferSize, int recordsPerPartition, // init ephemeral 0 entry if (PartitionNumber is 0L) { - var metadata = LogEntryMetadata.Create(LogEntry.Initial, HeaderSize + LogEntryMetadata.Size, length: 0L); - metadata.Format(footer.Span); + EphemeralMetadata.CopyTo(footer.Memory); } } @@ -458,17 +470,28 @@ internal override void Initialize() else { // read sequentially every log entry - var metadataBuffer = this.metadataBuffer.Span; - var metadataTable = footer.Span; - int footerOffset = 0; - for (long fileOffset = HeaderSize; ; footerOffset += LogEntryMetadata.Size) + int footerOffset; + long fileOffset; + + if (PartitionNumber is 0L) + { + footerOffset = LogEntryMetadata.Size; + fileOffset = HeaderSize + LogEntryMetadata.Size; + } + else + { + footerOffset = 0; + fileOffset = HeaderSize; + } + + for (Span metadataBuffer = this.metadataBuffer.Span, metadataTable = footer.Span; ; footerOffset += LogEntryMetadata.Size) { var count = RandomAccess.Read(Handle, metadataBuffer, fileOffset); if (count < LogEntryMetadata.Size) break; fileOffset = LogEntryMetadata.GetEndOfLogEntry(metadataBuffer); - if (fileOffset is 0L) + if (fileOffset <= 0L) break; metadataBuffer.CopyTo(metadataTable.Slice(footerOffset, LogEntryMetadata.Size)); @@ -571,7 +594,7 @@ private async ValueTask FlushAndSealAsync(CancellationToken token) private async ValueTask FlushAndUnsealAsync(CancellationToken token) { - await writer.WriteAsync(token).ConfigureAwait(false); + await FlushAndEraseNextEntryAsync(token).ConfigureAwait(false); IsSealed = false; await WriteHeaderAsync(token).ConfigureAwait(false); @@ -579,6 +602,30 @@ private async ValueTask FlushAndUnsealAsync(CancellationToken token) writer.FlushToDisk(); } + private ValueTask FlushAndEraseNextEntryAsync(CancellationToken token) + { + ValueTask task; + + // write the rest of the entry, + // then cleanup next entry header to indicate that the current entry is the last entry + if (!writer.HasBufferedData) + { + task = RandomAccess.WriteAsync(Handle, EmptyMetadata, writer.FilePosition, token); + } + else if (writer.Buffer is { Length: >= LogEntryMetadata.Size } emptyMetadataStub) + { + emptyMetadataStub.Span.Slice(0, LogEntryMetadata.Size).Clear(); + writer.Produce(LogEntryMetadata.Size); + task = writer.WriteAsync(token); + } + else + { + task = writer.WriteAsync(EmptyMetadata, token); + } + + return task; + } + private ValueTask WriteHeaderAsync(CancellationToken token) => RandomAccess.WriteAsync(Handle, header.Memory, fileOffset: 0L, token);