diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7726cadb9..d005463d5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,17 @@
Release Notes
====
+# 04-20-2024
+DotNext.IO 5.4.0
+* Added `FileWriter.WrittenBuffer` property
+
+DotNext.Net.Cluster 5.4.0
+* Changed binary file format for WAL for more efficient I/O. A new format is incompatible with all previous versions. To enable legacy format, set `PersistentState.Options.UseLegacyBinaryFormat` property to **true**
+* Introduced a new experimental binary format for WAL based on sparse files. Can be enabled with `PersistentState.Options.MaxLogEntrySize` property
+
+DotNext.AspNetCore.Cluster 5.4.0
+* Updated dependencies
+
# 03-20-2024
DotNext 5.3.1
* Provided support of thread-local storage for `StreamSource.AsSharedStream`
diff --git a/README.md b/README.md
index 7df385b5b..be773a311 100644
--- a/README.md
+++ b/README.md
@@ -44,11 +44,17 @@ All these things are implemented in 100% managed code on top of existing .NET AP
* [NuGet Packages](https://www.nuget.org/profiles/rvsakno)
# What's new
-Release Date: 03-20-2024
+Release Date: 04-20-2024
-DotNext 5.3.1
-* Provided support of thread-local storage for `StreamSource.AsSharedStream`
-* Remove type cast for `Func.Constant` static method
+DotNext.IO 5.4.0
+* Added `FileWriter.WrittenBuffer` property
+
+DotNext.Net.Cluster 5.4.0
+* Changed binary file format for WAL for more efficient I/O. A new format is incompatible with all previous versions. To enable legacy format, set `PersistentState.Options.UseLegacyBinaryFormat` property to **true**
+* Introduced a new experimental binary format for WAL based on sparse files. Can be enabled with `PersistentState.Options.MaxLogEntrySize` property
+
+DotNext.AspNetCore.Cluster 5.4.0
+* Updated dependencies
Changelog for previous versions located [here](./CHANGELOG.md).
diff --git a/src/DotNext.IO/DotNext.IO.csproj b/src/DotNext.IO/DotNext.IO.csproj
index c659aa49a..a51ced958 100644
--- a/src/DotNext.IO/DotNext.IO.csproj
+++ b/src/DotNext.IO/DotNext.IO.csproj
@@ -11,7 +11,7 @@
.NET Foundation and Contributors
.NEXT Family of Libraries
- 5.3.0
+ 5.4.0
DotNext.IO
MIT
diff --git a/src/DotNext.IO/IO/FileWriter.Utils.cs b/src/DotNext.IO/IO/FileWriter.Utils.cs
index ad8bfbf8a..47eece1ce 100644
--- a/src/DotNext.IO/IO/FileWriter.Utils.cs
+++ b/src/DotNext.IO/IO/FileWriter.Utils.cs
@@ -17,14 +17,14 @@ public partial class FileWriter : IDynamicInterfaceCastable
private ReadOnlyMemory GetBuffer(int index) => index switch
{
- 0 => WrittenMemory,
+ 0 => WrittenBuffer,
1 => secondBuffer,
_ => ReadOnlyMemory.Empty,
};
private IEnumerator> EnumerateBuffers()
{
- yield return WrittenMemory;
+ yield return WrittenBuffer;
yield return secondBuffer;
}
diff --git a/src/DotNext.IO/IO/FileWriter.cs b/src/DotNext.IO/IO/FileWriter.cs
index be323ed48..4de20e4e4 100644
--- a/src/DotNext.IO/IO/FileWriter.cs
+++ b/src/DotNext.IO/IO/FileWriter.cs
@@ -67,7 +67,10 @@ public FileWriter(FileStream destination, int bufferSize = 4096, MemoryAllocator
throw new ArgumentException(ExceptionMessages.StreamNotWritable, nameof(destination));
}
- private ReadOnlyMemory WrittenMemory => buffer.Memory.Slice(0, bufferOffset);
+ ///
+ /// Gets written part of the buffer.
+ ///
+ public ReadOnlyMemory WrittenBuffer => buffer.Memory.Slice(0, bufferOffset);
private int FreeCapacity => buffer.Length - bufferOffset;
@@ -138,11 +141,11 @@ public long FilePosition
public long WritePosition => fileOffset + bufferOffset;
private ValueTask FlushCoreAsync(CancellationToken token)
- => Submit(RandomAccess.WriteAsync(handle, WrittenMemory, fileOffset, token), writeCallback);
+ => Submit(RandomAccess.WriteAsync(handle, WrittenBuffer, fileOffset, token), writeCallback);
private void FlushCore()
{
- RandomAccess.Write(handle, WrittenMemory.Span, fileOffset);
+ RandomAccess.Write(handle, WrittenBuffer.Span, fileOffset);
fileOffset += bufferOffset;
bufferOffset = 0;
}
@@ -198,7 +201,7 @@ private void WriteSlow(ReadOnlySpan input)
{
if (input.Length >= buffer.Length)
{
- RandomAccess.Write(handle, WrittenMemory.Span, fileOffset);
+ RandomAccess.Write(handle, WrittenBuffer.Span, fileOffset);
fileOffset += bufferOffset;
RandomAccess.Write(handle, input, fileOffset);
@@ -207,7 +210,7 @@ private void WriteSlow(ReadOnlySpan input)
}
else
{
- RandomAccess.Write(handle, WrittenMemory.Span, fileOffset);
+ RandomAccess.Write(handle, WrittenBuffer.Span, fileOffset);
fileOffset += bufferOffset;
input.CopyTo(buffer.Span);
bufferOffset += input.Length;
@@ -271,7 +274,7 @@ private ValueTask WriteAndCopyAsync(ReadOnlyMemory input, CancellationToke
Debug.Assert(HasBufferedData);
secondBuffer = input;
- return Submit(RandomAccess.WriteAsync(handle, WrittenMemory, fileOffset, token), writeAndCopyCallback);
+ return Submit(RandomAccess.WriteAsync(handle, WrittenBuffer, fileOffset, token), writeAndCopyCallback);
}
///
diff --git a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/Http/RaftHttpClusterTests.cs b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/Http/RaftHttpClusterTests.cs
index 480742cc9..4e37ff3c4 100644
--- a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/Http/RaftHttpClusterTests.cs
+++ b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/Http/RaftHttpClusterTests.cs
@@ -11,6 +11,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft.Http;
using Diagnostics;
using Messaging;
using Replication;
+using Threading;
using static DotNext.Extensions.Logging.TestLoggers;
[Collection(TestCollections.Raft)]
@@ -447,7 +448,6 @@ public static async Task StandbyMode()
await GetLocalClusterView(host3).Readiness.WaitAsync(DefaultTimeout);
// suspend two nodes
- False(await GetLocalClusterView(host1).EnableStandbyModeAsync());
True(await GetLocalClusterView(host2).EnableStandbyModeAsync());
True(GetLocalClusterView(host2).Standby);
True(await GetLocalClusterView(host3).EnableStandbyModeAsync());
@@ -775,4 +775,46 @@ public async Task RegressionIssue153()
static IFailureDetector CreateFailureDetector(TimeSpan estimate, IRaftClusterMember member)
=> new PhiAccrualFailureDetector(estimate) { Threshold = 3D, TreatUnknownValueAsUnhealthy = true };
}
+
+ [Fact]
+ public static async Task ConsensusToken()
+ {
+ var config1 = new Dictionary
+ {
+ {"publicEndPoint", "http://localhost:3262"},
+ {"coldStart", "true"},
+ // {"requestTimeout", "00:00:01"},
+ // {"rpcTimeout", "00:00:01"},
+ // {"lowerElectionTimeout", "6000" },
+ // {"upperElectionTimeout", "9000" },
+ };
+
+ var config2 = new Dictionary
+ {
+ {"publicEndPoint", "http://localhost:3263"},
+ {"coldStart", "false"},
+ {"standby", "true"},
+ };
+
+ using var host1 = CreateHost(3262, config1);
+ await host1.StartAsync();
+
+ using var host2 = CreateHost(3263, config2);
+ True(GetLocalClusterView(host2).ConsensusToken.IsCancellationRequested);
+ await host2.StartAsync();
+
+ await GetLocalClusterView(host1).WaitForLeaderAsync(DefaultTimeout);
+ Equal(GetLocalClusterView(host1).LeadershipToken, GetLocalClusterView(host1).ConsensusToken);
+ True(await GetLocalClusterView(host1).AddMemberAsync(GetLocalClusterView(host2).LocalMemberAddress));
+
+ await GetLocalClusterView(host2).Readiness.WaitAsync(DefaultTimeout);
+
+ var token = GetLocalClusterView(host2).ConsensusToken;
+ False(token.IsCancellationRequested);
+
+ await host1.StopAsync();
+
+ await token.WaitAsync();
+ await host2.StopAsync();
+ }
}
\ No newline at end of file
diff --git a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs
index a4368a21e..815c4d2fc 100644
--- a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs
+++ b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs
@@ -199,12 +199,14 @@ public static async Task QueryAppendEntries(long partitionSize, bool caching, in
}
}
- [Fact]
- public static async Task ParallelReads()
+ [Theory]
+ [InlineData(null)]
+ [InlineData(1024L * 1024L * 100L)]
+ public static async Task ParallelReads(long? maxLogEntrySize)
{
var entry = new TestLogEntry("SET X = 0") { Term = 42L };
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
- IPersistentState state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { CopyOnReadOptions = new() });
+ IPersistentState state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { CopyOnReadOptions = new(), MaxLogEntrySize = maxLogEntrySize });
try
{
Equal(1L, await state.AppendAsync(new LogEntryList(entry)));
@@ -235,11 +237,13 @@ public static async Task ParallelReads()
}
}
- [Fact]
- public static async Task AppendWhileReading()
+ [Theory]
+ [InlineData(null)]
+ [InlineData(1024L * 1024L * 100L)]
+ public static async Task AppendWhileReading(long? maxLogEntrySize)
{
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
- using var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition);
+ using var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { MaxLogEntrySize = maxLogEntrySize });
var entry = new TestLogEntry("SET X = 0") { Term = 42L };
await state.AppendAsync(entry);
@@ -281,8 +285,48 @@ public static async Task DropRecords(bool reuseSpace)
Equal(0L, state.LastCommittedEntryIndex);
}
+ [Theory]
+ [InlineData(null)]
+ [InlineData(1024L * 1024L * 100L)]
+ public static async Task Overwrite(long? maxLogEntrySize)
+ {
+ var entry1 = new TestLogEntry("SET X = 0") { Term = 42L };
+ var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L };
+ var entry3 = new TestLogEntry("SET Z = 2") { Term = 44L };
+ var entry4 = new TestLogEntry("SET U = 3") { Term = 45L };
+ var entry5 = new TestLogEntry("SET V = 4") { Term = 46L };
+ Func, long?, CancellationToken, ValueTask> checker;
+ var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
+ using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { MaxLogEntrySize = maxLogEntrySize }))
+ {
+ Equal(1L, await state.AppendAsync(new LogEntryList(entry2, entry3, entry4, entry5)));
+ Equal(4L, state.LastEntryIndex);
+ Equal(0L, state.LastCommittedEntryIndex);
+ await state.AppendAsync(entry1, 1L);
+ Equal(1L, state.LastEntryIndex);
+ Equal(0L, state.LastCommittedEntryIndex);
+ }
+
+ //read again
+ using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { MaxLogEntrySize = maxLogEntrySize }))
+ {
+ Equal(1L, state.LastEntryIndex);
+ Equal(0L, state.LastCommittedEntryIndex);
+ checker = async (entries, snapshotIndex, token) =>
+ {
+ Null(snapshotIndex);
+ Single(entries);
+ False(entries[0].IsSnapshot);
+ Equal(entry1.Content, await entries[0].ToStringAsync(Encoding.UTF8));
+ return Missing.Value;
+ };
+ await state.As().ReadAsync(new LogEntryConsumer(checker), 1L, CancellationToken.None);
+ }
+ }
+
+ [Obsolete]
[Fact]
- public static async Task Overwrite()
+ public static async Task LegacyOverwrite()
{
var entry1 = new TestLogEntry("SET X = 0") { Term = 42L };
var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L };
@@ -291,7 +335,7 @@ public static async Task Overwrite()
var entry5 = new TestLogEntry("SET V = 4") { Term = 46L };
Func, long?, CancellationToken, ValueTask> checker;
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
- using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition))
+ using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { UseLegacyBinaryFormat = true }))
{
Equal(1L, await state.AppendAsync(new LogEntryList(entry2, entry3, entry4, entry5)));
Equal(4L, state.LastEntryIndex);
@@ -302,7 +346,7 @@ public static async Task Overwrite()
}
//read again
- using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition))
+ using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { UseLegacyBinaryFormat = true }))
{
Equal(1L, state.LastEntryIndex);
Equal(0L, state.LastCommittedEntryIndex);
@@ -833,6 +877,37 @@ public static async Task RestoreBackup()
}
}
+ [PlatformSpecificFact("linux")]
+ public static async Task CreateSparseBackup()
+ {
+ var entry1 = new TestLogEntry("SET X = 0") { Term = 42L };
+ var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L };
+ var entry3 = new TestLogEntry("SET Z = 2") { Term = 44L };
+ var entry4 = new TestLogEntry("SET U = 3") { Term = 45L };
+ var entry5 = new TestLogEntry("SET V = 4") { Term = 46L };
+ var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
+ var backupFile = Path.GetTempFileName();
+ IPersistentState state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { MaxLogEntrySize = 1024 * 1024, BackupFormat = System.Formats.Tar.TarEntryFormat.Gnu });
+ var member = ClusterMemberId.FromEndPoint(new IPEndPoint(IPAddress.IPv6Loopback, 3232));
+ try
+ {
+ //define node state
+ Equal(1, await state.IncrementTermAsync(member));
+ True(state.IsVotedFor(member));
+ //define log entries
+ Equal(1L, await state.AppendAsync(new LogEntryList(entry1, entry2, entry3, entry4, entry5)));
+ //commit some of them
+ Equal(2L, await state.CommitAsync(2L));
+ //save backup
+ await using var backupStream = new FileStream(backupFile, FileMode.Truncate, FileAccess.Write, FileShare.None, 1024, true);
+ await state.CreateBackupAsync(backupStream);
+ }
+ finally
+ {
+ (state as IDisposable)?.Dispose();
+ }
+ }
+
[Fact]
public static async Task Reconstruction()
{
diff --git a/src/DotNext/Disposable.cs b/src/DotNext/Disposable.cs
index e39121f55..2e0176a0f 100644
--- a/src/DotNext/Disposable.cs
+++ b/src/DotNext/Disposable.cs
@@ -4,8 +4,6 @@
namespace DotNext;
-using static Runtime.Intrinsics;
-
///
/// Provides implementation of dispose pattern.
///
diff --git a/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj b/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj
index c917e5a68..a07e33b27 100644
--- a/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj
+++ b/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj
@@ -8,7 +8,7 @@
true
true
nullablePublicOnly
- 5.3.0
+ 5.4.0
.NET Foundation and Contributors
.NEXT Family of Libraries
diff --git a/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj b/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj
index 851ebb76a..a940a37de 100644
--- a/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj
+++ b/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj
@@ -8,7 +8,7 @@
enable
true
nullablePublicOnly
- 5.3.0
+ 5.4.0
.NET Foundation and Contributors
.NEXT Family of Libraries
diff --git a/src/cluster/DotNext.Net.Cluster/ExceptionMessages.cs b/src/cluster/DotNext.Net.Cluster/ExceptionMessages.cs
index dc689d0d9..18bdec054 100644
--- a/src/cluster/DotNext.Net.Cluster/ExceptionMessages.cs
+++ b/src/cluster/DotNext.Net.Cluster/ExceptionMessages.cs
@@ -52,4 +52,8 @@ internal static string UnknownRaftMessageType(T messageType)
internal static string PersistentStateBroken => (string)Resources.Get();
internal static string ConcurrentMembershipUpdate => (string)Resources.Get();
+
+ internal static string LogEntryPayloadTooLarge => (string)Resources.Get();
+
+ internal static string SparseFileNotSupported => (string)Resources.Get();
}
\ No newline at end of file
diff --git a/src/cluster/DotNext.Net.Cluster/ExceptionMessages.restext b/src/cluster/DotNext.Net.Cluster/ExceptionMessages.restext
index 68eb10c16..99cfb4bf0 100644
--- a/src/cluster/DotNext.Net.Cluster/ExceptionMessages.restext
+++ b/src/cluster/DotNext.Net.Cluster/ExceptionMessages.restext
@@ -17,4 +17,6 @@ MissingMessageName=Message must have associated type and name
LeaderIsUnavailable=Leader node is not yet elected
UnknownRaftMessageType=Unknown Raft message type {0}
PersistentStateBroken=Internal state of the WAL didn't pass the integrity check
-ConcurrentMembershipUpdate=Cluster membership cannot be modified concurrently
\ No newline at end of file
+ConcurrentMembershipUpdate=Cluster membership cannot be modified concurrently
+LogEntryPayloadTooLarge=The size of the log entry is larger than specified threshold
+SparseFileNotSupported=TAR archive doesn't support sparse files on the target platform
\ No newline at end of file
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/ConsensusTrackerState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/ConsensusTrackerState.cs
new file mode 100644
index 000000000..59715f8b7
--- /dev/null
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/ConsensusTrackerState.cs
@@ -0,0 +1,59 @@
+using System.Diagnostics.Metrics;
+using System.Runtime.InteropServices;
+
+namespace DotNext.Net.Cluster.Consensus.Raft;
+
+using Threading;
+
+internal abstract class ConsensusTrackerState(IRaftStateMachine stateMachine) : TokenizedState(stateMachine)
+ where TMember : class, IRaftClusterMember
+{
+ protected readonly AsyncAutoResetEvent refreshEvent = new(initialState: false) { MeasurementTags = stateMachine.MeasurementTags };
+ protected readonly AsyncManualResetEvent suppressionEvent = new(initialState: true) { MeasurementTags = stateMachine.MeasurementTags };
+
+ public virtual void Refresh()
+ {
+ Logger.TimeoutReset();
+ ConsensusTrackerState.HeartbeatRateMeter.Add(1, MeasurementTags);
+ }
+
+ public bool IsRefreshRequested => refreshEvent.IsSet;
+
+ private void SuspendTracking()
+ {
+ suppressionEvent.Reset();
+ refreshEvent.Set();
+ }
+
+ private void ResumeTracking() => suppressionEvent.Set();
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ refreshEvent.Dispose();
+ suppressionEvent.Dispose();
+ }
+
+ base.Dispose(disposing);
+ }
+
+ [StructLayout(LayoutKind.Auto)]
+ internal readonly struct TransitionSuppressionScope : IDisposable
+ {
+ private readonly ConsensusTrackerState? state;
+
+ internal TransitionSuppressionScope(ConsensusTrackerState? state)
+ {
+ state?.SuspendTracking();
+ this.state = state;
+ }
+
+ public void Dispose() => state?.ResumeTracking();
+ }
+}
+
+file static class ConsensusTrackerState
+{
+ internal static readonly Counter HeartbeatRateMeter = Metrics.Instrumentation.ServerSide.CreateCounter("incoming-heartbeats-count", description: "Incoming Heartbeats from Leader");
+}
\ No newline at end of file
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs
index 6e2195874..a02be4e62 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs
@@ -66,14 +66,14 @@ protected DiskBasedStateMachine(string path, int recordsPerPartition, Options? c
{
if (TryGetPartition(startIndex, ref partition))
{
- var entry = partition.Read(sessionId, startIndex, out var persisted);
+ var entry = partition.Read(sessionId, startIndex);
var snapshotLength = await ApplyCoreAsync(entry).ConfigureAwait(false);
Volatile.Write(ref lastTerm, entry.Term);
// Remove log entry from the cache according to eviction policy
- if (!persisted)
+ if (!entry.IsPersisted)
{
- await partition.PersistCachedEntryAsync(startIndex, entry.Position, snapshotLength.HasValue).ConfigureAwait(false);
+ await partition.PersistCachedEntryAsync(startIndex, snapshotLength.HasValue).ConfigureAwait(false);
// Flush partition if we are finished or at the last entry in it
if (startIndex == commitIndex || startIndex == partition.LastIndex)
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/FollowerState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/FollowerState.cs
index eea246a75..a9ee9a682 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/FollowerState.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/FollowerState.cs
@@ -1,45 +1,31 @@
using System.Diagnostics.Metrics;
-using System.Runtime.InteropServices;
namespace DotNext.Net.Cluster.Consensus.Raft;
-using Threading;
-
-internal sealed class FollowerState : RaftState
+internal sealed class FollowerState : ConsensusTrackerState
where TMember : class, IRaftClusterMember
{
- private readonly AsyncAutoResetEvent refreshEvent;
- private readonly AsyncManualResetEvent suppressionEvent;
private readonly CancellationTokenSource trackerCancellation;
- private readonly CancellationToken trackerCancellationToken; // cached to prevent ObjectDisposedException
private Task? tracker;
private volatile bool timedOut;
public FollowerState(IRaftStateMachine stateMachine)
: base(stateMachine)
{
- refreshEvent = new(initialState: false) { MeasurementTags = stateMachine.MeasurementTags };
- suppressionEvent = new(initialState: true) { MeasurementTags = stateMachine.MeasurementTags };
trackerCancellation = new();
- trackerCancellationToken = trackerCancellation.Token;
+ Token = trackerCancellation.Token;
}
- private void SuspendTracking()
- {
- suppressionEvent.Reset();
- refreshEvent.Set();
- }
-
- private void ResumeTracking() => suppressionEvent.Set();
+ public override CancellationToken Token { get; } // cached to prevent ObjectDisposedException
private async Task Track(TimeSpan timeout)
{
// spin loop to wait for the timeout
- while (await refreshEvent.WaitAsync(timeout, trackerCancellationToken).ConfigureAwait(false))
+ while (await refreshEvent.WaitAsync(timeout, Token).ConfigureAwait(false))
{
// Transition can be suppressed. If so, resume the loop and reset the timer.
// If the event is in signaled state then the returned task is completed synchronously.
- await suppressionEvent.WaitAsync(trackerCancellationToken).ConfigureAwait(false);
+ await suppressionEvent.WaitAsync(Token).ConfigureAwait(false);
}
timedOut = true;
@@ -66,13 +52,10 @@ internal void StartServing(TimeSpan timeout)
internal bool IsExpired => timedOut;
- internal bool IsRefreshRequested => refreshEvent.IsSet;
-
- internal void Refresh()
+ public override void Refresh()
{
- Logger.TimeoutReset();
refreshEvent.Set();
- FollowerState.HeartbeatRateMeter.Add(1, MeasurementTags);
+ base.Refresh();
}
protected override async ValueTask DisposeAsyncCore()
@@ -96,32 +79,15 @@ protected override void Dispose(bool disposing)
{
if (disposing)
{
- refreshEvent.Dispose();
- suppressionEvent.Dispose();
trackerCancellation.Dispose();
tracker = null;
}
base.Dispose(disposing);
}
-
- [StructLayout(LayoutKind.Auto)]
- internal readonly struct TransitionSuppressionScope : IDisposable
- {
- private readonly FollowerState? state;
-
- internal TransitionSuppressionScope(FollowerState? state)
- {
- state?.SuspendTracking();
- this.state = state;
- }
-
- public void Dispose() => state?.ResumeTracking();
- }
}
-internal static class FollowerState
+file static class FollowerState
{
internal static readonly Counter TransitionRateMeter = Metrics.Instrumentation.ServerSide.CreateCounter("transitions-to-follower-count", description: "Number of Transitions to Follower State");
- internal static readonly Counter HeartbeatRateMeter = Metrics.Instrumentation.ServerSide.CreateCounter("incoming-heartbeats-count", description: "Incoming Heartbeats from Leader");
}
\ No newline at end of file
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IRaftCluster.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IRaftCluster.cs
index d681b1744..8ffe3ae28 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IRaftCluster.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IRaftCluster.cs
@@ -48,6 +48,15 @@ public interface IRaftCluster : IReplicationCluster, IPeerMesh
CancellationToken LeadershipToken { get; }
+ ///
+ /// Gets a token that remains non-canceled while the local node is a part of the majority of the cluster and
+ /// has communication with the leader.
+ ///
+ ///
+ /// The token moves to canceled state if the current node upgrades to the candidate state or looses connection with the leader.
+ ///
+ CancellationToken ConsensusToken { get; }
+
///
/// Represents a task indicating that the current node is ready to serve requests.
///
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IRaftStateMachine.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IRaftStateMachine.cs
index 276b33d97..46a251028 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IRaftStateMachine.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IRaftStateMachine.cs
@@ -35,4 +35,6 @@ internal interface IRaftStateMachine : IRaftStateMachine
void MoveToLeaderState(IWeakCallerStateIdentity callerState, TMember leader);
void UnavailableMemberDetected(IWeakCallerStateIdentity callerState, TMember member, CancellationToken token);
+
+ void IncomingHeartbeatTimedOut(IWeakCallerStateIdentity callerState);
}
\ No newline at end of file
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.cs
index f14fe1567..bb5e33bb0 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.cs
@@ -1,6 +1,5 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
-using System.Runtime;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
@@ -13,12 +12,11 @@ namespace DotNext.Net.Cluster.Consensus.Raft;
using Threading.Tasks;
using GCLatencyModeScope = Runtime.GCLatencyModeScope;
-internal sealed partial class LeaderState : RaftState
+internal sealed partial class LeaderState : TokenizedState
where TMember : class, IRaftClusterMember
{
private readonly long currentTerm;
private readonly CancellationTokenSource timerCancellation;
- internal readonly CancellationToken LeadershipToken; // cached to prevent ObjectDisposedException
private readonly Task> localMemberResponse;
private Task? heartbeatTask;
@@ -29,7 +27,7 @@ internal LeaderState(IRaftStateMachine stateMachine, long term, TimeSpa
currentTerm = term;
localMemberResponse = Task.FromResult(new Result { Term = term, Value = true });
timerCancellation = new();
- LeadershipToken = timerCancellation.Token;
+ Token = timerCancellation.Token;
this.maxLease = maxLease;
lease = new();
replicationEvent = new(initialState: false) { MeasurementTags = stateMachine.MeasurementTags };
@@ -38,6 +36,8 @@ internal LeaderState(IRaftStateMachine stateMachine, long term, TimeSpa
replicatorFactory = localReplicatorFactory = CreateDefaultReplicator;
}
+ public override CancellationToken Token { get; } // cached to prevent ObjectDisposedException
+
private (long, long, int) ForkHeartbeats(TaskCompletionPipe>> responsePipe, IAuditTrail auditTrail, IClusterConfigurationStorage configurationStorage, IEnumerator members)
{
Replicator replicator;
@@ -48,7 +48,7 @@ internal LeaderState(IRaftStateMachine stateMachine, long term, TimeSpa
var majority = 0;
// send heartbeat in parallel
- for (IClusterConfiguration? activeConfig = configurationStorage.ActiveConfiguration, proposedConfig = configurationStorage.ProposedConfiguration; members.MoveNext() && !LeadershipToken.IsCancellationRequested; responsePipe.Add(response, replicator), majority++)
+ for (IClusterConfiguration? activeConfig = configurationStorage.ActiveConfiguration, proposedConfig = configurationStorage.ProposedConfiguration; members.MoveNext() && !Token.IsCancellationRequested; responsePipe.Add(response, replicator), majority++)
{
var member = members.Current;
if (member.IsRemote)
@@ -56,13 +56,13 @@ internal LeaderState(IRaftStateMachine stateMachine, long term, TimeSpa
var precedingIndex = member.State.PrecedingIndex;
// fork replication procedure
- replicator = context.GetOrCreate(member, replicatorFactory, LeadershipToken);
+ replicator = context.GetOrCreate(member, replicatorFactory, Token);
replicator.Initialize(activeConfig, proposedConfig, commitIndex, currentTerm, precedingIndex);
- response = SpawnReplicationAsync(replicator, auditTrail, currentIndex, LeadershipToken);
+ response = SpawnReplicationAsync(replicator, auditTrail, currentIndex, Token);
}
else
{
- replicator = context.GetOrCreate(member, localReplicatorFactory, LeadershipToken);
+ replicator = context.GetOrCreate(member, localReplicatorFactory, Token);
response = localMemberResponse;
}
}
@@ -116,7 +116,7 @@ private void CheckMemberHealthStatus(IFailureDetector? detector, TMember member)
Logger.UnknownHealthStatus(member.EndPoint);
break;
case { IsHealthy: false }:
- UnavailableMemberDetected(member, LeadershipToken);
+ UnavailableMemberDetected(member, Token);
break;
}
}
@@ -129,7 +129,7 @@ private async Task DoHeartbeats(TimeSpan period, IAuditTrail audi
try
{
var forced = false;
- for (var responsePipe = new TaskCompletionPipe>>(); !LeadershipToken.IsCancellationRequested; responsePipe.Reset(), ReuseEnumerator(ref members, ref enumerator))
+ for (var responsePipe = new TaskCompletionPipe>>(); !Token.IsCancellationRequested; responsePipe.Reset(), ReuseEnumerator(ref members, ref enumerator))
{
var startTime = new Timestamp();
@@ -169,13 +169,13 @@ private async Task DoHeartbeats(TimeSpan period, IAuditTrail audi
{
RenewLease(startTime.Elapsed);
UpdateLeaderStickiness();
- await configurationStorage.ApplyAsync(LeadershipToken).ConfigureAwait(false);
+ await configurationStorage.ApplyAsync(Token).ConfigureAwait(false);
}
if (result.Value && ++commitQuorum == majority)
{
// majority of nodes accept entries with at least one entry from the current term
- var count = await auditTrail.CommitAsync(currentIndex, LeadershipToken).ConfigureAwait(false); // commit all entries starting from the first uncommitted index to the end
+ var count = await auditTrail.CommitAsync(currentIndex, Token).ConfigureAwait(false); // commit all entries starting from the first uncommitted index to the end
Logger.CommitSuccessful(currentIndex, count);
}
}
@@ -203,7 +203,7 @@ private async Task DoHeartbeats(TimeSpan period, IAuditTrail audi
replicationQueue.Drain();
// wait for heartbeat timeout or forced replication
- forced = await WaitForReplicationAsync(startTime, period, LeadershipToken).ConfigureAwait(false);
+ forced = await WaitForReplicationAsync(startTime, period, Token).ConfigureAwait(false);
}
}
finally
@@ -256,7 +256,7 @@ protected override async ValueTask DisposeAsyncCore()
try
{
timerCancellation.Cancel(throwOnFirstException: false);
- replicationEvent.CancelSuspendedCallers(LeadershipToken);
+ replicationEvent.CancelSuspendedCallers(Token);
await (heartbeatTask ?? Task.CompletedTask).ConfigureAwait(false); // may throw OperationCanceledException
}
catch (Exception e)
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LogMessages.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LogMessages.cs
index 530b5ed77..4d305b545 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LogMessages.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LogMessages.cs
@@ -340,4 +340,12 @@ internal static partial class LogMessages
EventName = $"{EventIdPrefix}.{nameof(StartedAsFrozen)}"
)]
public static partial void StartedAsFrozen(this ILogger logger);
+
+ [LoggerMessage(
+ EventIdOffset + 42,
+ LogLevel.Debug,
+ "Standby loop stopped with error",
+ EventName = $"{EventIdPrefix}.{nameof(StandbyStateExitedWithError)}"
+ )]
+ public static partial void StandbyStateExitedWithError(this ILogger logger, Exception e);
}
\ No newline at end of file
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs
index 49631e860..1eed8b47e 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs
@@ -681,14 +681,14 @@ private async ValueTask ApplyAsync(int sessionId, long startIndex, CancellationT
{
if (TryGetPartition(startIndex, ref partition))
{
- var entry = partition.Read(sessionId, startIndex, out var persisted);
+ var entry = partition.Read(sessionId, startIndex);
await ApplyCoreAsync(entry).ConfigureAwait(false);
Volatile.Write(ref lastTerm, entry.Term);
// Remove log entry from the cache according to eviction policy
- if (!persisted)
+ if (!entry.IsPersisted)
{
- await partition.PersistCachedEntryAsync(startIndex, entry.Position, evictOnCommit).ConfigureAwait(false);
+ await partition.PersistCachedEntryAsync(startIndex, evictOnCommit).ConfigureAwait(false);
// Flush partition if we are finished or at the last entry in it
if (startIndex == commitIndex || startIndex == partition.LastIndex)
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Backup.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Backup.cs
index e1680a636..6e5950737 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Backup.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Backup.cs
@@ -46,6 +46,12 @@ private static void ImportAttributes(SafeFileHandle handle, TarEntry entry)
File.SetUnixFileMode(handle, entry.Mode);
}
+
+ var attributes = FileAttributes.NotContentIndexed;
+ if (entry.EntryType is TarEntryType.SparseFile)
+ attributes |= FileAttributes.SparseFile;
+
+ File.SetAttributes(handle, attributes);
}
///
@@ -55,7 +61,64 @@ private static void ImportAttributes(SafeFileHandle handle, TarEntry entry)
/// The token that can be used to cancel the operation.
/// A task representing state of asynchronous execution.
/// The operation has been canceled.
- public async Task CreateBackupAsync(Stream output, CancellationToken token = default)
+ public Task CreateBackupAsync(Stream output, CancellationToken token = default)
+ => maxLogEntrySize > 0L ? CreateSparseBackupAsync(output, token) : CreateRegularBackupAsync(output, token);
+
+ private async Task CreateSparseBackupAsync(Stream output, CancellationToken token)
+ {
+ var tarProcess = new Process
+ {
+ StartInfo = new()
+ {
+ FileName = "tar",
+ WorkingDirectory = Location.FullName,
+ },
+ };
+
+ var outputArchive = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
+ tarProcess.StartInfo.ArgumentList.Add("cfS");
+ tarProcess.StartInfo.ArgumentList.Add(outputArchive);
+
+ FileStream? archiveStream = null;
+ await syncRoot.AcquireAsync(LockType.StrongReadLock, token).ConfigureAwait(false);
+ try
+ {
+ foreach (var file in Location.EnumerateFiles())
+ {
+ tarProcess.StartInfo.ArgumentList.Add(file.Name);
+ }
+
+ tarProcess.StartInfo.ArgumentList.Add($"--format={GetArchiveFormat(backupFormat)}");
+ tarProcess.Start();
+ await tarProcess.WaitForExitAsync(token).ConfigureAwait(false);
+
+ if (tarProcess.ExitCode is not 0)
+ throw new PlatformNotSupportedException(ExceptionMessages.SparseFileNotSupported) { HResult = tarProcess.ExitCode };
+
+ archiveStream = new(outputArchive, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, FileOptions.SequentialScan | FileOptions.Asynchronous | FileOptions.DeleteOnClose);
+ await archiveStream.CopyToAsync(output, token).ConfigureAwait(false);
+ await output.FlushAsync(token).ConfigureAwait(false);
+ }
+ finally
+ {
+ syncRoot.Release(LockType.StrongReadLock);
+ tarProcess.Dispose();
+
+ if (archiveStream is not null)
+ await archiveStream.DisposeAsync().ConfigureAwait(false);
+ }
+
+ static string GetArchiveFormat(TarEntryFormat format) => format switch
+ {
+ TarEntryFormat.Gnu => "gnu",
+ TarEntryFormat.Pax => "pax",
+ TarEntryFormat.Ustar => "ustar",
+ TarEntryFormat.V7 => "v7",
+ _ => "gnu",
+ };
+ }
+
+ private async Task CreateRegularBackupAsync(Stream output, CancellationToken token)
{
TarWriter? archive = null;
await syncRoot.AcquireAsync(LockType.StrongReadLock, token).ConfigureAwait(false);
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.BufferManagement.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.BufferManagement.cs
index 0e08a8027..5554cdaf7 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.BufferManagement.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.BufferManagement.cs
@@ -153,6 +153,8 @@ internal readonly struct BufferManager(IBufferManagerSettings options)
internal MemoryOwner AllocLogEntryCache(int recordsPerPartition)
=> cacheAllocator is null ? default : cacheAllocator(recordsPerPartition);
+
+ internal MemoryOwner Allocate(int length) where T : unmanaged => options.GetMemoryAllocator().AllocateExactly(length);
}
private readonly BufferingLogEntryConsumer? bufferingConsumer;
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs
index 489039e2c..b2c9bfe22 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs
@@ -148,7 +148,10 @@ public void Format(Span output)
internal static long GetTerm(ReadOnlySpan input)
=> BinaryPrimitives.ReadInt64LittleEndian(input);
- private long End => Length + Offset;
+ internal static long GetOffset(ReadOnlySpan input)
+ => BinaryPrimitives.ReadInt64LittleEndian(input.Slice(0, sizeof(long) + sizeof(long) + sizeof(long))); // skip Term, Timestamp, Length
+
+ internal long End => Length + Offset;
internal static long GetEndOfLogEntry(ReadOnlySpan input)
{
@@ -245,7 +248,7 @@ internal abstract class ConcurrentStorageAccess : Disposable
// This field is used to control 'freshness' of the read buffers
private ulong version; // volatile
- private protected ConcurrentStorageAccess(string fileName, int fileOffset, int bufferSize, MemoryAllocator allocator, int readersCount, WriteMode writeMode, long initialSize)
+ private protected ConcurrentStorageAccess(string fileName, int fileOffset, int bufferSize, MemoryAllocator allocator, int readersCount, WriteMode writeMode, long initialSize, FileAttributes attributes = FileAttributes.NotContentIndexed)
{
var options = writeMode is WriteMode.WriteThrough
? FileOptions.Asynchronous | FileOptions.WriteThrough | FileOptions.SequentialScan
@@ -267,7 +270,7 @@ private protected ConcurrentStorageAccess(string fileName, int fileOffset, int b
if (fileMode is FileMode.CreateNew)
{
- File.SetAttributes(Handle, FileAttributes.NotContentIndexed);
+ File.SetAttributes(Handle, attributes);
}
this.fileOffset = fileOffset;
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs
index ea96960d5..5b54e7c3b 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs
@@ -44,6 +44,12 @@ internal LogEntry(in SnapshotMetadata metadata)
index = -metadata.Index;
}
+ internal bool IsPersisted
+ {
+ get;
+ init;
+ }
+
internal IAsyncBinaryReader? ContentReader
{
init => content = metadata.Length > 0L ? value : IAsyncBinaryReader.Empty;
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Options.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Options.cs
index 8529105cb..8ded00808 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Options.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Options.cs
@@ -46,6 +46,7 @@ public class Options : IBufferManagerSettings
private int concurrencyLevel = Math.Max(3, Environment.ProcessorCount);
private long partitionSize;
private bool parallelIO;
+ internal long maxLogEntrySize;
///
/// Gets or sets a value indicating how the log interacts with underlying storage device.
@@ -132,6 +133,31 @@ public int MaxConcurrentReads
///
public TarEntryFormat BackupFormat { get; set; } = TarEntryFormat.Pax;
+ ///
+ /// Gets or sets maximum size of the log entry, in bytes.
+ ///
+ ///
+ /// If enabled, WAL uses sparse files to optimize performance.
+ /// method supports backup of sparse
+ /// files on Linux only.
+ /// method cannot restore the backup, you need to use tar utility to extract files.
+ ///
+ public long? MaxLogEntrySize
+ {
+ get => maxLogEntrySize > 0L ? maxLogEntrySize : null;
+ set => maxLogEntrySize = value.GetValueOrDefault();
+ }
+
+ ///
+ /// Gets or sets a value indicating that legacy binary format must be used.
+ ///
+ [Obsolete("Use default format instead.")]
+ public bool UseLegacyBinaryFormat
+ {
+ get => maxLogEntrySize < 0L;
+ set => maxLogEntrySize = value ? -1L : 0L;
+ }
+
///
/// If set then every read operations will be performed
/// on buffered copy of the log entries.
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs
index 5f164e6e1..3d210ec7a 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs
@@ -1,6 +1,8 @@
-using System.Diagnostics;
+using System.Collections;
+using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
using static System.Globalization.CultureInfo;
namespace DotNext.Net.Cluster.Consensus.Raft;
@@ -10,69 +12,24 @@ namespace DotNext.Net.Cluster.Consensus.Raft;
public partial class PersistentState
{
- /*
- Partition file format:
- FileName - number of partition
- Payload:
- [struct LogEntryMetadata] X Capacity - prologue with metadata
- [octet string] X number of entries
- */
- private protected sealed class Partition : ConcurrentStorageAccess
+ private protected abstract class Partition : ConcurrentStorageAccess
{
internal const int MaxRecordsPerPartition = int.MaxValue / LogEntryMetadata.Size;
- private static readonly CacheRecord EmptyRecord = new();
+ protected static readonly CacheRecord EmptyRecord = new();
internal readonly long FirstIndex, PartitionNumber, LastIndex;
- private MemoryOwner entryCache;
private Partition? previous, next;
+ protected MemoryOwner entryCache;
+ protected int runningIndex;
- // metadata management
- private MemoryOwner metadata;
- private int metadataFlushStartAddress;
- private int metadataFlushEndAddress;
-
- // represents offset within the file from which a newly added log entry payload can be recorded
- private long writeAddress;
-
- internal Partition(DirectoryInfo location, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize)
- : base(Path.Combine(location.FullName, partitionNumber.ToString(InvariantCulture)), checked(LogEntryMetadata.Size * recordsPerPartition), bufferSize, manager.BufferAllocator, readersCount, writeMode, initialSize)
+ protected Partition(DirectoryInfo location, int offset, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize, FileAttributes attributes = FileAttributes.NotContentIndexed)
+ : base(Path.Combine(location.FullName, partitionNumber.ToString(InvariantCulture)), offset, bufferSize, manager.BufferAllocator, readersCount, writeMode, initialSize, attributes)
{
FirstIndex = partitionNumber * recordsPerPartition;
LastIndex = FirstIndex + recordsPerPartition - 1L;
PartitionNumber = partitionNumber;
- // allocate metadata segment
- metadata = manager.BufferAllocator.AllocateExactly(fileOffset);
- metadataFlushStartAddress = int.MaxValue;
-
entryCache = manager.AllocLogEntryCache(recordsPerPartition);
- writeAddress = fileOffset;
- }
-
- internal void Initialize()
- {
- using var handle = File.OpenHandle(FileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, FileOptions.SequentialScan);
- if (RandomAccess.Read(handle, metadata.Span, 0L) < fileOffset)
- {
- metadata.Span.Clear();
- RandomAccess.Write(handle, metadata.Span, 0L);
- }
- else
- {
- writeAddress = Math.Max(fileOffset, GetWriteAddress(metadata.Span));
- }
-
- static long GetWriteAddress(ReadOnlySpan metadataTable)
- {
- long result;
-
- for (result = 0L; !metadataTable.IsEmpty; metadataTable = metadataTable.Slice(LogEntryMetadata.Size))
- {
- result = Math.Max(result, LogEntryMetadata.GetEndOfLogEntry(metadataTable));
- }
-
- return result;
- }
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -136,53 +93,14 @@ internal void DetachDescendant()
internal bool Contains(long recordIndex)
=> recordIndex >= FirstIndex && recordIndex <= LastIndex;
- private async ValueTask FlushAsync(ReadOnlyMemory metadata, CancellationToken token)
- {
- await RandomAccess.WriteAsync(Handle, metadata, metadataFlushStartAddress, token).ConfigureAwait(false);
- metadataFlushStartAddress = int.MaxValue;
- metadataFlushEndAddress = 0;
-
- await base.FlushAsync(token).ConfigureAwait(false);
- }
+ internal abstract void Initialize();
- public override ValueTask FlushAsync(CancellationToken token = default)
- {
- var size = metadataFlushEndAddress - metadataFlushStartAddress;
- return size > 0
- ? FlushAsync(metadata.Memory.Slice(metadataFlushStartAddress, size), token)
- : base.FlushAsync(token);
- }
-
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- private Span GetMetadata(int index, out int offset)
- {
- Debug.Assert(metadata.Length == fileOffset);
-
- return metadata.Span.Slice(offset = index * LogEntryMetadata.Size);
- }
-
- private void WriteMetadata(int index, in LogEntryMetadata metadata)
- {
- metadata.Format(GetMetadata(index, out var offset));
-
- metadataFlushStartAddress = Math.Min(metadataFlushStartAddress, offset);
- metadataFlushEndAddress = Math.Max(metadataFlushEndAddress, offset + LogEntryMetadata.Size);
- }
-
- internal long GetTerm(long absoluteIndex)
- {
- Debug.Assert(absoluteIndex >= FirstIndex && absoluteIndex <= LastIndex, $"Invalid index value {absoluteIndex}, offset {FirstIndex}");
-
- return LogEntryMetadata.GetTerm(GetMetadata(ToRelativeIndex(absoluteIndex), out _));
- }
-
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- private LogEntry Read(int sessionId, long absoluteIndex, out bool persisted, bool metadataOnly)
+ internal LogEntry Read(int sessionId, long absoluteIndex, bool metadataOnly = false)
{
Debug.Assert(absoluteIndex >= FirstIndex && absoluteIndex <= LastIndex, $"Invalid index value {absoluteIndex}, offset {FirstIndex}");
var relativeIndex = ToRelativeIndex(absoluteIndex);
- var metadata = new LogEntryMetadata(GetMetadata(relativeIndex, out _));
+ var metadata = GetMetadata(relativeIndex);
ref readonly var cachedContent = ref EmptyRecord;
@@ -194,24 +112,22 @@ private LogEntry Read(int sessionId, long absoluteIndex, out bool persisted, boo
if (cachedContent.Content.IsEmpty && metadata.Length > 0L)
{
- persisted = true;
- return new(in metadata, absoluteIndex) { ContentReader = GetSessionReader(sessionId) };
+ return new(in metadata, absoluteIndex)
+ {
+ ContentReader = GetSessionReader(sessionId),
+ IsPersisted = true,
+ };
}
return_cached:
- persisted = cachedContent.PersistenceMode is not CachedLogEntryPersistenceMode.None;
- return new(in metadata, absoluteIndex) { ContentBuffer = cachedContent.Content.Memory };
+ return new(in metadata, absoluteIndex)
+ {
+ ContentBuffer = cachedContent.Content.Memory,
+ IsPersisted = cachedContent.PersistenceMode is not CachedLogEntryPersistenceMode.None,
+ };
}
- [MethodImpl(MethodImplOptions.NoInlining)]
- internal LogEntry Read(int sessionId, long absoluteIndex, bool metadataOnly = false)
- => Read(sessionId, absoluteIndex, out _, metadataOnly);
-
- [MethodImpl(MethodImplOptions.NoInlining)]
- internal LogEntry Read(int sessionId, long absoluteIndex, out bool persisted)
- => Read(sessionId, absoluteIndex, out persisted, metadataOnly: false);
-
- internal ValueTask PersistCachedEntryAsync(long absoluteIndex, long offset, bool removeFromMemory)
+ internal ValueTask PersistCachedEntryAsync(long absoluteIndex, bool removeFromMemory)
{
Debug.Assert(entryCache.IsEmpty is false);
@@ -221,19 +137,22 @@ internal ValueTask PersistCachedEntryAsync(long absoluteIndex, long offset, bool
ref var cachedEntry = ref entryCache[index];
Debug.Assert(cachedEntry.PersistenceMode is CachedLogEntryPersistenceMode.None);
cachedEntry.PersistenceMode = CachedLogEntryPersistenceMode.CopyToBuffer;
+ var offset = GetOffset(index);
return cachedEntry.Content.IsEmpty
? ValueTask.CompletedTask
: removeFromMemory
? PersistAndDeleteAsync(cachedEntry.Content.Memory, index, offset)
- : PersistAsync(cachedEntry.Content.Memory, offset);
+ : PersistAsync(cachedEntry.Content.Memory, index, offset);
}
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
- private async ValueTask PersistAsync(ReadOnlyMemory content, long offset)
+ private async ValueTask PersistAsync(ReadOnlyMemory content, int index, long offset)
{
await SetWritePositionAsync(offset).ConfigureAwait(false);
await writer.WriteAsync(content).ConfigureAwait(false);
+
+ runningIndex = index;
}
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
@@ -249,9 +168,21 @@ private async ValueTask PersistAndDeleteAsync(ReadOnlyMemory content, int
{
entryCache[index].Dispose();
}
+
+ runningIndex = index;
+ }
+
+ internal long GetTerm(long absoluteIndex)
+ {
+ Debug.Assert(absoluteIndex >= FirstIndex && absoluteIndex <= LastIndex, $"Invalid index value {absoluteIndex}, offset {FirstIndex}");
+
+ return GetMetadata(ToRelativeIndex(absoluteIndex)).Term;
}
- private void UpdateCache(in CachedLogEntry entry, int index, long offset)
+ private long GetOffset(int index)
+ => GetMetadata(index).Offset;
+
+ private void UpdateCache(in CachedLogEntry entry, int index)
{
Debug.Assert(entryCache.IsEmpty is false);
Debug.Assert((uint)index < (uint)entryCache.Length);
@@ -259,83 +190,607 @@ private void UpdateCache(in CachedLogEntry entry, int index, long offset)
ref var cachedEntry = ref entryCache[index];
cachedEntry.Dispose();
cachedEntry = entry;
-
- // save new log entry to the allocation table
- WriteMetadata(index, LogEntryMetadata.Create(in entry, offset));
}
- private async ValueTask WriteAsync(TEntry entry, int index, long offset, CancellationToken token)
- where TEntry : notnull, IRaftLogEntry
- {
- // slow path - persist log entry
- await SetWritePositionAsync(offset, token).ConfigureAwait(false);
- await entry.WriteToAsync(writer, token).ConfigureAwait(false);
+ protected abstract LogEntryMetadata GetMetadata(int index);
- // save new log entry to the allocation table
- var length = writer.WritePosition - offset;
- WriteMetadata(index, LogEntryMetadata.Create(entry, offset, length));
- writeAddress = offset + length;
- }
+ protected abstract ValueTask PersistAsync(TEntry entry, int index, CancellationToken token)
+ where TEntry : notnull, IRaftLogEntry;
- private async ValueTask WriteThroughAsync(ReadOnlyMemory content, long offset, CancellationToken token)
- {
- await SetWritePositionAsync(offset, token).ConfigureAwait(false);
- Debug.Assert(writer.HasBufferedData is false);
+ protected abstract ValueTask WriteThroughAsync(CachedLogEntry entry, int index, CancellationToken token);
- await RandomAccess.WriteAsync(Handle, content, offset, token).ConfigureAwait(false);
- writer.FilePosition = writeAddress = offset + content.Length;
- }
+ protected abstract void OnCached(in CachedLogEntry cachedEntry, int index);
- internal ValueTask WriteAsync(TEntry entry, long absoluteIndex, CancellationToken token = default)
+ internal ValueTask WriteAsync(TEntry entry, long absoluteIndex, CancellationToken token)
where TEntry : notnull, IRaftLogEntry
{
// write operation always expects absolute index so we need to convert it to the relative index
var relativeIndex = ToRelativeIndex(absoluteIndex);
Debug.Assert(absoluteIndex >= FirstIndex && relativeIndex <= LastIndex, $"Invalid index value {absoluteIndex}, offset {FirstIndex}");
- Debug.Assert(writeAddress > 0L);
if (typeof(TEntry) == typeof(CachedLogEntry))
{
ref readonly var cachedEntry = ref Unsafe.As(ref entry);
// fast path - just add cached log entry to the cache table
- UpdateCache(in cachedEntry, relativeIndex, writeAddress);
+ UpdateCache(in cachedEntry, relativeIndex);
// Perf: we can skip FileWriter internal buffer and write cached log entry directly to the disk
- ValueTask result;
switch (cachedEntry.PersistenceMode)
{
- case CachedLogEntryPersistenceMode.CopyToBuffer:
- result = WriteAsync(entry, relativeIndex, writeAddress, token);
- break;
- case CachedLogEntryPersistenceMode.SkipBuffer:
- result = WriteThroughAsync(cachedEntry.Content.Memory, writeAddress, token);
- break;
+ case CachedLogEntryPersistenceMode.SkipBuffer when !writer.HasBufferedData:
+ return WriteThroughAsync(cachedEntry, relativeIndex, token);
+ case CachedLogEntryPersistenceMode.None:
+ OnCached(in cachedEntry, relativeIndex);
+ return ValueTask.CompletedTask;
default:
- writeAddress += cachedEntry.Length;
- result = ValueTask.CompletedTask;
- break;
+ goto exit;
}
-
- return result;
}
// invalidate cached log entry on write
if (!entryCache.IsEmpty)
entryCache[relativeIndex].Dispose();
- return WriteAsync(entry, relativeIndex, writeAddress, token);
+ exit:
+ return PersistAsync(entry, relativeIndex, token);
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
- metadata.Dispose();
- entryCache.ReleaseAll();
previous = next = null;
+ entryCache.ReleaseAll();
+ }
+
+ base.Dispose(disposing);
+ }
+ }
+
+ private sealed class SparsePartition : Partition, IReadOnlyList>
+ {
+ private readonly long maxLogEntrySize;
+ private MemoryOwner metadataTable;
+ private MemoryOwner metadataBuffer;
+ private ReadOnlyMemory payloadBuffer;
+
+ internal SparsePartition(DirectoryInfo location, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize, long maxLogEntrySize)
+ : base(location, offset: 0, bufferSize, recordsPerPartition, partitionNumber, in manager, readersCount, writeMode, initialSize, FileAttributes.NotContentIndexed | FileAttributes.SparseFile)
+ {
+ metadataTable = manager.Allocate(recordsPerPartition);
+ metadataTable.Span.Clear(); // to prevent pre-filled objects
+
+ this.maxLogEntrySize = maxLogEntrySize;
+ metadataBuffer = manager.BufferAllocator.AllocateExactly(LogEntryMetadata.Size);
+ }
+
+ internal override void Initialize()
+ {
+ // do nothing
+ }
+
+ private long GetMetadataOffset(int index) => index * (maxLogEntrySize + LogEntryMetadata.Size);
+
+ protected override LogEntryMetadata GetMetadata(int index)
+ {
+ ref var entry = ref metadataTable[index];
+
+ if (!entry.IsLoaded)
+ {
+ // very rare so can be done synchronously
+ Span buffer = stackalloc byte[LogEntryMetadata.Size];
+ RandomAccess.Read(Handle, buffer, GetMetadataOffset(index));
+ entry.Metadata = new(buffer);
+ }
+
+ return entry.Metadata;
+ }
+
+ protected override void OnCached(in CachedLogEntry cachedEntry, int index)
+ => metadataTable[index].Metadata = LogEntryMetadata.Create(cachedEntry, GetMetadataOffset(index) + LogEntryMetadata.Size);
+
+ [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
+ protected override async ValueTask PersistAsync(TEntry entry, int index, CancellationToken token)
+ {
+ var metadataOffset = GetMetadataOffset(index);
+ LogEntryMetadata metadata;
+
+ if (entry.Length is not { } length)
+ {
+ // slow path - write the entry first and then write metadata
+ await SetWritePositionAsync(metadataOffset + LogEntryMetadata.Size, token).ConfigureAwait(false);
+ length = writer.WritePosition;
+
+ await entry.WriteToAsync(writer, token).ConfigureAwait(false);
+ length = writer.WritePosition - length;
+
+ if (length > maxLogEntrySize)
+ goto too_large_entry;
+
+ metadata = LogEntryMetadata.Create(entry, metadataOffset + LogEntryMetadata.Size, length);
+ metadata.Format(metadataBuffer.Span);
+ await RandomAccess.WriteAsync(Handle, metadataBuffer.Memory, metadataOffset, token).ConfigureAwait(false);
+ }
+ else if (length <= maxLogEntrySize)
+ {
+ // fast path - length is known, metadata and the log entry can be written sequentially
+ metadata = LogEntryMetadata.Create(entry, metadataOffset + LogEntryMetadata.Size, length);
+ await SetWritePositionAsync(metadataOffset, token).ConfigureAwait(false);
+
+ await writer.WriteAsync(metadata, token).ConfigureAwait(false);
+ await entry.WriteToAsync(writer, token).ConfigureAwait(false);
+ }
+ else
+ {
+ goto too_large_entry;
+ }
+
+ metadataTable[index].Metadata = metadata;
+ return;
+
+ too_large_entry:
+ throw new InvalidOperationException(ExceptionMessages.LogEntryPayloadTooLarge);
+ }
+
+ [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
+ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int index, CancellationToken token)
+ {
+ Debug.Assert(writer.HasBufferedData is false);
+
+ if (entry.Length > maxLogEntrySize)
+ throw new InvalidOperationException(ExceptionMessages.LogEntryPayloadTooLarge);
+
+ var metadata = LogEntryMetadata.Create(entry, GetMetadataOffset(index) + LogEntryMetadata.Size);
+ metadata.Format(metadataBuffer.Span);
+
+ payloadBuffer = entry.Content.Memory;
+ await RandomAccess.WriteAsync(Handle, this, GetMetadataOffset(index), token).ConfigureAwait(false);
+ payloadBuffer = default;
+ metadataTable[index].Metadata = metadata;
+
+ writer.FilePosition = metadata.End;
+ }
+
+ ReadOnlyMemory IReadOnlyList>.this[int index] => index switch
+ {
+ 0 => metadataBuffer.Memory,
+ 1 => payloadBuffer,
+ _ => throw new ArgumentOutOfRangeException(nameof(index)),
+ };
+
+ int IReadOnlyCollection>.Count => 2;
+
+ private IEnumerator> GetEnumerator()
+ {
+ yield return metadataBuffer.Memory;
+ yield return payloadBuffer;
+ }
+
+ IEnumerator> IEnumerable>.GetEnumerator()
+ => GetEnumerator();
+
+ IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
+
+ protected override void Dispose(bool disposing)
+ {
+ metadataTable.Dispose();
+ metadataBuffer.Dispose();
+ base.Dispose(disposing);
+ }
+
+ [StructLayout(LayoutKind.Auto)]
+ private struct CachedLogEntryMetadata
+ {
+ private LogEntryMetadata metadata;
+ private volatile bool loaded;
+
+ internal readonly bool IsLoaded => loaded;
+
+ internal LogEntryMetadata Metadata
+ {
+ readonly get => metadata;
+ set
+ {
+ metadata = value;
+ loaded = true;
+ }
+ }
+ }
+ }
+
+ /*
+ Partition file format:
+ FileName - number of partition
+ Payload:
+ [512 bytes] - header:
+ [1 byte] - true if completed partition
+ [struct LogEntryMetadata] [octet string] X Capacity - log entries prefixed with metadata
+ [struct LogEntryMetadata] X Capacity - a table of log entries within the file, if partition is completed
+ */
+ private sealed class Table : Partition, IReadOnlyList>
+ {
+ private const int HeaderSize = 512;
+
+ private static readonly ReadOnlyMemory EmptyMetadata;
+ private static readonly ReadOnlyMemory EphemeralMetadata;
+
+ static Table()
+ {
+ EmptyMetadata = new byte[LogEntryMetadata.Size]; // all zeroes
+
+ var ephemeral = LogEntryMetadata.Create(LogEntry.Initial, HeaderSize + LogEntryMetadata.Size, length: 0L);
+ var buffer = new byte[LogEntryMetadata.Size];
+ ephemeral.Format(buffer);
+ EphemeralMetadata = buffer;
+ }
+
+ // metadata management
+ private MemoryOwner header, footer, metadataBuffer;
+ private (ReadOnlyMemory, ReadOnlyMemory) bufferTuple;
+
+ internal Table(DirectoryInfo location, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize)
+ : base(location, HeaderSize, bufferSize, recordsPerPartition, partitionNumber, in manager, readersCount, writeMode, initialSize)
+ {
+ footer = manager.BufferAllocator.AllocateExactly(recordsPerPartition * LogEntryMetadata.Size);
+
+ header = manager.BufferAllocator.AllocateExactly(HeaderSize);
+ header.Span.Clear();
+
+ metadataBuffer = manager.BufferAllocator.AllocateExactly(LogEntryMetadata.Size);
+
+ // init ephemeral 0 entry
+ if (PartitionNumber is 0L)
+ {
+ EphemeralMetadata.CopyTo(footer.Memory);
}
+ }
+
+ private bool IsSealed
+ {
+ get => Unsafe.BitCast(MemoryMarshal.GetReference(header.Span));
+ set => MemoryMarshal.GetReference(header.Span) = Unsafe.BitCast(value);
+ }
+
+ internal override void Initialize()
+ {
+ using var handle = File.OpenHandle(FileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, FileOptions.SequentialScan);
+
+ // read header
+ if (RandomAccess.Read(Handle, header.Span, fileOffset: 0L) < HeaderSize)
+ {
+ header.Span.Clear();
+ }
+ else if (IsSealed)
+ {
+ // partition is completed, read table
+ var tableStart = RandomAccess.GetLength(Handle);
+ RandomAccess.Read(Handle, footer.Span, tableStart - footer.Length);
+ }
+ else
+ {
+ // read sequentially every log entry
+ int footerOffset;
+ long fileOffset;
+
+ if (PartitionNumber is 0L)
+ {
+ footerOffset = LogEntryMetadata.Size;
+ fileOffset = HeaderSize + LogEntryMetadata.Size;
+ }
+ else
+ {
+ footerOffset = 0;
+ fileOffset = HeaderSize;
+ }
+
+ for (Span metadataBuffer = this.metadataBuffer.Span, metadataTable = footer.Span; ; footerOffset += LogEntryMetadata.Size)
+ {
+ var count = RandomAccess.Read(Handle, metadataBuffer, fileOffset);
+ if (count < LogEntryMetadata.Size)
+ break;
+
+ fileOffset = LogEntryMetadata.GetEndOfLogEntry(metadataBuffer);
+ if (fileOffset <= 0L)
+ break;
+
+ metadataBuffer.CopyTo(metadataTable.Slice(footerOffset, LogEntryMetadata.Size));
+ }
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private Span GetMetadataSpan(int index)
+ => footer.Span.Slice(index * LogEntryMetadata.Size, LogEntryMetadata.Size);
+
+ protected override LogEntryMetadata GetMetadata(int index)
+ => new(GetMetadataSpan(index));
+
+ private long GetWriteAddress(int index)
+ => index is 0 ? fileOffset : LogEntryMetadata.GetEndOfLogEntry(GetMetadataSpan(index - 1));
+
+ [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
+ protected override async ValueTask PersistAsync(TEntry entry, int index, CancellationToken token)
+ {
+ var writeAddress = GetWriteAddress(index);
+
+ LogEntryMetadata metadata;
+ var startPos = writeAddress + LogEntryMetadata.Size;
+ if (entry.Length is { } length)
+ {
+ // fast path - write metadata and entry sequentially
+ metadata = LogEntryMetadata.Create(entry, startPos, length);
+
+ await SetWritePositionAsync(writeAddress, token).ConfigureAwait(false);
+ await writer.WriteAsync(metadata, token).ConfigureAwait(false);
+ await entry.WriteToAsync(writer, token).ConfigureAwait(false);
+ }
+ else
+ {
+ // slow path - write entry first
+ await SetWritePositionAsync(startPos, token).ConfigureAwait(false);
+
+ await entry.WriteToAsync(writer, token).ConfigureAwait(false);
+ length = writer.WritePosition - startPos;
+
+ metadata = LogEntryMetadata.Create(entry, startPos, length);
+ metadata.Format(metadataBuffer.Span);
+ await RandomAccess.WriteAsync(Handle, metadataBuffer.Memory, writeAddress, token).ConfigureAwait(false);
+ }
+
+ metadata.Format(GetMetadataSpan(index));
+ runningIndex = index;
+ }
+
+ [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
+ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int index, CancellationToken token)
+ {
+ Debug.Assert(writer.HasBufferedData is false);
+
+ var writeAddress = GetWriteAddress(index);
+ var startPos = writeAddress + LogEntryMetadata.Size;
+ var metadata = LogEntryMetadata.Create(entry, startPos, entry.Length);
+ metadata.Format(metadataBuffer.Span);
+
+ bufferTuple = (metadataBuffer.Memory, entry.Content.Memory);
+ await RandomAccess.WriteAsync(Handle, this, writeAddress, token).ConfigureAwait(false);
+ bufferTuple = default;
+
+ metadata.Format(GetMetadataSpan(index));
+ runningIndex = index;
+ writer.FilePosition = metadata.End;
+ }
+
+ protected override void OnCached(in CachedLogEntry cachedEntry, int index)
+ {
+ var startPos = GetWriteAddress(index) + LogEntryMetadata.Size;
+ var metadata = LogEntryMetadata.Create(in cachedEntry, startPos);
+ metadata.Format(GetMetadataSpan(index));
+ }
+
+ public override ValueTask FlushAsync(CancellationToken token = default)
+ {
+ return runningIndex == LastIndex
+ ? FlushAndSealAsync(token)
+ : IsSealed
+ ? FlushAndUnsealAsync(token)
+ : base.FlushAsync(token);
+ }
+
+ private async ValueTask FlushAndSealAsync(CancellationToken token)
+ {
+ // use scatter I/O to flush the rest of the partition
+ if (writer.HasBufferedData)
+ {
+ bufferTuple = (writer.WrittenBuffer, footer.Memory);
+ await RandomAccess.WriteAsync(Handle, this, writer.FilePosition, token).ConfigureAwait(false);
+ writer.ClearBuffer();
+ writer.FilePosition += bufferTuple.Item1.Length;
+ bufferTuple = default;
+ }
+ else
+ {
+ await WriteFooterAsync(token).ConfigureAwait(false);
+ }
+
+ RandomAccess.SetLength(Handle, writer.FilePosition + footer.Length);
+
+ IsSealed = true;
+ await WriteHeaderAsync(token).ConfigureAwait(false);
+
+ writer.FlushToDisk();
+ }
+
+ private async ValueTask FlushAndUnsealAsync(CancellationToken token)
+ {
+ await FlushAndEraseNextEntryAsync(token).ConfigureAwait(false);
+
+ IsSealed = false;
+ await WriteHeaderAsync(token).ConfigureAwait(false);
+
+ writer.FlushToDisk();
+ }
+
+ private ValueTask FlushAndEraseNextEntryAsync(CancellationToken token)
+ {
+ ValueTask task;
+
+ // write the rest of the entry,
+ // then cleanup next entry header to indicate that the current entry is the last entry
+ if (!writer.HasBufferedData)
+ {
+ task = RandomAccess.WriteAsync(Handle, EmptyMetadata, writer.FilePosition, token);
+ }
+ else if (writer.Buffer is { Length: >= LogEntryMetadata.Size } emptyMetadataStub)
+ {
+ emptyMetadataStub.Span.Slice(0, LogEntryMetadata.Size).Clear();
+ writer.Produce(LogEntryMetadata.Size);
+ task = writer.WriteAsync(token);
+ }
+ else
+ {
+ task = writer.WriteAsync(EmptyMetadata, token);
+ }
+
+ return task;
+ }
+
+ private ValueTask WriteHeaderAsync(CancellationToken token)
+ => RandomAccess.WriteAsync(Handle, header.Memory, fileOffset: 0L, token);
+
+ private ValueTask WriteFooterAsync(CancellationToken token)
+ => RandomAccess.WriteAsync(Handle, footer.Memory, writer.FilePosition, token);
+
+ ReadOnlyMemory IReadOnlyList>.this[int index] => index switch
+ {
+ 0 => bufferTuple.Item1,
+ 1 => bufferTuple.Item2,
+ _ => throw new ArgumentOutOfRangeException(nameof(index)),
+ };
+
+ int IReadOnlyCollection>.Count => 2;
+
+ private IEnumerator> GetEnumerator()
+ {
+ yield return bufferTuple.Item1;
+ yield return bufferTuple.Item2;
+ }
+
+ IEnumerator> IEnumerable>.GetEnumerator()
+ => GetEnumerator();
+
+ IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
+
+ protected override void Dispose(bool disposing)
+ {
+ header.Dispose();
+ footer.Dispose();
+ metadataBuffer.Dispose();
+ base.Dispose(disposing);
+ }
+ }
+
+ /*
+ Partition file format:
+ FileName - number of partition
+ Payload:
+ [struct LogEntryMetadata] X Capacity - prologue with metadata
+ [octet string] X number of entries
+ */
+ [Obsolete]
+ private sealed class LegacyPartition : Partition
+ {
+ // metadata management
+ private MemoryOwner metadata;
+ private int metadataFlushStartAddress;
+ private int metadataFlushEndAddress;
+
+ // represents offset within the file from which a newly added log entry payload can be recorded
+ private long writeAddress;
+
+ internal LegacyPartition(DirectoryInfo location, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize)
+ : base(location, checked(LogEntryMetadata.Size * recordsPerPartition), bufferSize, recordsPerPartition, partitionNumber, in manager, readersCount, writeMode, initialSize)
+ {
+ // allocate metadata segment
+ metadata = manager.BufferAllocator.AllocateExactly(fileOffset);
+ metadataFlushStartAddress = int.MaxValue;
+
+ writeAddress = fileOffset;
+ }
+
+ internal override void Initialize()
+ {
+ using var handle = File.OpenHandle(FileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, FileOptions.SequentialScan);
+ if (RandomAccess.Read(handle, metadata.Span, 0L) < fileOffset)
+ {
+ metadata.Span.Clear();
+ RandomAccess.Write(handle, metadata.Span, 0L);
+ }
+ else
+ {
+ writeAddress = Math.Max(fileOffset, GetWriteAddress(metadata.Span));
+ }
+
+ static long GetWriteAddress(ReadOnlySpan metadataTable)
+ {
+ long result;
+
+ for (result = 0L; !metadataTable.IsEmpty; metadataTable = metadataTable.Slice(LogEntryMetadata.Size))
+ {
+ result = Math.Max(result, LogEntryMetadata.GetEndOfLogEntry(metadataTable));
+ }
+
+ return result;
+ }
+ }
+
+ private async ValueTask FlushAsync(ReadOnlyMemory metadata, CancellationToken token)
+ {
+ await RandomAccess.WriteAsync(Handle, metadata, metadataFlushStartAddress, token).ConfigureAwait(false);
+ metadataFlushStartAddress = int.MaxValue;
+ metadataFlushEndAddress = 0;
+
+ await base.FlushAsync(token).ConfigureAwait(false);
+ }
+
+ public override ValueTask FlushAsync(CancellationToken token = default)
+ {
+ var size = metadataFlushEndAddress - metadataFlushStartAddress;
+ return size > 0
+ ? FlushAsync(metadata.Memory.Slice(metadataFlushStartAddress, size), token)
+ : base.FlushAsync(token);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private Span GetMetadata(int index, out int offset)
+ {
+ Debug.Assert(metadata.Length == fileOffset);
+
+ return metadata.Span.Slice(offset = index * LogEntryMetadata.Size);
+ }
+
+ protected override LogEntryMetadata GetMetadata(int index)
+ => new(GetMetadata(index, out _));
+
+ private void WriteMetadata(int index, in LogEntryMetadata metadata)
+ {
+ metadata.Format(GetMetadata(index, out var offset));
+
+ metadataFlushStartAddress = Math.Min(metadataFlushStartAddress, offset);
+ metadataFlushEndAddress = Math.Max(metadataFlushEndAddress, offset + LogEntryMetadata.Size);
+ }
+
+ protected override async ValueTask PersistAsync(TEntry entry, int index, CancellationToken token)
+ {
+ // slow path - persist log entry
+ await SetWritePositionAsync(writeAddress, token).ConfigureAwait(false);
+ await entry.WriteToAsync(writer, token).ConfigureAwait(false);
+
+ // save new log entry to the allocation table
+ var length = writer.WritePosition - writeAddress;
+ WriteMetadata(index, LogEntryMetadata.Create(entry, writeAddress, length));
+ writeAddress += length;
+ }
+
+ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int index, CancellationToken token)
+ {
+ await RandomAccess.WriteAsync(Handle, entry.Content.Memory, writeAddress, token).ConfigureAwait(false);
+
+ // save new log entry to the allocation table
+ WriteMetadata(index, LogEntryMetadata.Create(entry, writeAddress, entry.Length));
+ writeAddress += entry.Length;
+ }
+
+ protected override void OnCached(in CachedLogEntry cachedEntry, int index)
+ {
+ WriteMetadata(index, LogEntryMetadata.Create(cachedEntry, writeAddress, cachedEntry.Length));
+ writeAddress += cachedEntry.Length;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ metadata.Dispose();
base.Dispose(disposing);
}
}
@@ -393,7 +848,7 @@ private void GetOrCreatePartition(long recordIndex, [NotNull] ref Partition? par
Debug.Assert(FirstPartition is null);
Debug.Assert(partition is null);
FirstPartition = LastPartition = partition = CreatePartition(partitionNumber);
- goto exit;
+ return;
}
Debug.Assert(FirstPartition is not null);
@@ -407,14 +862,14 @@ private void GetOrCreatePartition(long recordIndex, [NotNull] ref Partition? par
if (previous < 0)
{
partition = Append(partitionNumber, partition);
- goto exit;
+ return;
}
// nothing on the right side, create new tail
if (partition.IsLast)
{
LastPartition = partition = Append(partitionNumber, partition);
- goto exit;
+ return;
}
partition = partition.Next;
@@ -423,28 +878,25 @@ private void GetOrCreatePartition(long recordIndex, [NotNull] ref Partition? par
if (previous > 0)
{
partition = Prepend(partitionNumber, partition);
- goto exit;
+ return;
}
// nothing on the left side, create new head
if (partition.IsFirst)
{
FirstPartition = partition = Prepend(partitionNumber, partition);
- goto exit;
+ return;
}
partition = partition.Previous;
break;
default:
- goto exit;
+ return;
}
Debug.Assert(partition is not null);
}
- exit:
- return;
-
Partition Prepend(long partitionNumber, Partition partition)
{
var tmp = CreatePartition(partitionNumber);
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs
index 065623be3..d35bbc03b 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs
@@ -28,6 +28,7 @@ public abstract partial class PersistentState : Disposable, IPersistentState
private protected readonly int concurrentReads;
private protected readonly WriteMode writeMode;
private readonly bool parallelIO;
+ private readonly long maxLogEntrySize; // 0 - modern partition, > 0 - sparse partition, < 0 - legacy partition
static PersistentState()
{
@@ -57,6 +58,7 @@ private protected PersistentState(DirectoryInfo path, int recordsPerPartition, O
? new FastSessionIdPool()
: new SlowSessionIdPool(concurrentReads);
parallelIO = configuration.ParallelIO;
+ maxLogEntrySize = configuration.maxLogEntrySize;
syncRoot = new(configuration.MaxConcurrentReads)
{
@@ -66,14 +68,44 @@ private protected PersistentState(DirectoryInfo path, int recordsPerPartition, O
var partitionTable = new SortedSet(Comparer.Create(ComparePartitions));
// load all partitions from file system
- foreach (var file in path.EnumerateFiles())
- {
- if (long.TryParse(file.Name, out var partitionNumber))
- {
- var partition = new Partition(file.Directory!, bufferSize, recordsPerPartition, partitionNumber, in bufferManager, concurrentReads, writeMode, initialSize);
- partition.Initialize();
- partitionTable.Add(partition);
- }
+ switch (maxLogEntrySize)
+ {
+ case > 0L:
+ CreateSparsePartitions(
+ partitionTable,
+ path,
+ bufferSize,
+ recordsPerPartition,
+ in bufferManager,
+ concurrentReads,
+ writeMode,
+ initialSize,
+ maxLogEntrySize);
+ break;
+ case 0L:
+ CreateTables(
+ partitionTable,
+ path,
+ bufferSize,
+ recordsPerPartition,
+ in bufferManager,
+ concurrentReads,
+ writeMode,
+ initialSize);
+ break;
+ case < 0L:
+#pragma warning disable CS0618,CS0612
+ CreateLegacyPartitions(
+ partitionTable,
+ path,
+ bufferSize,
+ recordsPerPartition,
+ in bufferManager,
+ concurrentReads,
+ writeMode,
+ initialSize);
+ break;
+#pragma warning restore CS0618,CS0612
}
// constructed sorted list of partitions
@@ -97,6 +129,46 @@ private protected PersistentState(DirectoryInfo path, int recordsPerPartition, O
measurementTags = configuration.MeasurementTags;
static int ComparePartitions(Partition x, Partition y) => x.PartitionNumber.CompareTo(y.PartitionNumber);
+
+ static void CreateTables(SortedSet partitionTable, DirectoryInfo path, int bufferSize, int recordsPerPartition, in BufferManager manager, int concurrentReads, WriteMode writeMode, long initialSize)
+ {
+ foreach (var file in path.EnumerateFiles())
+ {
+ if (long.TryParse(file.Name, out var partitionNumber))
+ {
+ var partition = new Table(file.Directory!, bufferSize, recordsPerPartition, partitionNumber, in manager, concurrentReads, writeMode, initialSize);
+ partition.Initialize();
+ partitionTable.Add(partition);
+ }
+ }
+ }
+
+ static void CreateSparsePartitions(SortedSet partitionTable, DirectoryInfo path, int bufferSize, int recordsPerPartition, in BufferManager manager, int concurrentReads, WriteMode writeMode, long initialSize, long maxLogEntrySize)
+ {
+ foreach (var file in path.EnumerateFiles())
+ {
+ if (long.TryParse(file.Name, out var partitionNumber))
+ {
+ var partition = new SparsePartition(file.Directory!, bufferSize, recordsPerPartition, partitionNumber, in manager, concurrentReads, writeMode, initialSize, maxLogEntrySize);
+ partition.Initialize();
+ partitionTable.Add(partition);
+ }
+ }
+ }
+
+ [Obsolete]
+ static void CreateLegacyPartitions(SortedSet partitionTable, DirectoryInfo path, int bufferSize, int recordsPerPartition, in BufferManager manager, int concurrentReads, WriteMode writeMode, long initialSize)
+ {
+ foreach (var file in path.EnumerateFiles())
+ {
+ if (long.TryParse(file.Name, out var partitionNumber))
+ {
+ var partition = new LegacyPartition(file.Directory!, bufferSize, recordsPerPartition, partitionNumber, in manager, concurrentReads, writeMode, initialSize);
+ partition.Initialize();
+ partitionTable.Add(partition);
+ }
+ }
+ }
}
private protected static Meter MeterRoot => ReadRateMeter.Meter;
@@ -110,8 +182,14 @@ private protected PersistentState(DirectoryInfo path, int recordsPerPartition, O
bool IAuditTrail.IsLogEntryLengthAlwaysPresented => true;
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private partial Partition CreatePartition(long partitionNumber)
- => new(Location, bufferSize, recordsPerPartition, partitionNumber, in bufferManager, concurrentReads, writeMode, initialSize);
+ private partial Partition CreatePartition(long partitionNumber) => maxLogEntrySize switch
+ {
+ > 0L => new SparsePartition(Location, bufferSize, recordsPerPartition, partitionNumber, in bufferManager, concurrentReads, writeMode, initialSize, maxLogEntrySize),
+ 0L => new Table(Location, bufferSize, recordsPerPartition, partitionNumber, in bufferManager, concurrentReads, writeMode, initialSize),
+#pragma warning disable CS0618,CS0612
+ < 0L => new LegacyPartition(Location, bufferSize, recordsPerPartition, partitionNumber, in bufferManager, concurrentReads, writeMode, initialSize),
+#pragma warning restore CS0618,CS0612
+ };
private ValueTask UnsafeReadAsync(ILogEntryConsumer reader, int sessionId, long startIndex, long endIndex, int length, bool snapshotRequested, CancellationToken token)
{
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.Membership.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.Membership.cs
index 44b7db518..d2e737d6d 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.Membership.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.Membership.cs
@@ -255,7 +255,7 @@ protected async Task AddMemberAsync(TMember member, int rounds,
throw new ConcurrentMembershipModificationException();
var leaderState = LeaderStateOrException;
- var tokenSource = token.LinkTo(leaderState.LeadershipToken);
+ var tokenSource = token.LinkTo(leaderState.Token);
try
{
// catch up node
@@ -293,7 +293,7 @@ protected async Task AddMemberAsync(TMember member, int rounds,
catch (OperationCanceledException e)
{
token = tokenSource?.CancellationOrigin ?? e.CancellationToken;
- throw token == leaderState.LeadershipToken
+ throw token == leaderState.Token
? new InvalidOperationException(ExceptionMessages.LocalNodeNotLeader, e)
: new OperationCanceledException(e.Message, e, token);
}
@@ -337,7 +337,7 @@ protected async Task RemoveMemberAsync(ClusterMemberId id, IClus
if (members.TryGetValue(id, out var member))
{
var leaderState = LeaderStateOrException;
- var tokenSource = token.LinkTo(leaderState.LeadershipToken);
+ var tokenSource = token.LinkTo(leaderState.Token);
try
{
// ensure that previous configuration has been committed
@@ -356,7 +356,7 @@ protected async Task RemoveMemberAsync(ClusterMemberId id, IClus
catch (OperationCanceledException e)
{
token = tokenSource?.CancellationOrigin ?? e.CancellationToken;
- throw token == leaderState.LeadershipToken
+ throw token == leaderState.Token
? new InvalidOperationException(ExceptionMessages.LocalNodeNotLeader, e)
: new OperationCanceledException(e.Message, e, token);
}
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs
index bc1dd5214..5c53b5e1f 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs
@@ -75,7 +75,7 @@ protected RaftCluster(IClusterMemberConfiguration config, in TagList measurement
readinessProbe = new(TaskCreationOptions.RunContinuationsAsynchronously);
aggressiveStickiness = config.AggressiveLeaderStickiness;
electionEvent = new(TaskCreationOptions.RunContinuationsAsynchronously);
- state = new StandbyState(this);
+ state = new StandbyState(this, TimeSpan.FromMilliseconds(electionTimeout));
EndPointComparer = config.EndPointComparer;
this.measurementTags = measurementTags;
}
@@ -139,10 +139,11 @@ public bool TryGetLeaseToken(out CancellationToken token)
return false;
}
- ///
- /// Gets the cancellation token that tracks the leader state of the current node.
- ///
- public CancellationToken LeadershipToken => (state as LeaderState)?.LeadershipToken ?? new(true);
+ ///
+ public CancellationToken LeadershipToken => (state as LeaderState)?.Token ?? new(true);
+
+ ///
+ public CancellationToken ConsensusToken => (state as TokenizedState)?.Token ?? new(true);
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private LeaderState LeaderStateOrException
@@ -326,7 +327,7 @@ public virtual async Task StartAsync(CancellationToken token)
{
if (await DetectLocalMemberAsync(member, token).ConfigureAwait(false))
{
- state = standbyNode ? new StandbyState(this) : new FollowerState(this);
+ state = standbyNode ? new StandbyState(this, LeaderLeaseDuration) : new FollowerState(this);
readinessProbe.TrySetResult();
Logger.StartedAsFollower(member.EndPoint);
return;
@@ -334,7 +335,7 @@ public virtual async Task StartAsync(CancellationToken token)
}
// local member is not known. Start in frozen state and wait when the current node will be added to the cluster
- state = new StandbyState(this);
+ state = new StandbyState(this, LeaderLeaseDuration);
Logger.StartedAsFrozen();
}
@@ -387,7 +388,7 @@ public async ValueTask EnableStandbyModeAsync(CancellationToken token = de
ObjectDisposedException.ThrowIf(IsDisposed, this);
RaftState currentState;
- if ((currentState = state) is FollowerState or CandidateState)
+ if ((currentState = state) is not StandbyState)
{
var tokenSource = token.LinkTo(LifecycleToken);
var lockTaken = false;
@@ -399,7 +400,7 @@ public async ValueTask EnableStandbyModeAsync(CancellationToken token = de
// ensure that we trying to update the same state
if (ReferenceEquals(state, currentState))
{
- await UpdateStateAsync(new StandbyState(this)).ConfigureAwait(false);
+ await UpdateStateAsync(new StandbyState(this, LeaderLeaseDuration)).ConfigureAwait(false);
return true;
}
}
@@ -496,8 +497,8 @@ private async ValueTask StepDown()
Logger.DowngradingToFollowerState(Term);
switch (state)
{
- case FollowerState followerState:
- followerState.Refresh();
+ case ConsensusTrackerState followerOrStandbyState:
+ followerOrStandbyState.Refresh();
break;
case LeaderState or CandidateState:
var newState = new FollowerState(this);
@@ -787,13 +788,9 @@ protected async ValueTask> VoteAsync(ClusterMemberId sender, long s
Leader = null;
await StepDown(senderTerm).ConfigureAwait(false);
}
- else if (state is FollowerState follower)
+ else if (state is ConsensusTrackerState followerOrStandbyState)
{
- follower.Refresh();
- }
- else if (state is StandbyState)
- {
- FollowerState.HeartbeatRateMeter.Add(1, in measurementTags);
+ followerOrStandbyState.Refresh();
}
else
{
@@ -864,9 +861,10 @@ protected async ValueTask ResignAsync(CancellationToken token)
/// The index of the last committed log entry on the sender side.
/// The token that can be used to cancel the operation.
/// The index of the last committed log entry known by the leader.
- [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] // hot path, avoid allocations
- protected async ValueTask SynchronizeAsync(long commitIndex, CancellationToken token)
+ protected ValueTask SynchronizeAsync(long commitIndex, CancellationToken token)
{
+ long? result = null;
+
// do not execute the next round of heartbeats if the sender is already in sync with the leader
if (state is LeaderState leaderState)
{
@@ -874,19 +872,24 @@ protected async ValueTask ResignAsync(CancellationToken token)
{
try
{
- await leaderState.ForceReplicationAsync(token).ConfigureAwait(false);
+ leaderState.ForceReplication();
}
catch (InvalidOperationException)
{
// local node is not a leader
- return null;
+ goto exit;
+ }
+ catch (Exception e)
+ {
+ return ValueTask.FromException(e);
}
}
- return auditTrail.LastCommittedEntryIndex;
+ result = auditTrail.LastCommittedEntryIndex;
}
- return null;
+ exit:
+ return new(result);
}
///
@@ -898,11 +901,19 @@ protected async ValueTask ResignAsync(CancellationToken token)
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
public async ValueTask ApplyReadBarrierAsync(CancellationToken token = default)
{
- for (; ; )
+ for (; ; token.ThrowIfCancellationRequested())
{
if (state is LeaderState leaderState)
{
- await leaderState.ForceReplicationAsync(token).ConfigureAwait(false);
+ try
+ {
+ await leaderState.ForceReplicationAsync(token).ConfigureAwait(false);
+ }
+ catch (InvalidOperationException)
+ {
+ // local node is not a leader, retry
+ continue;
+ }
}
else if (Leader is { } leader)
{
@@ -930,7 +941,44 @@ async ValueTask ICluster.ResignAsync(CancellationToken token)
private ValueTask MoveToStandbyState(bool resumable = true)
{
Leader = null;
- return UpdateStateAsync(new StandbyState(this) { Resumable = resumable });
+ return UpdateStateAsync(new StandbyState(this, LeaderLeaseDuration) { Resumable = resumable });
+ }
+
+ async void IRaftStateMachine.IncomingHeartbeatTimedOut(IRaftStateMachine.IWeakCallerStateIdentity callerState)
+ {
+ var lockTaken = false;
+ try
+ {
+ await transitionLock.AcquireAsync(LifecycleToken).ConfigureAwait(false);
+ lockTaken = true;
+
+ if (state is StandbyState standby && callerState.IsValid(standby))
+ {
+ if (standby.IsRefreshRequested)
+ {
+ standby.Refresh();
+ }
+ else
+ {
+ Leader = null;
+ }
+ }
+ }
+ catch (OperationCanceledException) when (lockTaken is false)
+ {
+ // ignore cancellation of lock acquisition
+ }
+ catch (ObjectDisposedException) when (lockTaken is false)
+ {
+ // ignore destroyed lock
+ }
+ finally
+ {
+ callerState.Clear();
+
+ if (lockTaken)
+ transitionLock.Release();
+ }
}
///
@@ -1167,7 +1215,7 @@ public async ValueTask ReplicateAsync(TEntry entry, CancellationTo
ObjectDisposedException.ThrowIf(IsDisposed, this);
var leaderState = LeaderStateOrException;
- var tokenSource = token.LinkTo(leaderState.LeadershipToken);
+ var tokenSource = token.LinkTo(leaderState.Token);
try
{
// 1 - append entry to the log
@@ -1182,7 +1230,7 @@ public async ValueTask ReplicateAsync(TEntry entry, CancellationTo
catch (OperationCanceledException e)
{
token = tokenSource?.CancellationOrigin ?? e.CancellationToken;
- throw token == leaderState.LeadershipToken
+ throw token == leaderState.Token
? new InvalidOperationException(ExceptionMessages.LocalNodeNotLeader, e)
: new OperationCanceledException(e.Message, e, token);
}
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftState.cs
index 3bb86c054..10a7f70d8 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftState.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftState.cs
@@ -31,6 +31,9 @@ private protected void MoveToFollowerState(bool randomizeTimeout, long? newTerm
private protected void UnavailableMemberDetected(TMember member, CancellationToken token)
=> ThreadPool.UnsafeQueueUserWorkItem(new UnavailableMemberNotification(this, member, token), preferLocal: false);
+ private protected void IncomingHeartbeatTimedOut()
+ => ThreadPool.UnsafeQueueUserWorkItem(new IncomingHeartbeatTimedOutNotification(this), preferLocal: true);
+
public new ValueTask DisposeAsync() => base.DisposeAsync();
// holds weak reference to the state that was an initiator of the work item
@@ -143,4 +146,10 @@ internal UnavailableMemberNotification(RaftState currentState, TMember
private protected override void Execute(IRaftStateMachine stateMachine)
=> stateMachine.UnavailableMemberDetected(this, member, token);
}
+
+ private sealed class IncomingHeartbeatTimedOutNotification(RaftState currentState) : StateTransitionWorkItem(currentState)
+ {
+ private protected override void Execute(IRaftStateMachine stateMachine)
+ => stateMachine.IncomingHeartbeatTimedOut(this);
+ }
}
\ No newline at end of file
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/StandbyState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/StandbyState.cs
index 1f962b13d..643829563 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/StandbyState.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/StandbyState.cs
@@ -1,11 +1,108 @@
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+
namespace DotNext.Net.Cluster.Consensus.Raft;
///
/// This is ephemeral state indicating that
/// the cluster member will not become a leader.
///
-internal sealed class StandbyState(IRaftStateMachine stateMachine) : RaftState(stateMachine)
+internal sealed class StandbyState : ConsensusTrackerState
where TMember : class, IRaftClusterMember
{
+ private readonly TimeSpan consensusTimeout;
+ private readonly CancellationTokenSource lifecycleTokenSource;
+ private readonly CancellationToken lifecycleToken;
+
+ [SuppressMessage("Usage", "CA2213", Justification = "Disposed using DestroyLease() method")]
+ private volatile ConsensusTokenSource? consensusTokenSource;
+ private Task? tracker;
+
+ internal StandbyState(IRaftStateMachine stateMachine, TimeSpan consensusTimeout)
+ : base(stateMachine)
+ {
+ lifecycleTokenSource = new();
+ lifecycleToken = lifecycleTokenSource.Token;
+ this.consensusTimeout = consensusTimeout;
+ }
+
+ public override CancellationToken Token => consensusTokenSource?.Token ?? new(canceled: true);
+
internal bool Resumable { get; init; } = true;
+
+ public override void Refresh()
+ {
+ if (tracker is null or { IsCompletedSuccessfully: true } || consensusTokenSource is null)
+ {
+ tracker = Track();
+ }
+ else
+ {
+ refreshEvent.Set();
+ }
+
+ base.Refresh();
+ }
+
+ private async Task Track()
+ {
+ consensusTokenSource = new();
+
+ try
+ {
+ // spin loop to wait for the timeout
+ while (await refreshEvent.WaitAsync(consensusTimeout, lifecycleToken).ConfigureAwait(false))
+ {
+ // Transition can be suppressed. If so, resume the loop and reset the timer.
+ // If the event is in signaled state then the returned task is completed synchronously.
+ await suppressionEvent.WaitAsync(lifecycleToken).ConfigureAwait(false);
+ }
+ }
+ finally
+ {
+ using var cts = Interlocked.Exchange(ref consensusTokenSource, null);
+ cts.Cancel(throwOnFirstException: false);
+ cts.Dispose();
+ }
+
+ // Ignored if timeout tracking is aborted by OperationCanceledException.
+ // This could happen if the state is disposed asynchronously due to transition to another state.
+ IncomingHeartbeatTimedOut();
+ }
+
+ protected override async ValueTask DisposeAsyncCore()
+ {
+ try
+ {
+ lifecycleTokenSource.Cancel(throwOnFirstException: false);
+ await (tracker ?? Task.CompletedTask).ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ Logger.StandbyStateExitedWithError(e);
+ }
+ finally
+ {
+ Dispose(disposing: true);
+ }
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ lifecycleTokenSource.Dispose();
+ Interlocked.Exchange(ref consensusTokenSource, null)?.Dispose();
+ tracker = null;
+ }
+
+ base.Dispose(disposing);
+ }
+
+ private sealed class ConsensusTokenSource : CancellationTokenSource
+ {
+ internal readonly new CancellationToken Token; // cached to avoid ObjectDisposedException
+
+ internal ConsensusTokenSource() => Token = base.Token;
+ }
}
\ No newline at end of file
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/TokenizedState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/TokenizedState.cs
new file mode 100644
index 000000000..a2cbb9e68
--- /dev/null
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/TokenizedState.cs
@@ -0,0 +1,7 @@
+namespace DotNext.Net.Cluster.Consensus.Raft;
+
+internal abstract class TokenizedState(IRaftStateMachine stateMachine) : RaftState(stateMachine)
+ where TMember : class, IRaftClusterMember
+{
+ public abstract CancellationToken Token { get; }
+}
\ No newline at end of file