From 433f764c0a09e4273205c3b54fcf690fcc529345 Mon Sep 17 00:00:00 2001 From: Dennis Diatlov Date: Fri, 29 Nov 2024 08:48:11 +0000 Subject: [PATCH] [net]refactor: refactor BlockStream (#693) --- .../Core/BlockStreamEventListener.cs | 25 +++++--- .../Core/RemotingReplyViaNodeClient.cs | 7 +-- .../Core/RemotingViaNodeClient.cs | 2 +- net/src/Substrate.Gear.Client/BlocksStream.cs | 33 +++++++++-- .../BlocksStreamExtensions.cs | 59 ++++++++----------- net/src/Substrate.Gear.Client/GlobalUsings.cs | 3 - .../SubstrateClientExtExtensions.cs | 6 +- 7 files changed, 75 insertions(+), 60 deletions(-) diff --git a/net/src/Sails.Remoting/Core/BlockStreamEventListener.cs b/net/src/Sails.Remoting/Core/BlockStreamEventListener.cs index 8fa293bd..41b941fa 100644 --- a/net/src/Sails.Remoting/Core/BlockStreamEventListener.cs +++ b/net/src/Sails.Remoting/Core/BlockStreamEventListener.cs @@ -1,28 +1,37 @@ using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; +using EnsureThat; using Sails.Remoting.Abstractions.Core; -using Substrate.Gear.Api.Generated; +using Substrate.Gear.Api.Generated.Model.gear_core.message.user; using Substrate.Gear.Api.Generated.Model.gprimitives; using Substrate.Gear.Client; +using Substrate.Gear.Client.GearApi.Model.gprimitives; +using Substrate.Gear.Client.NetApi.Model.Types.Base; namespace Sails.Remoting.Core; internal sealed class BlockStreamEventListener : EventListener<(ActorId Source, byte[] Bytes)> { - private readonly SubstrateClientExt nodeClient; private readonly BlocksStream blocksStream; - internal BlockStreamEventListener(SubstrateClientExt nodeClient, BlocksStream blocksStream) + internal BlockStreamEventListener(BlocksStream blocksStream) { - this.nodeClient = nodeClient; + EnsureArg.IsNotNull(blocksStream, nameof(blocksStream)); + this.blocksStream = blocksStream; } public override IAsyncEnumerable<(ActorId Source, byte[] Bytes)> ReadAllAsync(CancellationToken cancellationToken) - => this.blocksStream.ReadAllHeadersAsync(cancellationToken) - .SelectGearEvents(this.nodeClient, cancellationToken) - .SelectServiceEvents(); + => this.blocksStream.ReadAllGearRuntimeEventsAsync(cancellationToken) + .SelectIfMatches( + GearEvent.UserMessageSent, + (UserMessageSentEventData data) => (UserMessage)data.Value[0]) + .Where(userMessage => userMessage.Destination + .IsEqualTo(ActorIdExtensions.Zero)) + .Select(userMessage => (userMessage.Source, userMessage.Payload.Value.Value.Select(@byte => @byte.Value).ToArray())); - protected override ValueTask DisposeCoreAsync() => this.blocksStream.DisposeAsync(); + protected override ValueTask DisposeCoreAsync() + => this.blocksStream.DisposeAsync(); } diff --git a/net/src/Sails.Remoting/Core/RemotingReplyViaNodeClient.cs b/net/src/Sails.Remoting/Core/RemotingReplyViaNodeClient.cs index 47e0ccdd..fff5fd5d 100644 --- a/net/src/Sails.Remoting/Core/RemotingReplyViaNodeClient.cs +++ b/net/src/Sails.Remoting/Core/RemotingReplyViaNodeClient.cs @@ -31,7 +31,6 @@ public static async Task> FromExecutionAsync( var messageQueuedEventData = await executeExtrinsic(nodeClient).ConfigureAwait(false); var result = new RemotingReplyViaNodeClient( - nodeClient, blocksStream, extractResult, messageQueuedEventData); @@ -48,19 +47,16 @@ public static async Task> FromExecutionAsync( } private RemotingReplyViaNodeClient( - SubstrateClientExt nodeClient, BlocksStream blocksStream, Func extractResult, MessageQueuedEventData queuedMessageData) { - this.nodeClient = nodeClient; this.blocksStream = blocksStream; this.extractResult = extractResult; this.queuedMessageData = queuedMessageData; this.replyMessage = null; } - private readonly SubstrateClientExt nodeClient; private BlocksStream? blocksStream; private readonly Func extractResult; private readonly MessageQueuedEventData queuedMessageData; @@ -85,8 +81,7 @@ public override async Task ReadAsync(CancellationToken cancellationToken) { Ensure.Any.IsNotNull(this.blocksStream, nameof(this.blocksStream)); - this.replyMessage = await this.blocksStream.ReadAllHeadersAsync(cancellationToken) - .SelectGearEvents(this.nodeClient, cancellationToken) + this.replyMessage = await this.blocksStream.ReadAllGearRuntimeEventsAsync(cancellationToken) .SelectIfMatches( GearEvent.UserMessageSent, (UserMessageSentEventData data) => (UserMessage)data.Value[0]) diff --git a/net/src/Sails.Remoting/Core/RemotingViaNodeClient.cs b/net/src/Sails.Remoting/Core/RemotingViaNodeClient.cs index 478813d3..4fa52db3 100644 --- a/net/src/Sails.Remoting/Core/RemotingViaNodeClient.cs +++ b/net/src/Sails.Remoting/Core/RemotingViaNodeClient.cs @@ -175,7 +175,7 @@ public async Task QueryAsync( var nodeClient = await this.nodeClientProvider.GetNodeClientAsync(cancellationToken).ConfigureAwait(false); var blocksStream = await nodeClient.GetNewBlocksStreamAsync(cancellationToken).ConfigureAwait(false); - return new BlockStreamEventListener(nodeClient, blocksStream); + return new BlockStreamEventListener(blocksStream); } private static MessageQueuedEventData SelectMessageQueuedEventData(IEnumerable> runtimeEvents) diff --git a/net/src/Substrate.Gear.Client/BlocksStream.cs b/net/src/Substrate.Gear.Client/BlocksStream.cs index c5bb9e89..da9f32fd 100644 --- a/net/src/Substrate.Gear.Client/BlocksStream.cs +++ b/net/src/Substrate.Gear.Client/BlocksStream.cs @@ -1,10 +1,12 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using EnsureThat; +using Substrate.Gear.Api.Generated; using Substrate.NetApi; using Substrate.NetApi.Model.Rpc; @@ -13,7 +15,7 @@ namespace Substrate.Gear.Client; public sealed class BlocksStream : IAsyncDisposable { internal static async Task CreateAsync( - SubstrateClient nodeClient, + SubstrateClientExt nodeClient, Func, Task> subscribe, Func unsubscribe) { @@ -33,24 +35,27 @@ internal static async Task CreateAsync( .ConfigureAwait(false); return new BlocksStream( + nodeClient, channel, - () => unsubscribe(nodeClient, subscriptionId)); + nodeClient => unsubscribe(nodeClient, subscriptionId)); } - private BlocksStream(Channel
channel, Func unsubscribe) + private BlocksStream(SubstrateClientExt nodeClient, Channel
channel, Func unsubscribe) { + this.nodeClient = nodeClient; this.channel = channel; this.unsubscribe = unsubscribe; this.isReadInProgress = 0; } + private readonly SubstrateClientExt nodeClient; private readonly Channel
channel; - private readonly Func unsubscribe; + private readonly Func unsubscribe; private int isReadInProgress; public async ValueTask DisposeAsync() { - await this.unsubscribe().ConfigureAwait(false); + await this.unsubscribe(this.nodeClient).ConfigureAwait(false); this.channel.Writer.Complete(); GC.SuppressFinalize(this); @@ -83,4 +88,22 @@ async IAsyncEnumerable
ReadAllImpl([EnumeratorCancellation] Cancellation } } } + + /// + /// Reads all block headers and applies the provided selector to each header. + /// Only one read operation is allowed at a time. + /// + /// + /// + /// + /// + public IAsyncEnumerable ReadAllAsync( + Func> selectAsync, + CancellationToken cancellationToken) + { + EnsureArg.IsNotNull(selectAsync, nameof(selectAsync)); + + return this.ReadAllHeadersAsync(cancellationToken) + .SelectAwait(header => selectAsync(this.nodeClient, header)); + } } diff --git a/net/src/Substrate.Gear.Client/BlocksStreamExtensions.cs b/net/src/Substrate.Gear.Client/BlocksStreamExtensions.cs index 11b6ffb4..e0a53458 100644 --- a/net/src/Substrate.Gear.Client/BlocksStreamExtensions.cs +++ b/net/src/Substrate.Gear.Client/BlocksStreamExtensions.cs @@ -1,49 +1,40 @@ using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Threading; -using Substrate.Gear.Api.Generated; -using Substrate.Gear.Api.Generated.Model.gear_core.message.user; -using Substrate.Gear.Api.Generated.Model.gprimitives; +using EnsureThat; using Substrate.Gear.Api.Generated.Model.vara_runtime; using Substrate.Gear.Client.NetApi.Model.Rpc; using Substrate.Gear.Client.NetApi.Model.Types.Base; -using Substrate.NetApi.Model.Rpc; using Substrate.NetApi.Model.Types.Base; namespace Substrate.Gear.Client; public static class BlocksStreamExtensions { - [SuppressMessage( - "Style", - "VSTHRD200:Use \"Async\" suffix for async methods", - Justification = "To be consistent with system provided extensions")] - public static IAsyncEnumerable> SelectGearEvents( - this IAsyncEnumerable
headers, - SubstrateClientExt nodeClient, - CancellationToken cancellationToken = default) - => headers - .SelectAwait(async blockHeader => await nodeClient - .ListBlockEventsAsync(blockHeader.GetBlockHash(), cancellationToken).ConfigureAwait(false)) - .SelectMany(eventRecords => eventRecords.ToAsyncEnumerable()) - .Select(eventRecord => eventRecord.Event.ToBaseEnumRust()) - .SelectIfMatches( - RuntimeEvent.Gear, - (EnumGearEvent gearEvent) => gearEvent.ToBaseEnumRust() - ); + public static IAsyncEnumerable> ReadAllRuntimeEventsAsync( + this BlocksStream blocksStream, + CancellationToken cancellationToken) + { + EnsureArg.IsNotNull(blocksStream, nameof(blocksStream)); - [SuppressMessage( - "Style", - "VSTHRD200:Use \"Async\" suffix for async methods", - Justification = "To be consistent with system provided extensions")] - public static IAsyncEnumerable<(ActorId Source, byte[] Payload)> SelectServiceEvents( - this IAsyncEnumerable> gearEvents) - => gearEvents + var eventRecords = blocksStream + .ReadAllAsync( + selectAsync: async (nodeClient, blockHeader) + => await nodeClient.ListBlockEventsAsync( + blockHeader.GetBlockHash(), + cancellationToken) + .ConfigureAwait(false), + cancellationToken); + return eventRecords.SelectMany(eventRecords => eventRecords.ToAsyncEnumerable()) + .Select(eventRecord => eventRecord.Event.ToBaseEnumRust()); + } + + public static IAsyncEnumerable> ReadAllGearRuntimeEventsAsync( + this BlocksStream blocksStream, + CancellationToken cancellationToken) + => blocksStream + .ReadAllRuntimeEventsAsync(cancellationToken) .SelectIfMatches( - GearEvent.UserMessageSent, - (UserMessageSentEventData data) => (UserMessage)data.Value[0]) - .Where(userMessage => userMessage.Destination - .IsEqualTo(GearApi.Model.gprimitives.ActorIdExtensions.Zero)) - .Select(userMessage => (userMessage.Source, userMessage.Payload.Value.Value.Select(@byte => @byte.Value).ToArray())); + RuntimeEvent.Gear, + (EnumGearEvent gearEvent) => gearEvent.ToBaseEnumRust()); } diff --git a/net/src/Substrate.Gear.Client/GlobalUsings.cs b/net/src/Substrate.Gear.Client/GlobalUsings.cs index a29aa50d..be6ab16c 100644 --- a/net/src/Substrate.Gear.Client/GlobalUsings.cs +++ b/net/src/Substrate.Gear.Client/GlobalUsings.cs @@ -10,7 +10,4 @@ global using GasUnit = Substrate.NetApi.Model.Types.Primitive.U64; global using GearEvent = Substrate.Gear.Api.Generated.Model.pallet_gear.pallet.Event; global using SystemEvent = Substrate.Gear.Api.Generated.Model.frame_system.pallet.Event; -global using UserMessageSentEventData = Substrate.NetApi.Model.Types.Base.BaseTuple< - Substrate.Gear.Api.Generated.Model.gear_core.message.user.UserMessage, - Substrate.NetApi.Model.Types.Base.BaseOpt>; global using ValueUnit = Substrate.NetApi.Model.Types.Primitive.U128; diff --git a/net/src/Substrate.Gear.Client/SubstrateClientExtExtensions.cs b/net/src/Substrate.Gear.Client/SubstrateClientExtExtensions.cs index 827cf92d..8f7f3273 100644 --- a/net/src/Substrate.Gear.Client/SubstrateClientExtExtensions.cs +++ b/net/src/Substrate.Gear.Client/SubstrateClientExtExtensions.cs @@ -249,7 +249,7 @@ public static async Task ListBlockEventsAsync( /// /// public static Task GetAllBlocksStreamAsync( - this SubstrateClient nodeClient, + this SubstrateClientExt nodeClient, CancellationToken cancellationToken) { EnsureArg.IsNotNull(nodeClient, nameof(nodeClient)); @@ -269,7 +269,7 @@ public static Task GetAllBlocksStreamAsync( /// /// public static Task GetNewBlocksStreamAsync( - this SubstrateClient nodeClient, + this SubstrateClientExt nodeClient, CancellationToken cancellationToken) { EnsureArg.IsNotNull(nodeClient, nameof(nodeClient)); @@ -289,7 +289,7 @@ public static Task GetNewBlocksStreamAsync( /// /// public static Task GetFinalizedBlocksStreamAsync( - this SubstrateClient nodeClient, + this SubstrateClientExt nodeClient, CancellationToken cancellationToken) { EnsureArg.IsNotNull(nodeClient, nameof(nodeClient));