Skip to content

Commit

Permalink
Release 5.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Apr 20, 2024
1 parent 40e1dd9 commit 5947622
Show file tree
Hide file tree
Showing 32 changed files with 1,271 additions and 295 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
Release Notes
====

# 04-20-2024
<a href="https://www.nuget.org/packages/dotnext.io/5.4.0">DotNext.IO 5.4.0</a>
* Added `FileWriter.WrittenBuffer` property

<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.4.0">DotNext.Net.Cluster 5.4.0</a>
* Changed binary file format for WAL for more efficient I/O. A new format is incompatible with all previous versions. To enable legacy format, set `PersistentState.Options.UseLegacyBinaryFormat` property to **true**
* Introduced a new experimental binary format for WAL based on sparse files. Can be enabled with `PersistentState.Options.MaxLogEntrySize` property

<a href="https://www.nuget.org/packages/dotnext.aspnetcore.cluster/5.4.0">DotNext.AspNetCore.Cluster 5.4.0</a>
* Updated dependencies

# 03-20-2024
<a href="https://www.nuget.org/packages/dotnext/5.3.1">DotNext 5.3.1</a>
* Provided support of thread-local storage for `StreamSource.AsSharedStream`
Expand Down
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,17 @@ All these things are implemented in 100% managed code on top of existing .NET AP
* [NuGet Packages](https://www.nuget.org/profiles/rvsakno)

# What's new
Release Date: 03-20-2024
Release Date: 04-20-2024

<a href="https://www.nuget.org/packages/dotnext/5.3.1">DotNext 5.3.1</a>
* Provided support of thread-local storage for `StreamSource.AsSharedStream`
* Remove type cast for `Func.Constant` static method
<a href="https://www.nuget.org/packages/dotnext.io/5.4.0">DotNext.IO 5.4.0</a>
* Added `FileWriter.WrittenBuffer` property

<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.4.0">DotNext.Net.Cluster 5.4.0</a>
* Changed binary file format for WAL for more efficient I/O. A new format is incompatible with all previous versions. To enable legacy format, set `PersistentState.Options.UseLegacyBinaryFormat` property to **true**
* Introduced a new experimental binary format for WAL based on sparse files. Can be enabled with `PersistentState.Options.MaxLogEntrySize` property

<a href="https://www.nuget.org/packages/dotnext.aspnetcore.cluster/5.4.0">DotNext.AspNetCore.Cluster 5.4.0</a>
* Updated dependencies

Changelog for previous versions located [here](./CHANGELOG.md).

Expand Down
2 changes: 1 addition & 1 deletion src/DotNext.IO/DotNext.IO.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<Authors>.NET Foundation and Contributors</Authors>
<Company />
<Product>.NEXT Family of Libraries</Product>
<VersionPrefix>5.3.0</VersionPrefix>
<VersionPrefix>5.4.0</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyName>DotNext.IO</AssemblyName>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
Expand Down
4 changes: 2 additions & 2 deletions src/DotNext.IO/IO/FileWriter.Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ public partial class FileWriter : IDynamicInterfaceCastable

private ReadOnlyMemory<byte> GetBuffer(int index) => index switch
{
0 => WrittenMemory,
0 => WrittenBuffer,
1 => secondBuffer,
_ => ReadOnlyMemory<byte>.Empty,
};

private IEnumerator<ReadOnlyMemory<byte>> EnumerateBuffers()
{
yield return WrittenMemory;
yield return WrittenBuffer;
yield return secondBuffer;
}

Expand Down
15 changes: 9 additions & 6 deletions src/DotNext.IO/IO/FileWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ public FileWriter(FileStream destination, int bufferSize = 4096, MemoryAllocator
throw new ArgumentException(ExceptionMessages.StreamNotWritable, nameof(destination));
}

private ReadOnlyMemory<byte> WrittenMemory => buffer.Memory.Slice(0, bufferOffset);
/// <summary>
/// Gets written part of the buffer.
/// </summary>
public ReadOnlyMemory<byte> WrittenBuffer => buffer.Memory.Slice(0, bufferOffset);

private int FreeCapacity => buffer.Length - bufferOffset;

Expand Down Expand Up @@ -138,11 +141,11 @@ public long FilePosition
public long WritePosition => fileOffset + bufferOffset;

private ValueTask FlushCoreAsync(CancellationToken token)
=> Submit(RandomAccess.WriteAsync(handle, WrittenMemory, fileOffset, token), writeCallback);
=> Submit(RandomAccess.WriteAsync(handle, WrittenBuffer, fileOffset, token), writeCallback);

private void FlushCore()
{
RandomAccess.Write(handle, WrittenMemory.Span, fileOffset);
RandomAccess.Write(handle, WrittenBuffer.Span, fileOffset);
fileOffset += bufferOffset;
bufferOffset = 0;
}
Expand Down Expand Up @@ -198,7 +201,7 @@ private void WriteSlow(ReadOnlySpan<byte> input)
{
if (input.Length >= buffer.Length)
{
RandomAccess.Write(handle, WrittenMemory.Span, fileOffset);
RandomAccess.Write(handle, WrittenBuffer.Span, fileOffset);
fileOffset += bufferOffset;

RandomAccess.Write(handle, input, fileOffset);
Expand All @@ -207,7 +210,7 @@ private void WriteSlow(ReadOnlySpan<byte> input)
}
else
{
RandomAccess.Write(handle, WrittenMemory.Span, fileOffset);
RandomAccess.Write(handle, WrittenBuffer.Span, fileOffset);
fileOffset += bufferOffset;
input.CopyTo(buffer.Span);
bufferOffset += input.Length;
Expand Down Expand Up @@ -271,7 +274,7 @@ private ValueTask WriteAndCopyAsync(ReadOnlyMemory<byte> input, CancellationToke
Debug.Assert(HasBufferedData);

secondBuffer = input;
return Submit(RandomAccess.WriteAsync(handle, WrittenMemory, fileOffset, token), writeAndCopyCallback);
return Submit(RandomAccess.WriteAsync(handle, WrittenBuffer, fileOffset, token), writeAndCopyCallback);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft.Http;
using Diagnostics;
using Messaging;
using Replication;
using Threading;
using static DotNext.Extensions.Logging.TestLoggers;

[Collection(TestCollections.Raft)]
Expand Down Expand Up @@ -447,7 +448,6 @@ public static async Task StandbyMode()
await GetLocalClusterView(host3).Readiness.WaitAsync(DefaultTimeout);

// suspend two nodes
False(await GetLocalClusterView(host1).EnableStandbyModeAsync());
True(await GetLocalClusterView(host2).EnableStandbyModeAsync());
True(GetLocalClusterView(host2).Standby);
True(await GetLocalClusterView(host3).EnableStandbyModeAsync());
Expand Down Expand Up @@ -775,4 +775,46 @@ public async Task RegressionIssue153()
static IFailureDetector CreateFailureDetector(TimeSpan estimate, IRaftClusterMember member)
=> new PhiAccrualFailureDetector(estimate) { Threshold = 3D, TreatUnknownValueAsUnhealthy = true };
}

[Fact]
public static async Task ConsensusToken()
{
var config1 = new Dictionary<string, string>
{
{"publicEndPoint", "http://localhost:3262"},
{"coldStart", "true"},
// {"requestTimeout", "00:00:01"},
// {"rpcTimeout", "00:00:01"},
// {"lowerElectionTimeout", "6000" },
// {"upperElectionTimeout", "9000" },
};

var config2 = new Dictionary<string, string>
{
{"publicEndPoint", "http://localhost:3263"},
{"coldStart", "false"},
{"standby", "true"},
};

using var host1 = CreateHost<Startup>(3262, config1);
await host1.StartAsync();

using var host2 = CreateHost<Startup>(3263, config2);
True(GetLocalClusterView(host2).ConsensusToken.IsCancellationRequested);
await host2.StartAsync();

await GetLocalClusterView(host1).WaitForLeaderAsync(DefaultTimeout);
Equal(GetLocalClusterView(host1).LeadershipToken, GetLocalClusterView(host1).ConsensusToken);
True(await GetLocalClusterView(host1).AddMemberAsync(GetLocalClusterView(host2).LocalMemberAddress));

await GetLocalClusterView(host2).Readiness.WaitAsync(DefaultTimeout);

var token = GetLocalClusterView(host2).ConsensusToken;
False(token.IsCancellationRequested);

await host1.StopAsync();

await token.WaitAsync();
await host2.StopAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,14 @@ public static async Task QueryAppendEntries(long partitionSize, bool caching, in
}
}

[Fact]
public static async Task ParallelReads()
[Theory]
[InlineData(null)]
[InlineData(1024L * 1024L * 100L)]
public static async Task ParallelReads(long? maxLogEntrySize)
{
var entry = new TestLogEntry("SET X = 0") { Term = 42L };
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
IPersistentState state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { CopyOnReadOptions = new() });
IPersistentState state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { CopyOnReadOptions = new(), MaxLogEntrySize = maxLogEntrySize });
try
{
Equal(1L, await state.AppendAsync(new LogEntryList(entry)));
Expand Down Expand Up @@ -235,11 +237,13 @@ public static async Task ParallelReads()
}
}

[Fact]
public static async Task AppendWhileReading()
[Theory]
[InlineData(null)]
[InlineData(1024L * 1024L * 100L)]
public static async Task AppendWhileReading(long? maxLogEntrySize)
{
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
using var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition);
using var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { MaxLogEntrySize = maxLogEntrySize });
var entry = new TestLogEntry("SET X = 0") { Term = 42L };
await state.AppendAsync(entry);

