Skip to content

Commit

Permalink
Removed allocation of async state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Jan 23, 2024
1 parent 768b6c2 commit f01f4c6
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 28 deletions.
144 changes: 144 additions & 0 deletions src/DotNext.IO/IO/FileReader.Utils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading.Tasks.Sources;

namespace DotNext.IO;

using Intrinsics = Runtime.Intrinsics;

public partial class FileReader : IDynamicInterfaceCastable
{
private readonly Action readCallback, readDirectCallback;
private ManualResetValueTaskSourceCore<int> source;
private ConfiguredValueTaskAwaitable<int>.ConfiguredValueTaskAwaiter awaiter;
private int extraCount;

private int GetAsyncResult(short token)
{
try
{
return source.GetResult(token);
}
finally
{
source.Reset();
}
}

private void OnRead()
{
var awaiter = this.awaiter;
this.awaiter = default;

int count;
try
{
count = awaiter.GetResult();

bufferEnd += count;
}
catch (Exception e)
{
source.SetException(e);
return;
}

source.SetResult(count);
}

private void OnReadDirect()
{
var awaiter = this.awaiter;
this.awaiter = default;

var extraCount = this.extraCount;
this.extraCount = 0;

int count;
try
{
count = awaiter.GetResult();

fileOffset += count;
count += extraCount;
}
catch (Exception e)
{
source.SetException(e);
return;
}

source.SetResult(count);
}

private ValueTask<int> SubmitAsInt32(ValueTask<int> task, Action callback)
{
awaiter = task.ConfigureAwait(false).GetAwaiter();
if (awaiter.IsCompleted)
{
callback();
}
else
{
awaiter.UnsafeOnCompleted(callback);
}

return new((IValueTaskSource<int>)this, source.Version);
}

private ValueTask<bool> SubmitAsBoolean(ValueTask<int> task, Action callback)
{
awaiter = task.ConfigureAwait(false).GetAwaiter();
if (awaiter.IsCompleted)
{
callback();
}
else
{
awaiter.UnsafeOnCompleted(callback);
}

return new((IValueTaskSource<bool>)this, source.Version);
}

[DynamicInterfaceCastableImplementation]
private interface IProxyValueTaskSource : IValueTaskSource<int>, IValueTaskSource<bool>
{
ValueTaskSourceStatus IValueTaskSource<int>.GetStatus(short token)
=> Unsafe.As<FileReader>(this).source.GetStatus(token);

ValueTaskSourceStatus IValueTaskSource<bool>.GetStatus(short token)
=> Unsafe.As<FileReader>(this).source.GetStatus(token);

int IValueTaskSource<int>.GetResult(short token)
=> Unsafe.As<FileReader>(this).GetAsyncResult(token);

bool IValueTaskSource<bool>.GetResult(short token)
=> Unsafe.As<FileReader>(this).GetAsyncResult(token) is not 0;

void IValueTaskSource<int>.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
=> Unsafe.As<FileReader>(this).source.OnCompleted(continuation, state, token, flags);

void IValueTaskSource<bool>.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
=> Unsafe.As<FileReader>(this).source.OnCompleted(continuation, state, token, flags);
}

[ExcludeFromCodeCoverage]
bool IDynamicInterfaceCastable.IsInterfaceImplemented(RuntimeTypeHandle interfaceType, bool throwIfNotImplemented)
{
if (interfaceType.IsOneOf([Intrinsics.TypeOf<IValueTaskSource<int>>(), Intrinsics.TypeOf<IValueTaskSource<bool>>()]))
return true;

return throwIfNotImplemented ? throw new InvalidCastException() : false;
}

[ExcludeFromCodeCoverage]
RuntimeTypeHandle IDynamicInterfaceCastable.GetInterfaceImplementation(RuntimeTypeHandle interfaceType)
{
if (interfaceType.IsOneOf([Intrinsics.TypeOf<IValueTaskSource<int>>(), Intrinsics.TypeOf<IValueTaskSource<bool>>()]))
return Intrinsics.TypeOf<IProxyValueTaskSource>();

throw new InvalidCastException();
}
}
47 changes: 19 additions & 28 deletions src/DotNext.IO/IO/FileReader.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using SafeFileHandle = Microsoft.Win32.SafeHandles.SafeFileHandle;

