diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs index 279ad6523..b2c9bfe22 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs @@ -151,7 +151,7 @@ internal static long GetTerm(ReadOnlySpan input) internal static long GetOffset(ReadOnlySpan input) => BinaryPrimitives.ReadInt64LittleEndian(input.Slice(0, sizeof(long) + sizeof(long) + sizeof(long))); // skip Term, Timestamp, Length - private long End => Length + Offset; + internal long End => Length + Offset; internal static long GetEndOfLogEntry(ReadOnlySpan input) { diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs index b4e4f5278..805dba26b 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs @@ -1,4 +1,5 @@ -using System.Diagnostics; +using System.Collections; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; @@ -242,12 +243,12 @@ protected override void Dispose(bool disposing) } } - private sealed class SparsePartition : Partition + private sealed class SparsePartition : Partition, IReadOnlyList> { private readonly long maxLogEntrySize; private MemoryOwner metadataTable; private MemoryOwner metadataBuffer; - private ReadOnlyMemory[]? bufferList; + private ReadOnlyMemory payloadBuffer; internal SparsePartition(DirectoryInfo location, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize, long maxLogEntrySize) : base(location, offset: 0, bufferSize, recordsPerPartition, partitionNumber, in manager, readersCount, writeMode, initialSize, FileAttributes.NotContentIndexed | FileAttributes.SparseFile) @@ -284,6 +285,7 @@ protected override LogEntryMetadata GetMetadata(int index) protected override void OnCached(in CachedLogEntry cachedEntry, int index) => metadataTable[index].Metadata = LogEntryMetadata.Create(cachedEntry, GetMetadataOffset(index) + LogEntryMetadata.Size); + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] protected override async ValueTask PersistAsync(TEntry entry, int index, CancellationToken token) { var metadataOffset = GetMetadataOffset(index); @@ -326,6 +328,7 @@ protected override async ValueTask PersistAsync(TEntry entry, int index, throw new InvalidOperationException(ExceptionMessages.LogEntryPayloadTooLarge); } + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int index, CancellationToken token) { if (entry.Length > maxLogEntrySize) @@ -334,17 +337,32 @@ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int i var metadata = LogEntryMetadata.Create(entry, GetMetadataOffset(index) + LogEntryMetadata.Size); metadata.Format(metadataBuffer.Span); - if (bufferList is null) - { - bufferList = new ReadOnlyMemory[2]; - bufferList[0] = metadataBuffer.Memory; - } - - bufferList[1] = entry.Content.Memory; - await RandomAccess.WriteAsync(Handle, bufferList, GetMetadataOffset(index), token).ConfigureAwait(false); + payloadBuffer = entry.Content.Memory; + await RandomAccess.WriteAsync(Handle, this, GetMetadataOffset(index), token).ConfigureAwait(false); + payloadBuffer = default; metadataTable[index].Metadata = metadata; } + ReadOnlyMemory IReadOnlyList>.this[int index] => index switch + { + 0 => metadataBuffer.Memory, + 1 => payloadBuffer, + _ => throw new ArgumentOutOfRangeException(nameof(index)), + }; + + int IReadOnlyCollection>.Count => 2; + + private IEnumerator> GetEnumerator() + { + yield return metadataBuffer.Memory; + yield return payloadBuffer; + } + + IEnumerator> IEnumerable>.GetEnumerator() + => GetEnumerator(); + + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + protected override void Dispose(bool disposing) { metadataTable.Dispose(); @@ -376,124 +394,208 @@ internal LogEntryMetadata Metadata Partition file format: FileName - number of partition Payload: - [struct LogEntryMetadata] X Capacity - prologue with metadata - [octet string] X number of entries + [512 bytes] - header: + [1 byte] - true if completed partition + [struct LogEntryMetadata] [octet string] X Capacity - log entries prefixed with metadata + [struct LogEntryMetadata] X Capacity - a table of log entries within the file, if partition is completed */ - private sealed class Table : Partition + private sealed class Table : Partition, IReadOnlyList> { - // metadata management - private MemoryOwner metadata; - private int metadataFlushStartAddress; - private int metadataFlushEndAddress; + private const int HeaderSize = 512; - // represents offset within the file from which a newly added log entry payload can be recorded - private long writeAddress; + // metadata management + private MemoryOwner header, footer, metadataBuffer; + private ReadOnlyMemory payloadBuffer; internal Table(DirectoryInfo location, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize) - : base(location, checked(LogEntryMetadata.Size * recordsPerPartition), bufferSize, recordsPerPartition, partitionNumber, in manager, readersCount, writeMode, initialSize) + : base(location, HeaderSize, bufferSize, recordsPerPartition, partitionNumber, in manager, readersCount, writeMode, initialSize) { - // allocate metadata segment - metadata = manager.BufferAllocator.AllocateExactly(fileOffset); - metadataFlushStartAddress = int.MaxValue; + footer = manager.BufferAllocator.AllocateExactly(recordsPerPartition * LogEntryMetadata.Size); + + header = manager.BufferAllocator.AllocateExactly(HeaderSize); + header.Span.Clear(); - writeAddress = fileOffset; + metadataBuffer = manager.BufferAllocator.AllocateExactly(LogEntryMetadata.Size); + + // init ephemeral 0 entry + if (PartitionNumber is 0L) + { + var metadata = LogEntryMetadata.Create(LogEntry.Initial, HeaderSize + LogEntryMetadata.Size, length: 0L); + metadata.Format(footer.Span); + } + } + + private bool IsSealed + { + get => Unsafe.BitCast(MemoryMarshal.GetReference(header.Span)); + set => MemoryMarshal.GetReference(header.Span) = Unsafe.BitCast(value); } internal override void Initialize() { using var handle = File.OpenHandle(FileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, FileOptions.SequentialScan); - if (RandomAccess.Read(handle, metadata.Span, 0L) < fileOffset) + + // read header + if (RandomAccess.Read(Handle, header.Span, fileOffset: 0L) < HeaderSize) { - metadata.Span.Clear(); - RandomAccess.Write(handle, metadata.Span, 0L); + header.Span.Clear(); } - else + else if (IsSealed) { - writeAddress = Math.Max(fileOffset, GetWriteAddress(metadata.Span)); + // partition is completed, read table + var tableStart = RandomAccess.GetLength(Handle); + RandomAccess.Read(Handle, footer.Span, tableStart - footer.Length); } - - static long GetWriteAddress(ReadOnlySpan metadataTable) + else { - long result; - - for (result = 0L; !metadataTable.IsEmpty; metadataTable = metadataTable.Slice(LogEntryMetadata.Size)) + // read sequentially every log entry + var metadataBuffer = this.metadataBuffer.Span; + var metadataTable = footer.Span; + int footerOffset = 0; + for (long fileOffset = HeaderSize; ; footerOffset += LogEntryMetadata.Size) { - result = Math.Max(result, LogEntryMetadata.GetEndOfLogEntry(metadataTable)); - } + var count = RandomAccess.Read(Handle, metadataBuffer, fileOffset); + if (count < LogEntryMetadata.Size) + break; + + fileOffset = LogEntryMetadata.GetEndOfLogEntry(metadataBuffer); + if (fileOffset is 0L) + break; - return result; + metadataBuffer.CopyTo(metadataTable.Slice(footerOffset, LogEntryMetadata.Size)); + } } } - private async ValueTask FlushAsync(ReadOnlyMemory metadata, CancellationToken token) - { - await RandomAccess.WriteAsync(Handle, metadata, metadataFlushStartAddress, token).ConfigureAwait(false); - metadataFlushStartAddress = int.MaxValue; - metadataFlushEndAddress = 0; + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private Span GetMetadataSpan(int index) + => footer.Span.Slice(index * LogEntryMetadata.Size, LogEntryMetadata.Size); - await base.FlushAsync(token).ConfigureAwait(false); - } + protected override LogEntryMetadata GetMetadata(int index) + => new(GetMetadataSpan(index)); - public override ValueTask FlushAsync(CancellationToken token = default) - { - var size = metadataFlushEndAddress - metadataFlushStartAddress; - return size > 0 - ? FlushAsync(metadata.Memory.Slice(metadataFlushStartAddress, size), token) - : base.FlushAsync(token); - } + private long GetWriteAddress(int index) + => index is 0 ? fileOffset : LogEntryMetadata.GetEndOfLogEntry(GetMetadataSpan(index - 1)); - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private Span GetMetadata(int index, out int offset) + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] + protected override async ValueTask PersistAsync(TEntry entry, int index, CancellationToken token) { - Debug.Assert(metadata.Length == fileOffset); + var writeAddress = GetWriteAddress(index); - return metadata.Span.Slice(offset = index * LogEntryMetadata.Size); - } + LogEntryMetadata metadata; + var startPos = writeAddress + LogEntryMetadata.Size; + if (entry.Length is { } length) + { + // fast path - write metadata and entry sequentially + metadata = LogEntryMetadata.Create(entry, startPos, length); - protected override LogEntryMetadata GetMetadata(int index) - => new(GetMetadata(index, out _)); + await SetWritePositionAsync(writeAddress, token).ConfigureAwait(false); + await writer.WriteAsync(metadata, token).ConfigureAwait(false); + await entry.WriteToAsync(writer, token).ConfigureAwait(false); + } + else + { + // slow path - write entry first + await SetWritePositionAsync(startPos, token).ConfigureAwait(false); - private void WriteMetadata(int index, in LogEntryMetadata metadata) - { - metadata.Format(GetMetadata(index, out var offset)); + await entry.WriteToAsync(writer, token).ConfigureAwait(false); + length = writer.WritePosition - startPos; - metadataFlushStartAddress = Math.Min(metadataFlushStartAddress, offset); - metadataFlushEndAddress = Math.Max(metadataFlushEndAddress, offset + LogEntryMetadata.Size); - } + metadata = LogEntryMetadata.Create(entry, startPos, length); + metadata.Format(metadataBuffer.Span); + await RandomAccess.WriteAsync(Handle, metadataBuffer.Memory, writeAddress, token).ConfigureAwait(false); + } - protected override async ValueTask PersistAsync(TEntry entry, int index, CancellationToken token) - { - // slow path - persist log entry - await SetWritePositionAsync(writeAddress, token).ConfigureAwait(false); - await entry.WriteToAsync(writer, token).ConfigureAwait(false); + metadata.Format(GetMetadataSpan(index)); + + if (index == LastIndex) + { + // write footer with metadata table + await RandomAccess.WriteAsync(Handle, footer.Memory, metadata.End, token).ConfigureAwait(false); + + // seal the partition + IsSealed = true; + } + else if (IsSealed) + { + // unseal + IsSealed = false; + } + else + { + return; + } - // save new log entry to the allocation table - var length = writer.WritePosition - writeAddress; - WriteMetadata(index, LogEntryMetadata.Create(entry, writeAddress, length)); - writeAddress += length; + await RandomAccess.WriteAsync(Handle, header.Memory, fileOffset: 0L, token).ConfigureAwait(false); } + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int index, CancellationToken token) { - await SetWritePositionAsync(writeAddress, token).ConfigureAwait(false); - Debug.Assert(writer.HasBufferedData is false); + var writeAddress = GetWriteAddress(index); + var startPos = writeAddress + LogEntryMetadata.Size; + var metadata = LogEntryMetadata.Create(entry, startPos, entry.Length); + metadata.Format(metadataBuffer.Span); + + payloadBuffer = entry.Content.Memory; + await RandomAccess.WriteAsync(Handle, this, writeAddress, token).ConfigureAwait(false); + payloadBuffer = default; - await RandomAccess.WriteAsync(Handle, entry.Content.Memory, writeAddress, token).ConfigureAwait(false); + metadata.Format(GetMetadataSpan(index)); - // save new log entry to the allocation table - WriteMetadata(index, LogEntryMetadata.Create(entry, writeAddress, entry.Length)); - writer.FilePosition = writeAddress += entry.Length; + if (index == LastIndex) + { + // write footer with metadata table + await RandomAccess.WriteAsync(Handle, footer.Memory, metadata.End, token).ConfigureAwait(false); + + // seal the partition + IsSealed = true; + } + else if (IsSealed) + { + // unseal + IsSealed = false; + } + else + { + return; + } + + await RandomAccess.WriteAsync(Handle, header.Memory, fileOffset: 0L, token).ConfigureAwait(false); } protected override void OnCached(in CachedLogEntry cachedEntry, int index) { - WriteMetadata(index, LogEntryMetadata.Create(cachedEntry, writeAddress, cachedEntry.Length)); - writeAddress += cachedEntry.Length; + var startPos = GetWriteAddress(index) + LogEntryMetadata.Size; + var metadata = LogEntryMetadata.Create(in cachedEntry, startPos); + metadata.Format(GetMetadataSpan(index)); + } + + ReadOnlyMemory IReadOnlyList>.this[int index] => index switch + { + 0 => metadataBuffer.Memory, + 1 => payloadBuffer, + _ => throw new ArgumentOutOfRangeException(nameof(index)), + }; + + int IReadOnlyCollection>.Count => 2; + + private IEnumerator> GetEnumerator() + { + yield return metadataBuffer.Memory; + yield return payloadBuffer; } + IEnumerator> IEnumerable>.GetEnumerator() + => GetEnumerator(); + + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + protected override void Dispose(bool disposing) { - metadata.Dispose(); + header.Dispose(); + footer.Dispose(); + metadataBuffer.Dispose(); base.Dispose(disposing); } }