Skip to content

Commit

Permalink
[net]refactor: refactor BlockStream (#693)
Browse files Browse the repository at this point in the history
  • Loading branch information
DennisInSky authored Nov 29, 2024
1 parent 2b033a4 commit 433f764
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 60 deletions.
25 changes: 17 additions & 8 deletions net/src/Sails.Remoting/Core/BlockStreamEventListener.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,37 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using Sails.Remoting.Abstractions.Core;
using Substrate.Gear.Api.Generated;
using Substrate.Gear.Api.Generated.Model.gear_core.message.user;
using Substrate.Gear.Api.Generated.Model.gprimitives;
using Substrate.Gear.Client;
using Substrate.Gear.Client.GearApi.Model.gprimitives;
using Substrate.Gear.Client.NetApi.Model.Types.Base;

namespace Sails.Remoting.Core;

internal sealed class BlockStreamEventListener : EventListener<(ActorId Source, byte[] Bytes)>
{
private readonly SubstrateClientExt nodeClient;
private readonly BlocksStream blocksStream;

internal BlockStreamEventListener(SubstrateClientExt nodeClient, BlocksStream blocksStream)
internal BlockStreamEventListener(BlocksStream blocksStream)
{
this.nodeClient = nodeClient;
EnsureArg.IsNotNull(blocksStream, nameof(blocksStream));

this.blocksStream = blocksStream;
}

public override IAsyncEnumerable<(ActorId Source, byte[] Bytes)> ReadAllAsync(CancellationToken cancellationToken)
=> this.blocksStream.ReadAllHeadersAsync(cancellationToken)
.SelectGearEvents(this.nodeClient, cancellationToken)
.SelectServiceEvents();
=> this.blocksStream.ReadAllGearRuntimeEventsAsync(cancellationToken)
.SelectIfMatches(
GearEvent.UserMessageSent,
(UserMessageSentEventData data) => (UserMessage)data.Value[0])
.Where(userMessage => userMessage.Destination
.IsEqualTo(ActorIdExtensions.Zero))
.Select(userMessage => (userMessage.Source, userMessage.Payload.Value.Value.Select(@byte => @byte.Value).ToArray()));

protected override ValueTask DisposeCoreAsync() => this.blocksStream.DisposeAsync();
protected override ValueTask DisposeCoreAsync()
=> this.blocksStream.DisposeAsync();
}
7 changes: 1 addition & 6 deletions net/src/Sails.Remoting/Core/RemotingReplyViaNodeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public static async Task<RemotingReplyViaNodeClient<T>> FromExecutionAsync(
var messageQueuedEventData = await executeExtrinsic(nodeClient).ConfigureAwait(false);

var result = new RemotingReplyViaNodeClient<T>(
nodeClient,
blocksStream,
extractResult,
messageQueuedEventData);
Expand All @@ -48,19 +47,16 @@ public static async Task<RemotingReplyViaNodeClient<T>> FromExecutionAsync(
}

private RemotingReplyViaNodeClient(
SubstrateClientExt nodeClient,
BlocksStream blocksStream,
Func<MessageQueuedEventData, UserMessage, T> extractResult,
MessageQueuedEventData queuedMessageData)
{
this.nodeClient = nodeClient;
this.blocksStream = blocksStream;
this.extractResult = extractResult;
this.queuedMessageData = queuedMessageData;
this.replyMessage = null;
}

private readonly SubstrateClientExt nodeClient;
private BlocksStream? blocksStream;
private readonly Func<MessageQueuedEventData, UserMessage, T> extractResult;
private readonly MessageQueuedEventData queuedMessageData;
Expand All @@ -85,8 +81,7 @@ public override async Task<T> ReadAsync(CancellationToken cancellationToken)
{
Ensure.Any.IsNotNull(this.blocksStream, nameof(this.blocksStream));

this.replyMessage = await this.blocksStream.ReadAllHeadersAsync(cancellationToken)
.SelectGearEvents(this.nodeClient, cancellationToken)
this.replyMessage = await this.blocksStream.ReadAllGearRuntimeEventsAsync(cancellationToken)
.SelectIfMatches(
GearEvent.UserMessageSent,
(UserMessageSentEventData data) => (UserMessage)data.Value[0])
Expand Down
2 changes: 1 addition & 1 deletion net/src/Sails.Remoting/Core/RemotingViaNodeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public async Task<byte[]> QueryAsync(
var nodeClient = await this.nodeClientProvider.GetNodeClientAsync(cancellationToken).ConfigureAwait(false);
var blocksStream = await nodeClient.GetNewBlocksStreamAsync(cancellationToken).ConfigureAwait(false);

return new BlockStreamEventListener(nodeClient, blocksStream);
return new BlockStreamEventListener(blocksStream);
}

private static MessageQueuedEventData SelectMessageQueuedEventData(IEnumerable<BaseEnumRust<RuntimeEvent>> runtimeEvents)
Expand Down
33 changes: 28 additions & 5 deletions net/src/Substrate.Gear.Client/BlocksStream.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using EnsureThat;
using Substrate.Gear.Api.Generated;
using Substrate.NetApi;
using Substrate.NetApi.Model.Rpc;

Expand All @@ -13,7 +15,7 @@ namespace Substrate.Gear.Client;
public sealed class BlocksStream : IAsyncDisposable
{
internal static async Task<BlocksStream> CreateAsync(
SubstrateClient nodeClient,
SubstrateClientExt nodeClient,
Func<SubstrateClient, Action<string, Header>, Task<string>> subscribe,
Func<SubstrateClient, string, Task> unsubscribe)
{
Expand All @@ -33,24 +35,27 @@ internal static async Task<BlocksStream> CreateAsync(
.ConfigureAwait(false);

return new BlocksStream(
nodeClient,
channel,
() => unsubscribe(nodeClient, subscriptionId));
nodeClient => unsubscribe(nodeClient, subscriptionId));
}

private BlocksStream(Channel<Header> channel, Func<Task> unsubscribe)
private BlocksStream(SubstrateClientExt nodeClient, Channel<Header> channel, Func<SubstrateClient, Task> unsubscribe)
{
this.nodeClient = nodeClient;
this.channel = channel;
this.unsubscribe = unsubscribe;
this.isReadInProgress = 0;
}

private readonly SubstrateClientExt nodeClient;
private readonly Channel<Header> channel;
private readonly Func<Task> unsubscribe;
private readonly Func<SubstrateClient, Task> unsubscribe;
private int isReadInProgress;

public async ValueTask DisposeAsync()
{
await this.unsubscribe().ConfigureAwait(false);
await this.unsubscribe(this.nodeClient).ConfigureAwait(false);
this.channel.Writer.Complete();

GC.SuppressFinalize(this);
Expand Down Expand Up @@ -83,4 +88,22 @@ async IAsyncEnumerable<Header> ReadAllImpl([EnumeratorCancellation] Cancellation
}
}
}

/// <summary>
/// Reads all block headers and applies the provided selector to each header.
/// Only one read operation is allowed at a time.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="selectAsync"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public IAsyncEnumerable<T> ReadAllAsync<T>(
Func<SubstrateClientExt, Header, ValueTask<T>> selectAsync,
CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(selectAsync, nameof(selectAsync));

return this.ReadAllHeadersAsync(cancellationToken)
.SelectAwait(header => selectAsync(this.nodeClient, header));
}
}
59 changes: 25 additions & 34 deletions net/src/Substrate.Gear.Client/BlocksStreamExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,49 +1,40 @@
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using Substrate.Gear.Api.Generated;
using Substrate.Gear.Api.Generated.Model.gear_core.message.user;
using Substrate.Gear.Api.Generated.Model.gprimitives;
using EnsureThat;
using Substrate.Gear.Api.Generated.Model.vara_runtime;
using Substrate.Gear.Client.NetApi.Model.Rpc;
using Substrate.Gear.Client.NetApi.Model.Types.Base;
using Substrate.NetApi.Model.Rpc;
using Substrate.NetApi.Model.Types.Base;

namespace Substrate.Gear.Client;

public static class BlocksStreamExtensions
{
[SuppressMessage(
"Style",
"VSTHRD200:Use \"Async\" suffix for async methods",
Justification = "To be consistent with system provided extensions")]
public static IAsyncEnumerable<BaseEnumRust<GearEvent>> SelectGearEvents(
this IAsyncEnumerable<Header> headers,
SubstrateClientExt nodeClient,
CancellationToken cancellationToken = default)
=> headers
.SelectAwait(async blockHeader => await nodeClient
.ListBlockEventsAsync(blockHeader.GetBlockHash(), cancellationToken).ConfigureAwait(false))
.SelectMany(eventRecords => eventRecords.ToAsyncEnumerable())
.Select(eventRecord => eventRecord.Event.ToBaseEnumRust())
.SelectIfMatches(
RuntimeEvent.Gear,
(EnumGearEvent gearEvent) => gearEvent.ToBaseEnumRust()
);
public static IAsyncEnumerable<BaseEnumRust<RuntimeEvent>> ReadAllRuntimeEventsAsync(
this BlocksStream blocksStream,
CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(blocksStream, nameof(blocksStream));

[SuppressMessage(
"Style",
"VSTHRD200:Use \"Async\" suffix for async methods",
Justification = "To be consistent with system provided extensions")]
public static IAsyncEnumerable<(ActorId Source, byte[] Payload)> SelectServiceEvents(
this IAsyncEnumerable<BaseEnumRust<GearEvent>> gearEvents)
=> gearEvents
var eventRecords = blocksStream
.ReadAllAsync(
selectAsync: async (nodeClient, blockHeader)
=> await nodeClient.ListBlockEventsAsync(
blockHeader.GetBlockHash(),
cancellationToken)
.ConfigureAwait(false),
cancellationToken);
return eventRecords.SelectMany(eventRecords => eventRecords.ToAsyncEnumerable())
.Select(eventRecord => eventRecord.Event.ToBaseEnumRust());
}

public static IAsyncEnumerable<BaseEnumRust<GearEvent>> ReadAllGearRuntimeEventsAsync(
this BlocksStream blocksStream,
CancellationToken cancellationToken)
=> blocksStream
.ReadAllRuntimeEventsAsync(cancellationToken)
.SelectIfMatches(
GearEvent.UserMessageSent,
(UserMessageSentEventData data) => (UserMessage)data.Value[0])
.Where(userMessage => userMessage.Destination
.IsEqualTo(GearApi.Model.gprimitives.ActorIdExtensions.Zero))
.Select(userMessage => (userMessage.Source, userMessage.Payload.Value.Value.Select(@byte => @byte.Value).ToArray()));
RuntimeEvent.Gear,
(EnumGearEvent gearEvent) => gearEvent.ToBaseEnumRust());
}
3 changes: 0 additions & 3 deletions net/src/Substrate.Gear.Client/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,4 @@
global using GasUnit = Substrate.NetApi.Model.Types.Primitive.U64;
global using GearEvent = Substrate.Gear.Api.Generated.Model.pallet_gear.pallet.Event;
global using SystemEvent = Substrate.Gear.Api.Generated.Model.frame_system.pallet.Event;
global using UserMessageSentEventData = Substrate.NetApi.Model.Types.Base.BaseTuple<
Substrate.Gear.Api.Generated.Model.gear_core.message.user.UserMessage,
Substrate.NetApi.Model.Types.Base.BaseOpt<Substrate.NetApi.Model.Types.Primitive.U32>>;
global using ValueUnit = Substrate.NetApi.Model.Types.Primitive.U128;
6 changes: 3 additions & 3 deletions net/src/Substrate.Gear.Client/SubstrateClientExtExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public static async Task<EventRecord[]> ListBlockEventsAsync(
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static Task<BlocksStream> GetAllBlocksStreamAsync(
this SubstrateClient nodeClient,
this SubstrateClientExt nodeClient,
CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(nodeClient, nameof(nodeClient));
Expand All @@ -269,7 +269,7 @@ public static Task<BlocksStream> GetAllBlocksStreamAsync(
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static Task<BlocksStream> GetNewBlocksStreamAsync(
this SubstrateClient nodeClient,
this SubstrateClientExt nodeClient,
CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(nodeClient, nameof(nodeClient));
Expand All @@ -289,7 +289,7 @@ public static Task<BlocksStream> GetNewBlocksStreamAsync(
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static Task<BlocksStream> GetFinalizedBlocksStreamAsync(
this SubstrateClient nodeClient,
this SubstrateClientExt nodeClient,
CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(nodeClient, nameof(nodeClient));
Expand Down

0 comments on commit 433f764

Please sign in to comment.