Skip to content

Commit

Permalink
Threading 5.9.0
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Jul 1, 2024
1 parent 9b86c3b commit 9cd5360
Show file tree
Hide file tree
Showing 16 changed files with 178 additions and 165 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
Release Notes
====

# 07-01-2024
<a href="https://www.nuget.org/packages/dotnext.threading/5.9.0">DotNext.Threading 5.9.0</a>
* Added `WaitAnyAsync` overload method to wait on a group of cancellation tokens that supports interruption

<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.7.2">DotNext.Net.Cluster 5.7.2</a>
* Fixed [244](https://github.com/dotnet/dotNext/issues/244)

<a href="https://www.nuget.org/packages/dotnext.aspnetcore.cluster/5.7.2">DotNext.AspNetCore.Cluster 5.7.2</a>
* Fixed [244](https://github.com/dotnet/dotNext/issues/244)

# 06-26-2024
<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.7.1">DotNext.Net.Cluster 5.7.1</a>
* Improved reliability of disk I/O for the new WAL binary format
Expand Down
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,21 @@ All these things are implemented in 100% managed code on top of existing .NET AP
* [NuGet Packages](https://www.nuget.org/profiles/rvsakno)

# What's new
Release Date: 06-26-2024
Release Date: 07-01-2024

<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.7.1">DotNext.Net.Cluster 5.7.1</a>
* Improved reliability of disk I/O for the new WAL binary format
<a href="https://www.nuget.org/packages/dotnext.threading/5.9.0">DotNext.Threading 5.9.0</a>
* Added `WaitAnyAsync` overload method to wait on a group of cancellation tokens that supports interruption

<a href="https://www.nuget.org/packages/dotnext.aspnetcore.cluster/5.7.1">DotNext.AspNetCore.Cluster 5.7.1</a>
* Improved reliability of disk I/O for the new WAL binary format
<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.7.2">DotNext.Net.Cluster 5.7.2</a>
* Fixed [244](https://github.com/dotnet/dotNext/issues/244)

<a href="https://www.nuget.org/packages/dotnext.aspnetcore.cluster/5.7.2">DotNext.AspNetCore.Cluster 5.7.2</a>
* Fixed [244](https://github.com/dotnet/dotNext/issues/244)

Changelog for previous versions located [here](./CHANGELOG.md).

# Release & Support Policy
The libraries are versioned according with [Semantic Versioning 2.0](https://semver.org/).
The libraries are versioned according to [Semantic Versioning 2.0](https://semver.org/).

| Version | .NET compatibility | Support Level |
| ---- | ---- | ---- |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,7 @@ protected override SnapshotBuilder CreateSnapshotBuilder(in SnapshotBuilderConte
[InlineData(true)]
public static async Task JsonSerialization(bool cached)
{
var dir = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString());
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
using var state = new JsonPersistentState(dir, cached);
var entry1 = state.CreateJsonLogEntry(new TestJsonObject { StringField = "Entry1" });
var entry2 = state.CreateJsonLogEntry(new TestJsonObject { StringField = "Entry2" });
Expand All @@ -1045,4 +1045,45 @@ public static async Task JsonSerialization(bool cached)
payload = state.Entries[1];
Equal(entry2.Content.StringField.Value, payload.StringField.Value);
}

[Fact]
public static async Task RegressionIssue244()
{
var path = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
using (var state = new PersistentStateWithoutSnapshot(path, RecordsPerPartition, new() { UseCaching = true }))
{
var entries = RandomEntries();
foreach (var entry in entries[..2])
{
await state.AppendAsync(entry);
}

await state.CommitAsync();
}

using (var state = new PersistentStateWithoutSnapshot(path, RecordsPerPartition, new() { UseCaching = true }))
{
Func<IReadOnlyList<IRaftLogEntry>, long?, CancellationToken, ValueTask<Missing>> checker = (entries, snapshotIndex, token) =>
{
NotEmpty(entries);
Null(snapshotIndex);
Equal(1L, entries[0].Term);
Equal(1L, entries[1].Term);
return ValueTask.FromResult(Missing.Value);
};

await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1L);
}

static Int64LogEntry[] RandomEntries()
{
var entries = new Int64LogEntry[RecordsPerPartition];
for (var i = 0; i < entries.Length; i++)
{
entries[i] = new Int64LogEntry() { Term = 1L, Content = i + 10L };
}

return entries;
}
}
}
12 changes: 12 additions & 0 deletions src/DotNext.Tests/Threading/AsyncBridgeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,16 @@ public static async Task WaitForCancellationMultipleTokens()
cts1.Cancel();
Equal(cts3.Token, await task);
}

[Fact]
public static async Task Interruption()
{
const string interruptionReason = "Reason";
var task = AsyncBridge.WaitAnyAsync([new CancellationToken(canceled: false)], out var interruption);
False(task.IsCompletedSuccessfully);
True(interruption(interruptionReason));

var e = await ThrowsAsync<PendingTaskInterruptedException>(Func.Constant(task));
Equal(interruptionReason, e.Reason);
}
}
2 changes: 1 addition & 1 deletion src/DotNext.Threading/DotNext.Threading.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<ImplicitUsings>true</ImplicitUsings>
<IsTrimmable>true</IsTrimmable>
<Features>nullablePublicOnly</Features>
<VersionPrefix>5.8.0</VersionPrefix>
<VersionPrefix>5.9.0</VersionPrefix>
<VersionSuffix></VersionSuffix>
<Authors>.NET Foundation and Contributors</Authors>
<Product>.NEXT Family of Libraries</Product>
Expand Down
11 changes: 11 additions & 0 deletions src/DotNext.Threading/Threading/AsyncBridge.CancellationToken.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ private void OnCanceled(CancellationToken token)
}
}

internal bool TryInterrupt(object? reason)
{
bool result;
if (result = TrySetException(new PendingTaskInterruptedException { Reason = reason }))
{
Cleanup();
}

return result;
}

private static void Unregister(ReadOnlySpan<CancellationTokenRegistration> registrations)
{
foreach (ref readonly var registration in registrations)
Expand Down
40 changes: 33 additions & 7 deletions src/DotNext.Threading/Threading/AsyncBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ public static ValueTask WaitAsync(this CancellationToken token, bool completeAsC
return result.CreateTask(InfiniteTimeSpan, token);
}

private static CancellationTokenCompletionSource GetCompletionSource(ReadOnlySpan<CancellationToken> tokens) => tokens switch
{
[] => throw new InvalidOperationException(),
[var token] => new CancellationTokenCompletionSource1(token),
[var token1, var token2] => new CancellationTokenCompletionSource2(token1, token2),
_ => new CancellationTokenCompletionSourceN(tokens),
};

/// <summary>
/// Creates a task that will complete when any of the supplied tokens have canceled.
/// </summary>
Expand All @@ -60,19 +68,37 @@ public static Task<CancellationToken> WaitAnyAsync(this ReadOnlySpan<Cancellatio
Task<CancellationToken> result;
try
{
CancellationTokenCompletionSource source = tokens switch
{
[] => throw new InvalidOperationException(),
[var token] => new CancellationTokenCompletionSource1(token),
[var token1, var token2] => new CancellationTokenCompletionSource2(token1, token2),
_ => new CancellationTokenCompletionSourceN(tokens),
};
result = GetCompletionSource(tokens).Task;
}
catch (Exception e)
{
result = Task.FromException<CancellationToken>(e);
}

return result;
}

/// <summary>
/// Creates a task that will complete when any of the supplied tokens have canceled.
/// </summary>
/// <param name="tokens">The tokens to wait on for cancellation.</param>
/// <param name="interruption">An interruption procedure than can be used to turn the returned task into the failed state.</param>
/// <returns>The canceled token.</returns>
/// <exception cref="InvalidOperationException"><paramref name="tokens"/> is empty.</exception>
/// <exception cref="PendingTaskInterruptedException">The returned task is interrupted by <paramref name="interruption"/> procedure.</exception>
public static Task<CancellationToken> WaitAnyAsync(this ReadOnlySpan<CancellationToken> tokens, out Func<object?, bool> interruption)
{
Task<CancellationToken> result;
try
{
var source = GetCompletionSource(tokens);
result = source.Task;
interruption = source.TryInterrupt;
}
catch (Exception e)
{
result = Task.FromException<CancellationToken>(e);
interruption = static _ => false;
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<ImplicitUsings>true</ImplicitUsings>
<IsTrimmable>true</IsTrimmable>
<Features>nullablePublicOnly</Features>
<VersionPrefix>5.7.1</VersionPrefix>
<VersionPrefix>5.7.2</VersionPrefix>
<VersionSuffix></VersionSuffix>
<Authors>.NET Foundation and Contributors</Authors>
<Product>.NEXT Family of Libraries</Product>
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<Nullable>enable</Nullable>
<IsTrimmable>true</IsTrimmable>
<Features>nullablePublicOnly</Features>
<VersionPrefix>5.7.1</VersionPrefix>
<VersionPrefix>5.7.2</VersionPrefix>
<VersionSuffix></VersionSuffix>
<Authors>.NET Foundation and Contributors</Authors>
<Product>.NEXT Family of Libraries</Product>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public abstract partial class DiskBasedStateMachine : PersistentState
/// <param name="configuration">The configuration of the persistent audit trail.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="recordsPerPartition"/> is less than 2.</exception>
protected DiskBasedStateMachine(DirectoryInfo path, int recordsPerPartition, Options? configuration = null)
: base(path, recordsPerPartition, configuration ?? new())
: base(path, recordsPerPartition, configuration ??= new())
{
}

Expand Down Expand Up @@ -64,27 +64,22 @@ protected DiskBasedStateMachine(string path, int recordsPerPartition, Options? c

for (Partition? partition = null; startIndex <= commitIndex; LastAppliedEntryIndex = startIndex++, token.ThrowIfCancellationRequested())
{
if (TryGetPartition(startIndex, ref partition))
if (TryGetPartition(startIndex, ref partition, out var switched))
{
if (switched)
{
await partition.FlushAsync(token).ConfigureAwait(false);
}

var entry = partition.Read(sessionId, startIndex);
var snapshotLength = await ApplyCoreAsync(entry).ConfigureAwait(false);
Volatile.Write(ref lastTerm, entry.Term);
partition.ClearContext(startIndex);

// Remove log entry from the cache according to eviction policy
var completedPartition = startIndex == partition.LastIndex;
if (!entry.IsPersisted)
{
await partition.PersistCachedEntryAsync(startIndex, snapshotLength.HasValue).ConfigureAwait(false);

// Flush partition if we are finished or at the last entry in it
if (startIndex == commitIndex | completedPartition)
{
await partition.FlushAsync(token).ConfigureAwait(false);
}
}
if (bufferManager.IsCachingEnabled)
partition.Evict(startIndex);

if (completedPartition)
if (startIndex == commitIndex || startIndex == partition.LastIndex)
{
partition.ClearContext();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ protected MemoryBasedStateMachine(DirectoryInfo path, int recordsPerPartition, O
snapshotBufferSize = configuration.SnapshotBufferSize;

// with concurrent compaction, we will release cached log entries according to partition lifetime
evictOnCommit = compaction is not CompactionMode.Incremental && configuration.CacheEvictionPolicy is LogEntryCacheEvictionPolicy.OnCommit;
snapshot = new(path, snapshotBufferSize, in bufferManager, concurrentReads, configuration.WriteMode, initialSize: configuration.InitialPartitionSize);
evictOnCommit = compaction is not CompactionMode.Incremental
&& configuration.CacheEvictionPolicy is LogEntryCacheEvictionPolicy.OnCommit
&& configuration.UseCaching;
snapshot = new(path, snapshotBufferSize, in bufferManager, concurrentReads, configuration.WriteMode,
initialSize: configuration.InitialPartitionSize);
}

/// <summary>
Expand Down Expand Up @@ -93,7 +96,7 @@ private async ValueTask BuildSnapshotAsync(int sessionId, long upperBoundIndex,
{
// Calculate the term of the snapshot
Partition? current = LastPartition;
builder.Term = TryGetPartition(upperBoundIndex, ref current)
builder.Term = TryGetPartition(upperBoundIndex, ref current, out _)
? current.GetTerm(upperBoundIndex)
: throw new MissingPartitionException(upperBoundIndex);

Expand All @@ -113,7 +116,8 @@ private async ValueTask BuildSnapshotAsync(int sessionId, long upperBoundIndex,
private bool TryGetPartition(SnapshotBuilder builder, long startIndex, long endIndex, ref long currentIndex, [NotNullWhen(true)] ref Partition? partition)
{
builder.AdjustIndex(startIndex, endIndex, ref currentIndex);
return currentIndex.IsBetween(startIndex.Enclosed(), endIndex.Enclosed()) && TryGetPartition(currentIndex, ref partition);
return currentIndex.IsBetween(startIndex.Enclosed(), endIndex.Enclosed())
&& TryGetPartition(currentIndex, ref partition, out _);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -687,27 +691,24 @@ private async ValueTask ApplyAsync(int sessionId, long startIndex, CancellationT
var commitIndex = LastCommittedEntryIndex;
for (Partition? partition = null; startIndex <= commitIndex; LastAppliedEntryIndex = startIndex++, token.ThrowIfCancellationRequested())
{
if (TryGetPartition(startIndex, ref partition))
if (TryGetPartition(startIndex, ref partition, out var switched))
{
if (switched)
{
await partition.FlushAsync(token).ConfigureAwait(false);
}

var entry = partition.Read(sessionId, startIndex);
await ApplyCoreAsync(entry).ConfigureAwait(false);
Volatile.Write(ref lastTerm, entry.Term);
partition.ClearContext(startIndex);

// Remove log entry from the cache according to eviction policy
var completedPartition = startIndex == partition.LastIndex;
if (!entry.IsPersisted)
if (evictOnCommit && bufferManager.IsCachingEnabled)
{
await partition.PersistCachedEntryAsync(startIndex, evictOnCommit).ConfigureAwait(false);

// Flush partition if we are finished or at the last entry in it
if (startIndex == commitIndex | completedPartition)
{
await partition.FlushAsync(token).ConfigureAwait(false);
}
partition.Evict(startIndex);
}

if (completedPartition)
if (startIndex == commitIndex || startIndex == partition.LastIndex)
{
partition.ClearContext();
}
Expand Down Expand Up @@ -758,7 +759,7 @@ public async Task ReplayAsync(CancellationToken token = default)
if (compaction is CompactionMode.Incremental)
{
incrementalBuilder = await InitializeLongLivingSnapshotBuilderAsync(session).ConfigureAwait(false);
for (Partition? partition = FirstPartition; TryGetPartition(startIndex, ref partition) && partition is not null && startIndex <= LastCommittedEntryIndex; startIndex++)
for (var partition = FirstPartition; TryGetPartition(startIndex, ref partition, out _) && startIndex <= LastCommittedEntryIndex; startIndex++)
{
entry = partition.Read(session, startIndex);
incrementalBuilder.Builder.Term = entry.Term;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ public void Dispose()

internal enum CachedLogEntryPersistenceMode : byte
{
None = 0,
CopyToBuffer,
CopyToBuffer = 0,
SkipBuffer,
}

Expand Down
Loading

0 comments on commit 9cd5360

Please sign in to comment.