Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[net]refactor: refactor BlockStream #693

Merged
merged 21 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions net/src/Sails.Remoting/Core/BlockStreamEventListener.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,37 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using Sails.Remoting.Abstractions.Core;
using Substrate.Gear.Api.Generated;
using Substrate.Gear.Api.Generated.Model.gear_core.message.user;
using Substrate.Gear.Api.Generated.Model.gprimitives;
using Substrate.Gear.Client;
using Substrate.Gear.Client.GearApi.Model.gprimitives;
using Substrate.Gear.Client.NetApi.Model.Types.Base;

namespace Sails.Remoting.Core;

internal sealed class BlockStreamEventListener : EventListener<(ActorId Source, byte[] Bytes)>
{
private readonly SubstrateClientExt nodeClient;
private readonly BlocksStream blocksStream;

internal BlockStreamEventListener(SubstrateClientExt nodeClient, BlocksStream blocksStream)
internal BlockStreamEventListener(BlocksStream blocksStream)
{
this.nodeClient = nodeClient;
EnsureArg.IsNotNull(blocksStream, nameof(blocksStream));

this.blocksStream = blocksStream;
}

public override IAsyncEnumerable<(ActorId Source, byte[] Bytes)> ReadAllAsync(CancellationToken cancellationToken)
=> this.blocksStream.ReadAllHeadersAsync(cancellationToken)
.SelectGearEvents(this.nodeClient, cancellationToken)
.SelectServiceEvents();
=> this.blocksStream.ReadAllGearRuntimeEventsAsync(cancellationToken)
.SelectIfMatches(
GearEvent.UserMessageSent,
(UserMessageSentEventData data) => (UserMessage)data.Value[0])
.Where(userMessage => userMessage.Destination
.IsEqualTo(ActorIdExtensions.Zero))
.Select(userMessage => (userMessage.Source, userMessage.Payload.Value.Value.Select(@byte => @byte.Value).ToArray()));

protected override ValueTask DisposeCoreAsync() => this.blocksStream.DisposeAsync();
protected override ValueTask DisposeCoreAsync()
=> this.blocksStream.DisposeAsync();
}
7 changes: 1 addition & 6 deletions net/src/Sails.Remoting/Core/RemotingReplyViaNodeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public static async Task<RemotingReplyViaNodeClient<T>> FromExecutionAsync(
var messageQueuedEventData = await executeExtrinsic(nodeClient).ConfigureAwait(false);

var result = new RemotingReplyViaNodeClient<T>(
nodeClient,
blocksStream,
extractResult,
messageQueuedEventData);
Expand All @@ -48,19 +47,16 @@ public static async Task<RemotingReplyViaNodeClient<T>> FromExecutionAsync(
}

private RemotingReplyViaNodeClient(
SubstrateClientExt nodeClient,
BlocksStream blocksStream,
Func<MessageQueuedEventData, UserMessage, T> extractResult,
MessageQueuedEventData queuedMessageData)
{
this.nodeClient = nodeClient;
this.blocksStream = blocksStream;
this.extractResult = extractResult;
this.queuedMessageData = queuedMessageData;
this.replyMessage = null;
}

private readonly SubstrateClientExt nodeClient;
private BlocksStream? blocksStream;
private readonly Func<MessageQueuedEventData, UserMessage, T> extractResult;
private readonly MessageQueuedEventData queuedMessageData;
Expand All @@ -85,8 +81,7 @@ public override async Task<T> ReadAsync(CancellationToken cancellationToken)
{
Ensure.Any.IsNotNull(this.blocksStream, nameof(this.blocksStream));

this.replyMessage = await this.blocksStream.ReadAllHeadersAsync(cancellationToken)
.SelectGearEvents(this.nodeClient, cancellationToken)
this.replyMessage = await this.blocksStream.ReadAllGearRuntimeEventsAsync(cancellationToken)
.SelectIfMatches(
GearEvent.UserMessageSent,
(UserMessageSentEventData data) => (UserMessage)data.Value[0])
Expand Down
2 changes: 1 addition & 1 deletion net/src/Sails.Remoting/Core/RemotingViaNodeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public async Task<byte[]> QueryAsync(
var nodeClient = await this.nodeClientProvider.GetNodeClientAsync(cancellationToken).ConfigureAwait(false);
var blocksStream = await nodeClient.GetNewBlocksStreamAsync(cancellationToken).ConfigureAwait(false);

return new BlockStreamEventListener(nodeClient, blocksStream);
return new BlockStreamEventListener(blocksStream);
}

private static MessageQueuedEventData SelectMessageQueuedEventData(IEnumerable<BaseEnumRust<RuntimeEvent>> runtimeEvents)
Expand Down
33 changes: 28 additions & 5 deletions net/src/Substrate.Gear.Client/BlocksStream.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using EnsureThat;
using Substrate.Gear.Api.Generated;
using Substrate.NetApi;
using Substrate.NetApi.Model.Rpc;

Expand All @@ -13,7 +15,7 @@ namespace Substrate.Gear.Client;
public sealed class BlocksStream : IAsyncDisposable
{
internal static async Task<BlocksStream> CreateAsync(
SubstrateClient nodeClient,
SubstrateClientExt nodeClient,
Func<SubstrateClient, Action<string, Header>, Task<string>> subscribe,
Func<SubstrateClient, string, Task> unsubscribe)
{
Expand All @@ -33,24 +35,27 @@ internal static async Task<BlocksStream> CreateAsync(
.ConfigureAwait(false);

return new BlocksStream(
nodeClient,
channel,
() => unsubscribe(nodeClient, subscriptionId));
nodeClient => unsubscribe(nodeClient, subscriptionId));
}

private BlocksStream(Channel<Header> channel, Func<Task> unsubscribe)
private BlocksStream(SubstrateClientExt nodeClient, Channel<Header> channel, Func<SubstrateClient, Task> unsubscribe)
{
this.nodeClient = nodeClient;
this.channel = channel;
this.unsubscribe = unsubscribe;
this.isReadInProgress = 0;
}

private readonly SubstrateClientExt nodeClient;
private readonly Channel<Header> channel;
private readonly Func<Task> unsubscribe;
private readonly Func<SubstrateClient, Task> unsubscribe;
private int isReadInProgress;

public async ValueTask DisposeAsync()
{
await this.unsubscribe().ConfigureAwait(false);
await this.unsubscribe(this.nodeClient).ConfigureAwait(false);
this.channel.Writer.Complete();

GC.SuppressFinalize(this);
Expand Down Expand Up @@ -83,4 +88,22 @@ async IAsyncEnumerable<Header> ReadAllImpl([EnumeratorCancellation] Cancellation
}
}
}

/// <summary>
/// Reads all block headers and applies the provided selector to each header.
/// Only one read operation is allowed at a time.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="selectAsync"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public IAsyncEnumerable<T> ReadAllAsync<T>(
Func<SubstrateClientExt, Header, ValueTask<T>> selectAsync,
CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(selectAsync, nameof(selectAsync));

return this.ReadAllHeadersAsync(cancellationToken)
.SelectAwait(header => selectAsync(this.nodeClient, header));
}
}
59 changes: 25 additions & 34 deletions net/src/Substrate.Gear.Client/BlocksStreamExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,49 +1,40 @@
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using Substrate.Gear.Api.Generated;
using Substrate.Gear.Api.Generated.Model.gear_core.message.user;
using Substrate.Gear.Api.Generated.Model.gprimitives;
using EnsureThat;
using Substrate.Gear.Api.Generated.Model.vara_runtime;
using Substrate.Gear.Client.NetApi.Model.Rpc;
using Substrate.Gear.Client.NetApi.Model.Types.Base;
using Substrate.NetApi.Model.Rpc;
using Substrate.NetApi.Model.Types.Base;