Expand Down Expand Up @@ -281,8 +285,48 @@ public static async Task DropRecords(bool reuseSpace)
Equal(0L, state.LastCommittedEntryIndex);
}

[Theory]
[InlineData(null)]
[InlineData(1024L * 1024L * 100L)]
public static async Task Overwrite(long? maxLogEntrySize)
{
var entry1 = new TestLogEntry("SET X = 0") { Term = 42L };
var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L };
var entry3 = new TestLogEntry("SET Z = 2") { Term = 44L };
var entry4 = new TestLogEntry("SET U = 3") { Term = 45L };
var entry5 = new TestLogEntry("SET V = 4") { Term = 46L };
Func<IReadOnlyList<IRaftLogEntry>, long?, CancellationToken, ValueTask<Missing>> checker;
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { MaxLogEntrySize = maxLogEntrySize }))
{
Equal(1L, await state.AppendAsync(new LogEntryList(entry2, entry3, entry4, entry5)));
Equal(4L, state.LastEntryIndex);
Equal(0L, state.LastCommittedEntryIndex);
await state.AppendAsync(entry1, 1L);
Equal(1L, state.LastEntryIndex);
Equal(0L, state.LastCommittedEntryIndex);
}

//read again
using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { MaxLogEntrySize = maxLogEntrySize }))
{
Equal(1L, state.LastEntryIndex);
Equal(0L, state.LastCommittedEntryIndex);
checker = async (entries, snapshotIndex, token) =>
{
Null(snapshotIndex);
Single(entries);
False(entries[0].IsSnapshot);
Equal(entry1.Content, await entries[0].ToStringAsync(Encoding.UTF8));
return Missing.Value;
};
await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1L, CancellationToken.None);
}
}

