Skip to content

Commit

Permalink
Added thread-local version of shared stream
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Mar 20, 2024
1 parent fc95c6d commit c1e9b57
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 24 deletions.
10 changes: 6 additions & 4 deletions src/DotNext.Tests/IO/StreamSourceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -605,12 +605,14 @@ static ValueTask WriteToBuffer(ReadOnlyMemory<byte> block, ArrayBufferWriter<byt
Equal(content, writer.WrittenMemory.ToArray());
}

[Fact]
public static async Task SharedStreamConcurrentReadAsync()
[Theory]
[InlineData(false)]
[InlineData(true)]
public static async Task SharedStreamConcurrentReadAsync(bool compatWithAsync)
{
byte[] expected = [10, 20, 30, 40, 50, 60];
byte[] expected = RandomBytes(512);

await using var stream = StreamSource.AsSharedStream(new(expected));
await using var stream = StreamSource.AsSharedStream(new(expected), compatWithAsync);

var task1 = ReadStreamAsync(stream);
var task2 = ReadStreamAsync(stream);
Expand Down
72 changes: 55 additions & 17 deletions src/DotNext/IO/SharedReadOnlyMemoryStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,24 @@ namespace DotNext.IO;

using static Buffers.Memory;

internal sealed class SharedReadOnlyMemoryStream(ReadOnlySequence<byte> sequence) : ReadOnlyStream
internal abstract class SharedReadOnlyMemoryStream(ReadOnlySequence<byte> sequence) : ReadOnlyStream
{
// don't use BoxedValue due to limitations of AsyncLocal
private readonly AsyncLocal<StrongBox<SequencePosition>> position = new();

[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private SequencePosition LocalPosition
private protected abstract SequencePosition LocalPosition
{
get => position.Value?.Value ?? sequence.Start;
set => (position.Value ??= new()).Value = value;
get;
set;
}

private protected SequencePosition StartPosition => sequence.Start;

private ReadOnlySequence<byte> GetRemainingSequence(out SequencePosition start)
=> sequence.Slice(start = LocalPosition);

public override bool CanSeek => true;
public sealed override bool CanSeek => true;

public override long Length => sequence.Length;
public sealed override long Length => sequence.Length;

public override long Position
public sealed override long Position
{
get => sequence.GetOffset(LocalPosition);
set
Expand All @@ -36,7 +34,7 @@ public override long Position
}
}

public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken token)
public sealed override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken token)
{
ValidateCopyToArguments(destination, bufferSize);

Expand All @@ -46,7 +44,7 @@ public override async Task CopyToAsync(Stream destination, int bufferSize, Cance
LocalPosition = sequence.End;
}

public override void CopyTo(Stream destination, int bufferSize)
public sealed override void CopyTo(Stream destination, int bufferSize)
{
ValidateCopyToArguments(destination, bufferSize);

Expand All @@ -56,16 +54,16 @@ public override void CopyTo(Stream destination, int bufferSize)
LocalPosition = sequence.End;
}

public override void SetLength(long value) => throw new NotSupportedException();
public sealed override void SetLength(long value) => throw new NotSupportedException();

public override int Read(Span<byte> buffer)
public sealed override int Read(Span<byte> buffer)
{
GetRemainingSequence(out var startPos).CopyTo(buffer, out var writtenCount);
LocalPosition = sequence.GetPosition(writtenCount, startPos);
return writtenCount;
}

public override long Seek(long offset, SeekOrigin origin)
public sealed override long Seek(long offset, SeekOrigin origin)
{
var newPosition = origin switch
{
Expand All @@ -84,5 +82,45 @@ public override long Seek(long offset, SeekOrigin origin)
return newPosition;
}

public override string ToString() => sequence.ToString();
public sealed override string ToString() => sequence.ToString();

internal static SharedReadOnlyMemoryStream CreateAsyncLocalStream(ReadOnlySequence<byte> sequence)
=> new AsyncLocalStream(sequence);

internal static SharedReadOnlyMemoryStream CreateThreadLocalStream(ReadOnlySequence<byte> sequence)
=> new ThreadLocalStream(sequence);
}

file sealed class AsyncLocalStream(ReadOnlySequence<byte> sequence) : SharedReadOnlyMemoryStream(sequence)
{
// don't use BoxedValue due to limitations of AsyncLocal
private readonly AsyncLocal<StrongBox<SequencePosition>> position = new();

[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private protected override SequencePosition LocalPosition
{
get => position.Value?.Value ?? StartPosition;
set => (position.Value ??= new()).Value = value;
}
}

file sealed class ThreadLocalStream(ReadOnlySequence<byte> sequence) : SharedReadOnlyMemoryStream(sequence)
{
private readonly ThreadLocal<SequencePosition> position = new(Func.Constant(sequence.Start), trackAllValues: false);

private protected override SequencePosition LocalPosition
{
get => position.Value;
set => position.Value = value;
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
position.Dispose();
}

base.Dispose(disposing);
}
}
14 changes: 11 additions & 3 deletions src/DotNext/IO/StreamSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,24 @@ public static Stream AsStream(this ReadOnlyMemory<byte> memory)
=> AsStream(new ReadOnlySequence<byte>(memory));

/// <summary>
/// Gets read-only stream that can be shared across async flows for independent reads.
/// Gets read-only stream that can be shared across async flows or threads for independent reads.
/// </summary>
/// <remarks>
/// You need to set a position explicitly before using stream for each parallel async flow.
/// <see cref="Stream.SetLength(long)"/> is not supported to avoid different views of the same stream.
/// </remarks>
/// <param name="sequence">The sequence of bytes.</param>
/// <param name="compatWithAsync">
/// <see langword="true"/> to create a stream than can be shared across async flows and different threads;
/// <see langword="false"/> to create a stream that is safe to share between different threads only.
/// </param>
/// <returns>The stream over sequence of bytes.</returns>
public static Stream AsSharedStream(this ReadOnlySequence<byte> sequence)
=> sequence.IsEmpty ? Stream.Null : new SharedReadOnlyMemoryStream(sequence);
public static Stream AsSharedStream(this ReadOnlySequence<byte> sequence, bool compatWithAsync = true)
=> sequence.IsEmpty
? Stream.Null
: compatWithAsync
? SharedReadOnlyMemoryStream.CreateAsyncLocalStream(sequence)
: SharedReadOnlyMemoryStream.CreateThreadLocalStream(sequence);

/// <summary>
/// Returns writable synchronous stream.
Expand Down

0 comments on commit c1e9b57

Please sign in to comment.