diff --git a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.cs b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.cs index 4e4e4aeff965c..1fa5ac8659cd5 100644 --- a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.cs +++ b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.cs @@ -49,6 +49,7 @@ protected ChannelReader() { } public virtual System.Threading.Tasks.Task Completion { get { throw null; } } public virtual int Count { get { throw null; } } public virtual System.Threading.Tasks.ValueTask ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual System.Collections.Generic.IAsyncEnumerable ReadAllAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual bool TryPeek([System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute(false)] out T item) { throw null; } public abstract bool TryRead([System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute(false)] out T item); public abstract System.Threading.Tasks.ValueTask WaitToReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); diff --git a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.csproj b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.csproj index 0e9946994f581..fae9b57404976 100644 --- a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.csproj +++ b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.csproj @@ -14,5 +14,6 @@ + \ No newline at end of file diff --git a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.netcoreapp.cs b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.netcoreapp.cs index 617aff6b1bfec..c591bf9888138 100644 --- a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.netcoreapp.cs +++ b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.netcoreapp.cs @@ -14,8 +14,4 @@ public partial class ChannelClosedException : System.InvalidOperationException #endif protected ChannelClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { } } - public abstract partial class ChannelReader - { - public virtual System.Collections.Generic.IAsyncEnumerable ReadAllAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - } } diff --git a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj index c57527fb77542..ac68cda0be662 100644 --- a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj +++ b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj @@ -26,8 +26,6 @@ System.Threading.Channel<T> Condition="$([MSBuild]::IsTargetFrameworkCompatible('$(TargetFramework)', 'netstandard2.1'))" /> - @@ -40,7 +38,7 @@ System.Threading.Channel<T> + Link="Common\System\Collections\Concurrent\IProducerConsumerQueue.cs" /> + diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.cs index ee184dd13bb31..b4d5639855c2e 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.cs @@ -1,7 +1,9 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; using System.Threading.Tasks; namespace System.Threading.Channels @@ -90,5 +92,23 @@ async ValueTask ReadAsyncCore(CancellationToken ct) } } } + + /// Creates an that enables reading all of the data from the channel. + /// The to use to cancel the enumeration. + /// + /// Each call that returns true will read the next item out of the channel. + /// will return false once no more data is or will ever be available to read. + /// + /// The created async enumerable. + public virtual async IAsyncEnumerable ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) + { + while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false)) + { + while (TryRead(out T? item)) + { + yield return item; + } + } + } } } diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.netcoreapp.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.netcoreapp.cs deleted file mode 100644 index bbfda5cd03dee..0000000000000 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.netcoreapp.cs +++ /dev/null @@ -1,29 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using System.Collections.Generic; -using System.Runtime.CompilerServices; - -namespace System.Threading.Channels -{ - public abstract partial class ChannelReader - { - /// Creates an that enables reading all of the data from the channel. - /// The to use to cancel the enumeration. - /// - /// Each call that returns true will read the next item out of the channel. - /// will return false once no more data is or will ever be available to read. - /// - /// The created async enumerable. - public virtual async IAsyncEnumerable ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) - { - while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false)) - { - while (TryRead(out T? item)) - { - yield return item; - } - } - } - } -} diff --git a/src/libraries/System.Threading.Channels/tests/ChannelTestBase.cs b/src/libraries/System.Threading.Channels/tests/ChannelTestBase.cs index 25c23c9515c70..cefa3fe6f2990 100644 --- a/src/libraries/System.Threading.Channels/tests/ChannelTestBase.cs +++ b/src/libraries/System.Threading.Channels/tests/ChannelTestBase.cs @@ -784,6 +784,272 @@ public async Task ReadAsync_ConsecutiveReadsSucceed() } } + [Fact] + public void ReadAllAsync_NotIdempotent() + { + Channel c = CreateChannel(); + IAsyncEnumerable e = c.Reader.ReadAllAsync(); + Assert.NotNull(e); + Assert.NotSame(e, c.Reader.ReadAllAsync()); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task ReadAllAsync_UseMoveNextAsyncAfterCompleted_ReturnsFalse(bool completeWhilePending) + { + Channel c = CreateChannel(); + IAsyncEnumerator e = c.Reader.ReadAllAsync().GetAsyncEnumerator(); + + ValueTask vt; + if (completeWhilePending) + { + c.Writer.Complete(); + vt = e.MoveNextAsync(); + Assert.True(vt.IsCompletedSuccessfully); + Assert.False(vt.Result); + } + else + { + vt = e.MoveNextAsync(); + Assert.False(vt.IsCompleted); + c.Writer.Complete(); + Assert.False(await vt); + } + + vt = e.MoveNextAsync(); + Assert.True(vt.IsCompletedSuccessfully); + Assert.False(vt.Result); + } + + [Fact] + public void ReadAllAsync_AvailableDataCompletesSynchronously() + { + Channel c = CreateChannel(); + + IAsyncEnumerator e = c.Reader.ReadAllAsync().GetAsyncEnumerator(); + try + { + for (int i = 100; i < 110; i++) + { + Assert.True(c.Writer.TryWrite(i)); + ValueTask vt = e.MoveNextAsync(); + Assert.True(vt.IsCompletedSuccessfully); + Assert.True(vt.Result); + Assert.Equal(i, e.Current); + } + } + finally + { + ValueTask vt = e.DisposeAsync(); + Assert.True(vt.IsCompletedSuccessfully); + vt.GetAwaiter().GetResult(); + } + } + + [Fact] + public async Task ReadAllAsync_UnavailableDataCompletesAsynchronously() + { + Channel c = CreateChannel(); + + IAsyncEnumerator e = c.Reader.ReadAllAsync().GetAsyncEnumerator(); + try + { + for (int i = 100; i < 110; i++) + { + ValueTask vt = e.MoveNextAsync(); + Assert.False(vt.IsCompleted); + Task producer = Task.Run(() => c.Writer.TryWrite(i)); + Assert.True(await vt); + await producer; + Assert.Equal(i, e.Current); + } + } + finally + { + ValueTask vt = e.DisposeAsync(); + Assert.True(vt.IsCompletedSuccessfully); + vt.GetAwaiter().GetResult(); + } + } + + [Theory] + [InlineData(0)] + [InlineData(1)] + [InlineData(128)] + public async Task ReadAllAsync_ProducerConsumer_ConsumesAllData(int items) + { + Channel c = CreateChannel(); + + int producedTotal = 0, consumedTotal = 0; + await Task.WhenAll( + Task.Run(async () => + { + for (int i = 0; i < items; i++) + { + await c.Writer.WriteAsync(i); + producedTotal += i; + } + c.Writer.Complete(); + }), + Task.Run(async () => + { + IAsyncEnumerator e = c.Reader.ReadAllAsync().GetAsyncEnumerator(); + try + { + while (await e.MoveNextAsync()) + { + consumedTotal += e.Current; + } + } + finally + { + await e.DisposeAsync(); + } + })); + + Assert.Equal(producedTotal, consumedTotal); + } + + [Fact] + public async Task ReadAllAsync_MultipleEnumerationsToEnd() + { + Channel c = CreateChannel(); + + Assert.True(c.Writer.TryWrite(42)); + c.Writer.Complete(); + + IAsyncEnumerable enumerable = c.Reader.ReadAllAsync(); + IAsyncEnumerator e = enumerable.GetAsyncEnumerator(); + + Assert.True(await e.MoveNextAsync()); + Assert.Equal(42, e.Current); + + Assert.False(await e.MoveNextAsync()); + Assert.False(await e.MoveNextAsync()); + + await e.DisposeAsync(); + + e = enumerable.GetAsyncEnumerator(); + Assert.Same(enumerable, e); + + Assert.False(await e.MoveNextAsync()); + Assert.False(await e.MoveNextAsync()); + } + + [Theory] + [InlineData(false, false)] + [InlineData(false, true)] + [InlineData(true, false)] + [InlineData(true, true)] + public void ReadAllAsync_MultipleSingleElementEnumerations_AllItemsEnumerated(bool sameEnumerable, bool dispose) + { + Channel c = CreateChannel(); + IAsyncEnumerable enumerable = c.Reader.ReadAllAsync(); + + for (int i = 0; i < 10; i++) + { + Assert.True(c.Writer.TryWrite(i)); + IAsyncEnumerator e = (sameEnumerable ? enumerable : c.Reader.ReadAllAsync()).GetAsyncEnumerator(); + ValueTask vt = e.MoveNextAsync(); + Assert.True(vt.IsCompletedSuccessfully); + Assert.True(vt.Result); + Assert.Equal(i, e.Current); + if (dispose) + { + ValueTask dvt = e.DisposeAsync(); + Assert.True(dvt.IsCompletedSuccessfully); + dvt.GetAwaiter().GetResult(); + } + } + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task ReadAllAsync_DualConcurrentEnumeration_AllItemsEnumerated(bool sameEnumerable) + { + if (RequiresSingleReader) + { + return; + } + + Channel c = CreateChannel(); + + IAsyncEnumerable enumerable = c.Reader.ReadAllAsync(); + + IAsyncEnumerator e1 = enumerable.GetAsyncEnumerator(); + IAsyncEnumerator e2 = (sameEnumerable ? enumerable : c.Reader.ReadAllAsync()).GetAsyncEnumerator(); + Assert.NotSame(e1, e2); + + ValueTask vt1, vt2; + int producerTotal = 0, consumerTotal = 0; + for (int i = 0; i < 10; i++) + { + vt1 = e1.MoveNextAsync(); + vt2 = e2.MoveNextAsync(); + + await c.Writer.WriteAsync(i); + producerTotal += i; + await c.Writer.WriteAsync(i * 2); + producerTotal += i * 2; + + Assert.True(await vt1); + Assert.True(await vt2); + consumerTotal += e1.Current; + consumerTotal += e2.Current; + } + + vt1 = e1.MoveNextAsync(); + vt2 = e2.MoveNextAsync(); + c.Writer.Complete(); + Assert.False(await vt1); + Assert.False(await vt2); + + Assert.Equal(producerTotal, consumerTotal); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task ReadAllAsync_CanceledBeforeMoveNextAsync_Throws(bool dataAvailable) + { + Channel c = CreateChannel(); + if (dataAvailable) + { + Assert.True(c.Writer.TryWrite(42)); + } + + var cts = new CancellationTokenSource(); + cts.Cancel(); + + IAsyncEnumerator e = c.Reader.ReadAllAsync(cts.Token).GetAsyncEnumerator(); + ValueTask vt = e.MoveNextAsync(); + Assert.True(vt.IsCompleted); + Assert.False(vt.IsCompletedSuccessfully); + OperationCanceledException oce = await Assert.ThrowsAnyAsync(async () => await vt); + Assert.Equal(cts.Token, oce.CancellationToken); + } + + [Fact] + public async Task ReadAllAsync_CanceledAfterMoveNextAsync_Throws() + { + Channel c = CreateChannel(); + var cts = new CancellationTokenSource(); + + IAsyncEnumerator e = c.Reader.ReadAllAsync(cts.Token).GetAsyncEnumerator(); + ValueTask vt = e.MoveNextAsync(); + Assert.False(vt.IsCompleted); + + cts.Cancel(); + OperationCanceledException oce = await Assert.ThrowsAnyAsync(async () => await vt); + Assert.Equal(cts.Token, oce.CancellationToken); + + vt = e.MoveNextAsync(); + Assert.True(vt.IsCompletedSuccessfully); + Assert.False(vt.Result); + } + [Fact] public async Task WaitToReadAsync_ConsecutiveReadsSucceed() { diff --git a/src/libraries/System.Threading.Channels/tests/ChannelTestBase.netcoreapp.cs b/src/libraries/System.Threading.Channels/tests/ChannelTestBase.netcoreapp.cs deleted file mode 100644 index c7046a6323b13..0000000000000 --- a/src/libraries/System.Threading.Channels/tests/ChannelTestBase.netcoreapp.cs +++ /dev/null @@ -1,281 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Runtime.CompilerServices; -using System.Threading.Tasks; -using Xunit; - -namespace System.Threading.Channels.Tests -{ - public abstract partial class ChannelTestBase : TestBase - { - [Fact] - public void ReadAllAsync_NotIdempotent() - { - Channel c = CreateChannel(); - IAsyncEnumerable e = c.Reader.ReadAllAsync(); - Assert.NotNull(e); - Assert.NotSame(e, c.Reader.ReadAllAsync()); - } - - [Theory] - [InlineData(false)] - [InlineData(true)] - public async Task ReadAllAsync_UseMoveNextAsyncAfterCompleted_ReturnsFalse(bool completeWhilePending) - { - Channel c = CreateChannel(); - IAsyncEnumerator e = c.Reader.ReadAllAsync().GetAsyncEnumerator(); - - ValueTask vt; - if (completeWhilePending) - { - c.Writer.Complete(); - vt = e.MoveNextAsync(); - Assert.True(vt.IsCompletedSuccessfully); - Assert.False(vt.Result); - } - else - { - vt = e.MoveNextAsync(); - Assert.False(vt.IsCompleted); - c.Writer.Complete(); - Assert.False(await vt); - } - - vt = e.MoveNextAsync(); - Assert.True(vt.IsCompletedSuccessfully); - Assert.False(vt.Result); - } - - [Fact] - public void ReadAllAsync_AvailableDataCompletesSynchronously() - { - Channel c = CreateChannel(); - - IAsyncEnumerator e = c.Reader.ReadAllAsync().GetAsyncEnumerator(); - try - { - for (int i = 100; i < 110; i++) - { - Assert.True(c.Writer.TryWrite(i)); - ValueTask vt = e.MoveNextAsync(); - Assert.True(vt.IsCompletedSuccessfully); - Assert.True(vt.Result); - Assert.Equal(i, e.Current); - } - } - finally - { - ValueTask vt = e.DisposeAsync(); - Assert.True(vt.IsCompletedSuccessfully); - vt.GetAwaiter().GetResult(); - } - } - - [Fact] - public async Task ReadAllAsync_UnavailableDataCompletesAsynchronously() - { - Channel c = CreateChannel(); - - IAsyncEnumerator e = c.Reader.ReadAllAsync().GetAsyncEnumerator(); - try - { - for (int i = 100; i < 110; i++) - { - ValueTask vt = e.MoveNextAsync(); - Assert.False(vt.IsCompleted); - Task producer = Task.Run(() => c.Writer.TryWrite(i)); - Assert.True(await vt); - await producer; - Assert.Equal(i, e.Current); - } - } - finally - { - ValueTask vt = e.DisposeAsync(); - Assert.True(vt.IsCompletedSuccessfully); - vt.GetAwaiter().GetResult(); - } - } - - [Theory] - [InlineData(0)] - [InlineData(1)] - [InlineData(128)] - public async Task ReadAllAsync_ProducerConsumer_ConsumesAllData(int items) - { - Channel c = CreateChannel(); - - int producedTotal = 0, consumedTotal = 0; - await Task.WhenAll( - Task.Run(async () => - { - for (int i = 0; i < items; i++) - { - await c.Writer.WriteAsync(i); - producedTotal += i; - } - c.Writer.Complete(); - }), - Task.Run(async () => - { - IAsyncEnumerator e = c.Reader.ReadAllAsync().GetAsyncEnumerator(); - try - { - while (await e.MoveNextAsync()) - { - consumedTotal += e.Current; - } - } - finally - { - await e.DisposeAsync(); - } - })); - - Assert.Equal(producedTotal, consumedTotal); - } - - [Fact] - public async Task ReadAllAsync_MultipleEnumerationsToEnd() - { - Channel c = CreateChannel(); - - Assert.True(c.Writer.TryWrite(42)); - c.Writer.Complete(); - - IAsyncEnumerable enumerable = c.Reader.ReadAllAsync(); - IAsyncEnumerator e = enumerable.GetAsyncEnumerator(); - - Assert.True(await e.MoveNextAsync()); - Assert.Equal(42, e.Current); - - Assert.False(await e.MoveNextAsync()); - Assert.False(await e.MoveNextAsync()); - - await e.DisposeAsync(); - - e = enumerable.GetAsyncEnumerator(); - Assert.Same(enumerable, e); - - Assert.False(await e.MoveNextAsync()); - Assert.False(await e.MoveNextAsync()); - } - - [Theory] - [InlineData(false, false)] - [InlineData(false, true)] - [InlineData(true, false)] - [InlineData(true, true)] - public void ReadAllAsync_MultipleSingleElementEnumerations_AllItemsEnumerated(bool sameEnumerable, bool dispose) - { - Channel c = CreateChannel(); - IAsyncEnumerable enumerable = c.Reader.ReadAllAsync(); - - for (int i = 0; i < 10; i++) - { - Assert.True(c.Writer.TryWrite(i)); - IAsyncEnumerator e = (sameEnumerable ? enumerable : c.Reader.ReadAllAsync()).GetAsyncEnumerator(); - ValueTask vt = e.MoveNextAsync(); - Assert.True(vt.IsCompletedSuccessfully); - Assert.True(vt.Result); - Assert.Equal(i, e.Current); - if (dispose) - { - ValueTask dvt = e.DisposeAsync(); - Assert.True(dvt.IsCompletedSuccessfully); - dvt.GetAwaiter().GetResult(); - } - } - } - - [Theory] - [InlineData(false)] - [InlineData(true)] - public async Task ReadAllAsync_DualConcurrentEnumeration_AllItemsEnumerated(bool sameEnumerable) - { - if (RequiresSingleReader) - { - return; - } - - Channel c = CreateChannel(); - - IAsyncEnumerable enumerable = c.Reader.ReadAllAsync(); - - IAsyncEnumerator e1 = enumerable.GetAsyncEnumerator(); - IAsyncEnumerator e2 = (sameEnumerable ? enumerable : c.Reader.ReadAllAsync()).GetAsyncEnumerator(); - Assert.NotSame(e1, e2); - - ValueTask vt1, vt2; - int producerTotal = 0, consumerTotal = 0; - for (int i = 0; i < 10; i++) - { - vt1 = e1.MoveNextAsync(); - vt2 = e2.MoveNextAsync(); - - await c.Writer.WriteAsync(i); - producerTotal += i; - await c.Writer.WriteAsync(i * 2); - producerTotal += i * 2; - - Assert.True(await vt1); - Assert.True(await vt2); - consumerTotal += e1.Current; - consumerTotal += e2.Current; - } - - vt1 = e1.MoveNextAsync(); - vt2 = e2.MoveNextAsync(); - c.Writer.Complete(); - Assert.False(await vt1); - Assert.False(await vt2); - - Assert.Equal(producerTotal, consumerTotal); - } - - [Theory] - [InlineData(false)] - [InlineData(true)] - public async Task ReadAllAsync_CanceledBeforeMoveNextAsync_Throws(bool dataAvailable) - { - Channel c = CreateChannel(); - if (dataAvailable) - { - Assert.True(c.Writer.TryWrite(42)); - } - - var cts = new CancellationTokenSource(); - cts.Cancel(); - - IAsyncEnumerator e = c.Reader.ReadAllAsync(cts.Token).GetAsyncEnumerator(); - ValueTask vt = e.MoveNextAsync(); - Assert.True(vt.IsCompleted); - Assert.False(vt.IsCompletedSuccessfully); - OperationCanceledException oce = await Assert.ThrowsAnyAsync(async () => await vt); - Assert.Equal(cts.Token, oce.CancellationToken); - } - - [Fact] - public async Task ReadAllAsync_CanceledAfterMoveNextAsync_Throws() - { - Channel c = CreateChannel(); - var cts = new CancellationTokenSource(); - - IAsyncEnumerator e = c.Reader.ReadAllAsync(cts.Token).GetAsyncEnumerator(); - ValueTask vt = e.MoveNextAsync(); - Assert.False(vt.IsCompleted); - - cts.Cancel(); - OperationCanceledException oce = await Assert.ThrowsAnyAsync(async () => await vt); - Assert.Equal(cts.Token, oce.CancellationToken); - - vt = e.MoveNextAsync(); - Assert.True(vt.IsCompletedSuccessfully); - Assert.False(vt.Result); - } - } -} diff --git a/src/libraries/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj b/src/libraries/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj index ded390e98331a..7d929a96eade6 100644 --- a/src/libraries/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj +++ b/src/libraries/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj @@ -6,18 +6,14 @@ + - - - - - - - + + +