namespace Substrate.Gear.Client;

public static class BlocksStreamExtensions
{
[SuppressMessage(
"Style",
"VSTHRD200:Use \"Async\" suffix for async methods",
Justification = "To be consistent with system provided extensions")]
public static IAsyncEnumerable<BaseEnumRust<GearEvent>> SelectGearEvents(
this IAsyncEnumerable<Header> headers,
SubstrateClientExt nodeClient,
CancellationToken cancellationToken = default)
=> headers
.SelectAwait(async blockHeader => await nodeClient
.ListBlockEventsAsync(blockHeader.GetBlockHash(), cancellationToken).ConfigureAwait(false))
.SelectMany(eventRecords => eventRecords.ToAsyncEnumerable())
.Select(eventRecord => eventRecord.Event.ToBaseEnumRust())
.SelectIfMatches(
RuntimeEvent.Gear,
(EnumGearEvent gearEvent) => gearEvent.ToBaseEnumRust()
);
public static IAsyncEnumerable<BaseEnumRust<RuntimeEvent>> ReadAllRuntimeEventsAsync(
this BlocksStream blocksStream,
CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(blocksStream, nameof(blocksStream));

[SuppressMessage(
"Style",
"VSTHRD200:Use \"Async\" suffix for async methods",
Justification = "To be consistent with system provided extensions")]
public static IAsyncEnumerable<(ActorId Source, byte[] Payload)> SelectServiceEvents(
this IAsyncEnumerable<BaseEnumRust<GearEvent>> gearEvents)
=> gearEvents
var eventRecords = blocksStream
.ReadAllAsync(
selectAsync: async (nodeClient, blockHeader)
=> await nodeClient.ListBlockEventsAsync(
blockHeader.GetBlockHash(),
cancellationToken)
.ConfigureAwait(false),
cancellationToken);
return eventRecords.SelectMany(eventRecords => eventRecords.ToAsyncEnumerable())
.Select(eventRecord => eventRecord.Event.ToBaseEnumRust());
}

public static IAsyncEnumerable<BaseEnumRust<GearEvent>> ReadAllGearRuntimeEventsAsync(
this BlocksStream blocksStream,
CancellationToken cancellationToken)
=> blocksStream
.ReadAllRuntimeEventsAsync(cancellationToken)
.SelectIfMatches(
GearEvent.UserMessageSent,
(UserMessageSentEventData data) => (UserMessage)data.Value[0])
.Where(userMessage => userMessage.Destination
.IsEqualTo(GearApi.Model.gprimitives.ActorIdExtensions.Zero))
.Select(userMessage => (userMessage.Source, userMessage.Payload.Value.Value.Select(@byte => @byte.Value).ToArray()));
RuntimeEvent.Gear,
(EnumGearEvent gearEvent) => gearEvent.ToBaseEnumRust());
}
3 changes: 0 additions & 3 deletions net/src/Substrate.Gear.Client/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,4 @@
global using GasUnit = Substrate.NetApi.Model.Types.Primitive.U64;
global using GearEvent = Substrate.Gear.Api.Generated.Model.pallet_gear.pallet.Event;
global using SystemEvent = Substrate.Gear.Api.Generated.Model.frame_system.pallet.Event;
global using UserMessageSentEventData = Substrate.NetApi.Model.Types.Base.BaseTuple<
Substrate.Gear.Api.Generated.Model.gear_core.message.user.UserMessage,
Substrate.NetApi.Model.Types.Base.BaseOpt<Substrate.NetApi.Model.Types.Primitive.U32>>;
global using ValueUnit = Substrate.NetApi.Model.Types.Primitive.U128;
6 changes: 3 additions & 3 deletions net/src/Substrate.Gear.Client/SubstrateClientExtExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public static async Task<EventRecord[]> ListBlockEventsAsync(
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static Task<BlocksStream> GetAllBlocksStreamAsync(
this SubstrateClient nodeClient,
this SubstrateClientExt nodeClient,
CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(nodeClient, nameof(nodeClient));
Expand All @@ -269,7 +269,7 @@ public static Task<BlocksStream> GetAllBlocksStreamAsync(
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static Task<BlocksStream> GetNewBlocksStreamAsync(
this SubstrateClient nodeClient,
this SubstrateClientExt nodeClient,
CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(nodeClient, nameof(nodeClient));
Expand All @@ -289,7 +289,7 @@ public static Task<BlocksStream> GetNewBlocksStreamAsync(
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static Task<BlocksStream> GetFinalizedBlocksStreamAsync(
this SubstrateClient nodeClient,
this SubstrateClientExt nodeClient,
CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(nodeClient, nameof(nodeClient));
Expand Down