[Obsolete]
[Fact]
public static async Task Overwrite()
public static async Task LegacyOverwrite()
{
var entry1 = new TestLogEntry("SET X = 0") { Term = 42L };
var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L };
Expand All @@ -291,7 +335,7 @@ public static async Task Overwrite()
var entry5 = new TestLogEntry("SET V = 4") { Term = 46L };
Func<IReadOnlyList<IRaftLogEntry>, long?, CancellationToken, ValueTask<Missing>> checker;
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition))
using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { UseLegacyBinaryFormat = true }))
{
Equal(1L, await state.AppendAsync(new LogEntryList(entry2, entry3, entry4, entry5)));
Equal(4L, state.LastEntryIndex);
Expand All @@ -302,7 +346,7 @@ public static async Task Overwrite()
}

//read again
using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition))
using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { UseLegacyBinaryFormat = true }))
{
Equal(1L, state.LastEntryIndex);
Equal(0L, state.LastCommittedEntryIndex);
Expand Down Expand Up @@ -833,6 +877,37 @@ public static async Task RestoreBackup()
}
}

[PlatformSpecificFact("linux")]
public static async Task CreateSparseBackup()
{
var entry1 = new TestLogEntry("SET X = 0") { Term = 42L };
var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L };
var entry3 = new TestLogEntry("SET Z = 2") { Term = 44L };
var entry4 = new TestLogEntry("SET U = 3") { Term = 45L };
var entry5 = new TestLogEntry("SET V = 4") { Term = 46L };
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
var backupFile = Path.GetTempFileName();
IPersistentState state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { MaxLogEntrySize = 1024 * 1024, BackupFormat = System.Formats.Tar.TarEntryFormat.Gnu });
var member = ClusterMemberId.FromEndPoint(new IPEndPoint(IPAddress.IPv6Loopback, 3232));
try
{
//define node state
Equal(1, await state.IncrementTermAsync(member));
True(state.IsVotedFor(member));
//define log entries
Equal(1L, await state.AppendAsync(new LogEntryList(entry1, entry2, entry3, entry4, entry5)));
//commit some of them
Equal(2L, await state.CommitAsync(2L));
//save backup
await using var backupStream = new FileStream(backupFile, FileMode.Truncate, FileAccess.Write, FileShare.None, 1024, true);
await state.CreateBackupAsync(backupStream);
}
finally
{
(state as IDisposable)?.Dispose();
}
}