namespace DotNext.IO;
Expand Down Expand Up @@ -50,6 +49,9 @@ public FileReader(SafeFileHandle handle, long fileOffset = 0L, int bufferSize =
this.handle = handle;
this.fileOffset = fileOffset;
this.allocator = allocator;

readCallback = OnRead;
readDirectCallback = OnReadDirect;
}

/// <summary>
Expand Down Expand Up @@ -198,17 +200,17 @@ public bool TryConsume(int bytes, out ReadOnlyMemory<byte> buffer)
/// <exception cref="ObjectDisposedException">The reader has been disposed.</exception>
/// <exception cref="InternalBufferOverflowException">Internal buffer has no free space.</exception>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
public async ValueTask<bool> ReadAsync(CancellationToken token = default)
public ValueTask<bool> ReadAsync(CancellationToken token = default)
{
ObjectDisposedException.ThrowIf(IsDisposed, this);
if (IsDisposed)
return new(GetDisposedTask<bool>());

var buffer = this.buffer.Memory;

switch (bufferStart)
{
case 0 when bufferEnd == buffer.Length:
throw new InternalBufferOverflowException();
return ValueTask.FromException<bool>(new InternalBufferOverflowException());
case > 0:
// compact buffer
buffer.Slice(bufferStart, BufferLength).CopyTo(buffer);
Expand All @@ -217,9 +219,7 @@ public async ValueTask<bool> ReadAsync(CancellationToken token = default)
break;
}

var count = await RandomAccess.ReadAsync(handle, buffer.Slice(bufferEnd), fileOffset + bufferEnd, token).ConfigureAwait(false);
bufferEnd += count;
return count > 0;
return SubmitAsBoolean(RandomAccess.ReadAsync(handle, buffer.Slice(bufferEnd), fileOffset + bufferEnd, token), readCallback);
}

/// <summary>
Expand Down Expand Up @@ -270,35 +270,26 @@ public ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken toke
}

if (destination.IsEmpty)
{
return ValueTask.FromResult(0);
}

if (!HasBufferedData)
{
return ReadDirectAsync(destination, token);
}

BufferSpan.CopyTo(destination.Span, out var writtenCount);
ConsumeUnsafe(writtenCount);
destination = destination.Slice(writtenCount);
BufferSpan.CopyTo(destination.Span, out extraCount);
ConsumeUnsafe(extraCount);
destination = destination.Slice(extraCount);

return destination.IsEmpty
? ValueTask.FromResult(writtenCount)
: ReadDirectAsync(writtenCount, destination, token);
}

[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
private async ValueTask<int> ReadDirectAsync(Memory<byte> output, CancellationToken token)
{
var count = await RandomAccess.ReadAsync(handle, output, fileOffset, token).ConfigureAwait(false);
fileOffset += count;
return count;
? ValueTask.FromResult(extraCount)
: ReadDirectAsync(destination, token);
}

[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
private async ValueTask<int> ReadDirectAsync(int extraCount, Memory<byte> output, CancellationToken token)
{
var count = await RandomAccess.ReadAsync(handle, output, fileOffset, token).ConfigureAwait(false);
fileOffset += count;
return count + extraCount;
}
private ValueTask<int> ReadDirectAsync(Memory<byte> output, CancellationToken token)
=> SubmitAsInt32(RandomAccess.ReadAsync(handle, output, fileOffset, token), readDirectCallback);

/// <summary>
/// Reads the block of the memory.
Expand Down

0 comments on commit f01f4c6

Please sign in to comment.