[Fact]
public static async Task Reconstruction()
{
Expand Down
2 changes: 0 additions & 2 deletions src/DotNext/Disposable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

namespace DotNext;

using static Runtime.Intrinsics;

/// <summary>
/// Provides implementation of dispose pattern.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<ImplicitUsings>true</ImplicitUsings>
<IsTrimmable>true</IsTrimmable>
<Features>nullablePublicOnly</Features>
<VersionPrefix>5.3.0</VersionPrefix>
<VersionPrefix>5.4.0</VersionPrefix>
<VersionSuffix></VersionSuffix>
<Authors>.NET Foundation and Contributors</Authors>
<Product>.NEXT Family of Libraries</Product>
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<Nullable>enable</Nullable>
<IsTrimmable>true</IsTrimmable>
<Features>nullablePublicOnly</Features>
<VersionPrefix>5.3.0</VersionPrefix>
<VersionPrefix>5.4.0</VersionPrefix>
<VersionSuffix></VersionSuffix>
<Authors>.NET Foundation and Contributors</Authors>
<Product>.NEXT Family of Libraries</Product>
Expand Down
4 changes: 4 additions & 0 deletions src/cluster/DotNext.Net.Cluster/ExceptionMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,8 @@ internal static string UnknownRaftMessageType<T>(T messageType)
internal static string PersistentStateBroken => (string)Resources.Get();

internal static string ConcurrentMembershipUpdate => (string)Resources.Get();

internal static string LogEntryPayloadTooLarge => (string)Resources.Get();

internal static string SparseFileNotSupported => (string)Resources.Get();
}
4 changes: 3 additions & 1 deletion src/cluster/DotNext.Net.Cluster/ExceptionMessages.restext
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ MissingMessageName=Message must have associated type and name
LeaderIsUnavailable=Leader node is not yet elected
UnknownRaftMessageType=Unknown Raft message type {0}
PersistentStateBroken=Internal state of the WAL didn't pass the integrity check
ConcurrentMembershipUpdate=Cluster membership cannot be modified concurrently
ConcurrentMembershipUpdate=Cluster membership cannot be modified concurrently
LogEntryPayloadTooLarge=The size of the log entry is larger than specified threshold
SparseFileNotSupported=TAR archive doesn't support sparse files on the target platform
Loading

0 comments on commit 5947622

Please sign in to comment.