diff --git a/src/clustering/cluster-client/ClusterClientSample.FrontEnd/Actors/ActorKeys.cs b/src/clustering/cluster-client/ClusterClientSample.FrontEnd/Actors/ActorKeys.cs new file mode 100644 index 0000000..2ba89a7 --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.FrontEnd/Actors/ActorKeys.cs @@ -0,0 +1,8 @@ +namespace ClusterClientSample.FrontEnd.Actors; + +// This class will only be used as a Type marker to retrieve the `ClusterClient` actor from the `ActorRegistry`. +// It is not meant to be instantiated in any way. +public sealed class GatewayClusterClientActor +{ + private GatewayClusterClientActor(){ } +} \ No newline at end of file diff --git a/src/clustering/cluster-client/ClusterClientSample.FrontEnd/Actors/BatchedWorkRequester.cs b/src/clustering/cluster-client/ClusterClientSample.FrontEnd/Actors/BatchedWorkRequester.cs new file mode 100644 index 0000000..cd19539 --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.FrontEnd/Actors/BatchedWorkRequester.cs @@ -0,0 +1,48 @@ +using Akka.Actor; +using Akka.Cluster.Tools.Client; +using Akka.Event; +using Akka.Hosting; +using ClusterClientSample.Shared; + +namespace ClusterClientSample.FrontEnd.Actors; + +/// <summary> +/// This actor will: +/// * periodically send a `BatchedWork` request message to the backend app "/user/worker-manager" +/// service via the `ClusterClient` actor +/// * receive the `Result` message for each work performed and logs them +/// </summary> +public class BatchedWorkRequester: ReceiveActor, IWithTimers +{ + private const string BatchKey = nameof(BatchKey); + + private readonly Random _random = new(); + + public BatchedWorkRequester(IRequiredActor<GatewayClusterClientActor> clusterClientActor) + { + var log = Context.GetLogger(); + var clusterClient = clusterClientActor.ActorRef; + + Receive<BatchedWork>(msg => + { + log.Info("Requesting a batched work to the other cluster. Count: {0}", msg.Size); + clusterClient.Tell(new ClusterClient.Send("/user/worker-manager", msg, true)); + Timers.StartSingleTimer(BatchKey, GetBatch(), TimeSpan.FromSeconds(10)); + }); + + Receive<Result>(msg => + { + log.Info("[ID:{0}] Work result: {1}", msg.Id, msg.Value); + }); + } + + protected override void PreStart() + { + base.PreStart(); + Timers.StartSingleTimer(BatchKey, GetBatch(), TimeSpan.FromSeconds(3)); + } + + public ITimerScheduler Timers { get; set; } = null!; + + private BatchedWork GetBatch() => new (_random.Next(5) + 5); +} \ No newline at end of file diff --git a/src/clustering/cluster-client/ClusterClientSample.FrontEnd/Actors/WorkReportCollector.cs b/src/clustering/cluster-client/ClusterClientSample.FrontEnd/Actors/WorkReportCollector.cs new file mode 100644 index 0000000..a41db35 --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.FrontEnd/Actors/WorkReportCollector.cs @@ -0,0 +1,51 @@ +using Akka.Actor; +using Akka.Cluster.Tools.Client; +using Akka.Event; +using Akka.Hosting; +using ClusterClientSample.Shared; + +namespace ClusterClientSample.FrontEnd.Actors; + +/// <summary> +/// This actor will: +/// * periodically publish a `SendReport` message to the backend app "report" pub-sub topic via the `ClusterClient` actor +/// * receive the `Report` message from the workload metric actor and logs them +/// </summary> +public class WorkReportCollector: ReceiveActor, IWithTimers +{ + private class GetReport + { + public static readonly GetReport Instance = new(); + private GetReport() { } + } + + private const string ReportKey = nameof(ReportKey); + + public WorkReportCollector(IRequiredActor<GatewayClusterClientActor> clusterClientActor) + { + var log = Context.GetLogger(); + var clusterClient = clusterClientActor.ActorRef; + + Receive<Report>(report => + { + foreach (var (actor, count) in report.Counts) + { + log.Info("Worker {0} has done {1} works", actor, count); + } + }); + + Receive<GetReport>(_ => + { + log.Info("Requesting work report metrics from the other cluster."); + clusterClient.Tell(new ClusterClient.Publish("report", SendReport.Instance)); + }); + } + + public ITimerScheduler Timers { get; set; } = null!; + + protected override void PreStart() + { + base.PreStart(); + Timers.StartPeriodicTimer(ReportKey, GetReport.Instance, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10)); + } +} \ No newline at end of file diff --git a/src/clustering/cluster-client/ClusterClientSample.FrontEnd/ClusterClientSample.FrontEnd.csproj b/src/clustering/cluster-client/ClusterClientSample.FrontEnd/ClusterClientSample.FrontEnd.csproj new file mode 100644 index 0000000..35e3d5a --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.FrontEnd/ClusterClientSample.FrontEnd.csproj @@ -0,0 +1,19 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <PropertyGroup> + <OutputType>Exe</OutputType> + <TargetFramework>net8.0</TargetFramework> + <ImplicitUsings>enable</ImplicitUsings> + <Nullable>enable</Nullable> + </PropertyGroup> + + <ItemGroup> + <PackageReference Include="Akka.Cluster.Hosting" /> + <PackageReference Include="Akka.Serialization.Hyperion" /> + </ItemGroup> + + <ItemGroup> + <ProjectReference Include="..\ClusterClientSample.Shared\ClusterClientSample.Shared.csproj" /> + </ItemGroup> + +</Project> diff --git a/src/clustering/cluster-client/ClusterClientSample.FrontEnd/Program.cs b/src/clustering/cluster-client/ClusterClientSample.FrontEnd/Program.cs new file mode 100644 index 0000000..779d5f4 --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.FrontEnd/Program.cs @@ -0,0 +1,43 @@ +using Akka.Cluster.Hosting; +using Akka.Hosting; +using Akka.Remote.Hosting; +using ClusterClientSample.FrontEnd.Actors; +using Microsoft.Extensions.Hosting; + +const string gatewaySystemAddress = "akka.tcp://cluster-system@localhost:12552"; +const string systemName = "remote-cluster-system"; +const string hostName = "localhost"; +const int port = 12553; + +Console.Title = "Frontend Gateway Node"; + +var host = new HostBuilder() + .ConfigureServices((context, services) => + { + services.AddAkka(systemName, builder => + { + builder + // Setup remoting + .WithRemoting(configure: options => + { + options.Port = port; + options.HostName = hostName; + }) + + // Setup `ClusterClient` actor to connect to another actor system + .WithClusterClient<GatewayClusterClientActor>([ $"{gatewaySystemAddress}/system/receptionist" ]) + + // Setup required actors and startup code + .WithActors((system, registry, resolver) => + { + var requesterActor = system.ActorOf(resolver.Props(typeof(BatchedWorkRequester)), "work-batch-requester"); + registry.Register<BatchedWorkRequester>(requesterActor); + + var reportCollectorActor = system.ActorOf(resolver.Props(typeof(WorkReportCollector)), "work-report-collector"); + registry.Register<WorkReportCollector>(reportCollectorActor); + }); + }); + }).Build(); + +await host.StartAsync(); +await host.WaitForShutdownAsync(); \ No newline at end of file diff --git a/src/clustering/cluster-client/ClusterClientSample.Gateway/Actors/MetricCounterActor.cs b/src/clustering/cluster-client/ClusterClientSample.Gateway/Actors/MetricCounterActor.cs new file mode 100644 index 0000000..aa8d33f --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.Gateway/Actors/MetricCounterActor.cs @@ -0,0 +1,23 @@ +using Akka.Actor; +using ClusterClientSample.Shared; + +namespace ClusterClientSample.Gateway.Actors; + +public class MetricCounterActor : ReceiveActor +{ + public MetricCounterActor() + { + var counts = new Dictionary<IActorRef, int>(); + + Receive<WorkComplete>(_ => + { + if (counts.TryGetValue(Sender, out var count)) + counts[Sender] = ++count; + else + counts.Add(Sender, 1); + }); + + Receive<SendReport>(_ => Sender.Tell(new Report( + counts.ToDictionary(kvp => kvp.Key.Path.ToString(), kvp => kvp.Value)))); + } +} \ No newline at end of file diff --git a/src/clustering/cluster-client/ClusterClientSample.Gateway/Actors/WorkerManagerActor.cs b/src/clustering/cluster-client/ClusterClientSample.Gateway/Actors/WorkerManagerActor.cs new file mode 100644 index 0000000..4635f30 --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.Gateway/Actors/WorkerManagerActor.cs @@ -0,0 +1,45 @@ +using Akka.Actor; +using Akka.Cluster.Routing; +using Akka.Event; +using Akka.Hosting; +using Akka.Routing; +using ClusterClientSample.Shared; + +namespace ClusterClientSample.Gateway.Actors; + +public class WorkerManagerActor : ReceiveActor +{ + private static int _nextId = 1; + + public WorkerManagerActor(IRequiredActor<MetricCounterActor> counterActor) + { + var log = Context.GetLogger(); + var counter = counterActor.ActorRef; + var workerRouter = GetWorkerRouter(counter); + + Receive<BatchedWork>(batch => + { + log.Info("Generating a work batch of size {0}", batch.Size); + for (var i = 0; i < batch.Size; i++) + { + // forward the work request as if it was sent by the original sender so that the work result can be + // sent back to the original sender by the worker + workerRouter.Forward(new Work(_nextId++)); + } + }); + } + + private static IActorRef GetWorkerRouter(IActorRef counter) + { + // Creates a cluster router pool of 10 workers for each cluster node with the role "worker" + // that joins the cluster. + // + // The router will use a round-robin strategy to distribute messages amongst the worker actors + var props = new ClusterRouterPool( + local: new RoundRobinPool(10), + settings: new ClusterRouterPoolSettings(30, 10, true, "worker")) + .Props(Props.Create(() => new WorkerActor(counter))); + + return Context.ActorOf(props); + } +} \ No newline at end of file diff --git a/src/clustering/cluster-client/ClusterClientSample.Gateway/ClusterClientSample.Gateway.csproj b/src/clustering/cluster-client/ClusterClientSample.Gateway/ClusterClientSample.Gateway.csproj new file mode 100644 index 0000000..14bae2b --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.Gateway/ClusterClientSample.Gateway.csproj @@ -0,0 +1,19 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <PropertyGroup> + <OutputType>Exe</OutputType> + <TargetFramework>net8.0</TargetFramework> + <ImplicitUsings>enable</ImplicitUsings> + <Nullable>enable</Nullable> + </PropertyGroup> + + <ItemGroup> + <PackageReference Include="Akka.Cluster.Hosting" /> + <PackageReference Include="Akka.Serialization.Hyperion" /> + </ItemGroup> + + <ItemGroup> + <ProjectReference Include="..\ClusterClientSample.Shared\ClusterClientSample.Shared.csproj" /> + </ItemGroup> + +</Project> diff --git a/src/clustering/cluster-client/ClusterClientSample.Gateway/Program.cs b/src/clustering/cluster-client/ClusterClientSample.Gateway/Program.cs new file mode 100644 index 0000000..281b90e --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.Gateway/Program.cs @@ -0,0 +1,78 @@ +using Akka.Actor; +using Akka.Cluster.Hosting; +using Akka.Cluster.Tools.Client; +using Akka.Hosting; +using Akka.Remote.Hosting; +using ClusterClientSample.Gateway.Actors; +using Microsoft.Extensions.Hosting; + +const string systemName = "cluster-system"; +const string hostName = "localhost"; +const string port = "12552"; +const string selfAddress = $"akka.tcp://{systemName}@{hostName}:{port}"; + +// This node acts as a known contact point for all external actor systems to connect via `ClusterClient` by setting up +// the `ClusterClientReceptionist` +Console.Title = "Backend Gateway Node"; + +var host = new HostBuilder() + .ConfigureServices((context, services) => + { + services.AddAkka(systemName, builder => + { + builder + // Setup remoting and clustering + .WithRemoting(configure: options => + { + options.Port = int.Parse(port); + options.HostName = hostName; + }) + .WithClustering(options: new ClusterOptions + { + // Note that we do not assign the role/tag "worker" to this node, no worker actors will be + // deployed in this node. + // + // If we want the gateway to also work double duty as worker node, we can add the "worker" role/tag + // to the Roles array. + Roles = [ "gateway" ], + SeedNodes = [ selfAddress ] + }) + + // Setup `ClusterClientReceptionist` to only deploy on nodes with "gateway" role + .WithClusterClientReceptionist(role: "gateway") + + // Setup required actors and startup code + .WithActors((system, registry, resolver) => + { + // The name of this actor ("worker-manager") is required, because its absolute path + // ("/user/worker-manager") will be used as a service path by ClusterClientReceptionist. + // + // This name has to be unique for all actor names living in this actor system. + var workerManagerActor = system.ActorOf(resolver.Props(typeof(WorkerManagerActor)), "worker-manager"); + registry.Register<WorkerManagerActor>(workerManagerActor); + + // The name of this actor ("metric-workload-counter") is optional as it leverages the + // distributed pub-sub system. External actor systems does not need to know the actor path to + // query the workload metric. + var workLoadCounterActor = system.ActorOf(Props.Create(() => new MetricCounterActor()), "metric-workload-counter"); + registry.Register<MetricCounterActor>(workLoadCounterActor); + }) + .AddStartup((system, registry) => + { + var receptionist = ClusterClientReceptionist.Get(system); + + // Register the worker manager actor as a service, + // this can be accessed through "user/worker-manager" + var workerManagerActor = registry.Get<WorkerManagerActor>(); + receptionist.RegisterService(workerManagerActor); + + // Register the workload counter metric actor as a topic listener, it will subscribe to the + // "report" topic + var workloadCounterActor = registry.Get<MetricCounterActor>(); + receptionist.RegisterSubscriber("report", workloadCounterActor); + }); + }); + }).Build(); + +await host.StartAsync(); +await host.WaitForShutdownAsync(); \ No newline at end of file diff --git a/src/clustering/cluster-client/ClusterClientSample.Node/ClusterClientSample.Node.csproj b/src/clustering/cluster-client/ClusterClientSample.Node/ClusterClientSample.Node.csproj new file mode 100644 index 0000000..e1e74b3 --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.Node/ClusterClientSample.Node.csproj @@ -0,0 +1,19 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <PropertyGroup> + <OutputType>Exe</OutputType> + <TargetFramework>net8.0</TargetFramework> + <ImplicitUsings>enable</ImplicitUsings> + <Nullable>enable</Nullable> + </PropertyGroup> + + <ItemGroup> + <ProjectReference Include="..\ClusterClientSample.Shared\ClusterClientSample.Shared.csproj" /> + </ItemGroup> + + <ItemGroup> + <PackageReference Include="Akka.Cluster.Hosting" /> + <PackageReference Include="Akka.Serialization.Hyperion" /> + </ItemGroup> + +</Project> diff --git a/src/clustering/cluster-client/ClusterClientSample.Node/Program.cs b/src/clustering/cluster-client/ClusterClientSample.Node/Program.cs new file mode 100644 index 0000000..1ce8090 --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.Node/Program.cs @@ -0,0 +1,37 @@ +using Akka.Cluster.Hosting; +using Akka.Hosting; +using Akka.Remote.Hosting; +using Microsoft.Extensions.Hosting; + +const string systemName = "cluster-system"; +const string hostName = "localhost"; +const int port = 0; +const string gateway = $"akka.tcp://{systemName}@{hostName}:12552"; + +Console.Title = "Backend Worker Node"; + +// Note that we did not start any actors in the nodes, all actors will be deployed using remoting +var host = new HostBuilder() + .ConfigureServices((context, services) => + { + services.AddAkka(systemName, builder => + { + builder + // Setup remoting and clustering + .WithRemoting(configure: options => + { + options.Port = port; + options.HostName = hostName; + }) + .WithClustering(options: new ClusterOptions + { + // Giving this cluster node the role/tag "worker" signals that the gateway node can + // deploy worker actors in this node using remoting + Roles = ["worker"], + SeedNodes = [gateway] + }); + }); + }).Build(); + +await host.StartAsync(); +await host.WaitForShutdownAsync(); \ No newline at end of file diff --git a/src/clustering/cluster-client/ClusterClientSample.Shared/ClusterClientSample.Shared.csproj b/src/clustering/cluster-client/ClusterClientSample.Shared/ClusterClientSample.Shared.csproj new file mode 100644 index 0000000..b3db7e4 --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.Shared/ClusterClientSample.Shared.csproj @@ -0,0 +1,8 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <ItemGroup> + <PackageReference Include="Akka.Cluster.Tools" /> + <PackageReference Include="Akka.Hosting" /> + </ItemGroup> + +</Project> diff --git a/src/clustering/cluster-client/ClusterClientSample.Shared/Messages.cs b/src/clustering/cluster-client/ClusterClientSample.Shared/Messages.cs new file mode 100644 index 0000000..bf4b7e8 --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.Shared/Messages.cs @@ -0,0 +1,24 @@ +using System.Collections.Generic; + +namespace ClusterClientSample.Shared; + +public sealed record BatchedWork(int Size); + +public sealed record Work(int Id); + +public sealed class WorkComplete +{ + public static readonly WorkComplete Instance = new(); + private WorkComplete() { } +} + +public sealed record Result(int Id, int Value); + +public sealed class SendReport +{ + public static readonly SendReport Instance = new (); + + private SendReport() { } +} + +public sealed record Report(IDictionary<string, int> Counts); diff --git a/src/clustering/cluster-client/ClusterClientSample.Shared/WorkerActor.cs b/src/clustering/cluster-client/ClusterClientSample.Shared/WorkerActor.cs new file mode 100644 index 0000000..088ffab --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.Shared/WorkerActor.cs @@ -0,0 +1,39 @@ +using System; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Event; + +namespace ClusterClientSample.Shared; + +public class WorkerActor : ReceiveActor +{ + private readonly ILoggingAdapter _log; + + public WorkerActor(IActorRef counter) + { + _log = Context.GetLogger(); + + ReceiveAsync<Work>(async work => + { + var workResult = await BusinessLogic(work.Id); + var result = new Result(work.Id, workResult); + _log.Info("Worker {0} - [{1}]: {2}", Self.Path.Name, result.Id, result.Value); + Sender.Tell(result); + counter.Tell(WorkComplete.Instance); + }); + } + + protected override void PreStart() + { + base.PreStart(); + _log.Info("Worker actor started at {0}", Self.Path); + } + + // Simulate a computationally expensive workload + private static readonly Random Rnd = new (); + private async Task<int> BusinessLogic(int input) + { + await Task.Delay(Rnd.Next(100, 1000)); + return input * 10; + } +} \ No newline at end of file diff --git a/src/clustering/cluster-client/ClusterClientSample.sln b/src/clustering/cluster-client/ClusterClientSample.sln new file mode 100644 index 0000000..8ec58be --- /dev/null +++ b/src/clustering/cluster-client/ClusterClientSample.sln @@ -0,0 +1,41 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ClusterClientSample.Shared", "ClusterClientSample.Shared\ClusterClientSample.Shared.csproj", "{C5CC9D20-F16C-44BE-9CDE-1D2227D39508}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "_", "_", "{25EC11E5-0FAE-409B-B178-4E3E89F14330}" + ProjectSection(SolutionItems) = preProject + Directory.Build.props = Directory.Build.props + Directory.Packages.props = Directory.Packages.props + README.md = README.md + EndProjectSection +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ClusterClientSample.Gateway", "ClusterClientSample.Gateway\ClusterClientSample.Gateway.csproj", "{B2DCFC44-082B-433D-AEB0-B7EC174B593D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ClusterClientSample.Node", "ClusterClientSample.Node\ClusterClientSample.Node.csproj", "{BE579313-8D25-42E7-AAF2-386DA42FE287}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ClusterClientSample.FrontEnd", "ClusterClientSample.FrontEnd\ClusterClientSample.FrontEnd.csproj", "{5425F5FC-02E1-4A54-8FC6-931EB3B143DE}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {C5CC9D20-F16C-44BE-9CDE-1D2227D39508}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C5CC9D20-F16C-44BE-9CDE-1D2227D39508}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C5CC9D20-F16C-44BE-9CDE-1D2227D39508}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C5CC9D20-F16C-44BE-9CDE-1D2227D39508}.Release|Any CPU.Build.0 = Release|Any CPU + {B2DCFC44-082B-433D-AEB0-B7EC174B593D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B2DCFC44-082B-433D-AEB0-B7EC174B593D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B2DCFC44-082B-433D-AEB0-B7EC174B593D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B2DCFC44-082B-433D-AEB0-B7EC174B593D}.Release|Any CPU.Build.0 = Release|Any CPU + {BE579313-8D25-42E7-AAF2-386DA42FE287}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BE579313-8D25-42E7-AAF2-386DA42FE287}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BE579313-8D25-42E7-AAF2-386DA42FE287}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BE579313-8D25-42E7-AAF2-386DA42FE287}.Release|Any CPU.Build.0 = Release|Any CPU + {5425F5FC-02E1-4A54-8FC6-931EB3B143DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5425F5FC-02E1-4A54-8FC6-931EB3B143DE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5425F5FC-02E1-4A54-8FC6-931EB3B143DE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5425F5FC-02E1-4A54-8FC6-931EB3B143DE}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection +EndGlobal diff --git a/src/clustering/cluster-client/Directory.Build.props b/src/clustering/cluster-client/Directory.Build.props new file mode 100644 index 0000000..ba1c7cd --- /dev/null +++ b/src/clustering/cluster-client/Directory.Build.props @@ -0,0 +1,37 @@ +<Project> + <PropertyGroup> + <TargetFramework>net8.0</TargetFramework> + <LangVersion>latestmajor</LangVersion> + <Nullable>enable</Nullable> + </PropertyGroup> + + <PropertyGroup> + <LibraryFramework>netstandard2.0</LibraryFramework> + <TestsNetCoreFramework>net6.0</TestsNetCoreFramework> + <XunitVersion>2.7.1</XunitVersion> + <TestSdkVersion>17.9.0</TestSdkVersion> + <CoverletVersion>6.0.2</CoverletVersion> + <XunitRunneVisualstudio>2.5.8</XunitRunneVisualstudio> + <AkkaVersion>1.5.19</AkkaVersion> + <MicrosoftExtensionsVersion>[6.0.0,)</MicrosoftExtensionsVersion> + </PropertyGroup> + + <!-- SourceLink support for all Akka.NET projects --> + <ItemGroup> + <PackageReference Include="Microsoft.SourceLink.GitHub" PrivateAssets="All" /> + </ItemGroup> + + <ItemGroup> + <None Include="$(MSBuildThisFileDirectory)\..\docs\images\akkalogo.png" Pack="true" Visible="false" PackagePath="\" /> + <None Include="$(MSBuildThisFileDirectory)\..\README.md" Pack="true" Visible="false" PackagePath="\" /> + </ItemGroup> + + <PropertyGroup> + <PublishRepositoryUrl>true</PublishRepositoryUrl> + <!-- Optional: Embed source files that are not tracked by the source control manager in the PDB --> + <EmbedUntrackedSources>true</EmbedUntrackedSources> + <!-- Optional: Build symbol package (.snupkg) to distribute the PDB containing Source Link --> + <IncludeSymbols>true</IncludeSymbols> + <SymbolPackageFormat>snupkg</SymbolPackageFormat> + </PropertyGroup> +</Project> diff --git a/src/clustering/cluster-client/Directory.Packages.props b/src/clustering/cluster-client/Directory.Packages.props new file mode 100644 index 0000000..65b1f86 --- /dev/null +++ b/src/clustering/cluster-client/Directory.Packages.props @@ -0,0 +1,18 @@ +<Project> + <PropertyGroup> + <ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally> + <AkkaVersion>1.5.20</AkkaVersion> + <AkkaHostingVersion>1.5.20</AkkaHostingVersion> + </PropertyGroup> + <!-- Akka dependencies --> + <ItemGroup> + <PackageVersion Include="Akka.Cluster.Tools" Version="$(AkkaVersion)" /> + <PackageVersion Include="Akka.Hosting" Version="$(AkkaHostingVersion)" /> + <PackageVersion Include="Akka.Cluster.Hosting" Version="$(AkkaHostingVersion)" /> + <PackageVersion Include="Akka.Serialization.Hyperion" Version="$(AkkaVersion)" /> + </ItemGroup> + <!-- SourceLink support for all Akka.NET projects --> + <ItemGroup> + <PackageVersion Include="Microsoft.SourceLink.GitHub" Version="8.0.0" /> + </ItemGroup> +</Project> \ No newline at end of file diff --git a/src/clustering/cluster-client/README.md b/src/clustering/cluster-client/README.md new file mode 100644 index 0000000..aff74df --- /dev/null +++ b/src/clustering/cluster-client/README.md @@ -0,0 +1,63 @@ +# Akka.NET Actor System intercommunication + +The goal of this sample is to demonstrate how an external actor system can communicate with a cluster + +## Technology + +This solution is built with: + +- Minimal APIs; +- C# `record` types; +- Akka.NET v1.5 w/ Akka.Cluster; +- Akka.Cluster.Tools; and +- [Akka.Hosting](https://github.com/akkadotnet/Akka.Hosting) - which minimizes the amount of configuration for Akka.NET to practically zero. + + +## Domain + +Like all the samples in this repository, we are using a simple domain since the focus of the sample is meant to be _how to use Akka.NET infrastructure succinctly and successfully_. That being said, it's still worth understanding what our domain does: work distribution and metric collection. + +### Backend + +The backend app consists of two Akka.Cluster role types: + +1. `ClusterClientSample.Gateway` with role "gateway" - a [headless Akka.NET process](https://petabridge.com/blog/akkadotnet-ihostedservice/) that acts as the known cluster client receptionist node (fixed address) that accepts work and metric report requests and +2. `ClusterClientSample.Node` with role "worker" - a [headless Akka.NET process](https://petabridge.com/blog/akkadotnet-ihostedservice/) where all worker actors were to be distributed. + +Actor types in the backend cluster nodes: +* `WorkerManagerActor` - This actor is responsible for + * transforming the `BatchedWork` message into `Work` messages and forwarding them to the distributed `WorkerActor`s in the cluster, and + * creating the cluster router that will deploy and route all `Work` messages to the `WorkerActor`s inside the cluster. + + It is registered to `ClusterClientReceptionist` as a service with its actor path **"/user/worker-manager"** as the service key for other actor systems to access. +* `MetricCounterActor` - This actor is responsible for keeping track of works done by each of the `WorkerActor`s. It is registered to `ClusterClientReceptionist` as a subscriber to the **"report"** topic. +* `WorkerActor` - This actor is responsible for the asynchronous business logic execution of the `Work` messages. The `WorkerActor` type is shared using a shared `ClusterClientSample.Shared` library so that it can be used by both the `ClusterClientSample.Gateway` and `ClusterClientSample.Node` projects. + +## Frontend + +The frontend app `ClusterClientSample.FrontEnd` is a simple [headless Akka.NET process](https://petabridge.com/blog/akkadotnet-ihostedservice/) with Akka.Remote that periodically sends messages to the backend app via the `ClusterClient` tunneling. + +We have only two actor types in the frontend app: +* `BatchedWorkRequester` - This actor periodically + * sends a `BatchedWork` request message to the backend app **"/user/worker-manager" service** via `ClusterClient`, and + * receives the `Result` messages sent back after a work has been completed and logs it. +* `WorkReportCollector` - This actor + * publishes a `SendReport` request message to the backend app **"report" pub-sub topic** via `ClusterClient`, and + * receives the `Report` message sent by the metric actor and logs it. + +## Running Sample + +Load up Rider or Visual Studio and + +1. Launch `ClusterClientSample.Gateway`, followed by +2. Launch `ClusterClientSample.Node`, and then +3. Launch `ClusterClientSample.FrontEnd` + +You should see +1. Distributed work scheduling: + * The frontend app sending batched work requests to the backend gateway node + * The worker actors in the worker node completing each work requests asynchronously + * The result for all completed work to be sent back to the frontend app and printed in the console. +2. Workload metric reporting: + * The frontend app publishing metric requests to the pub-sub topic in the backend gateway node + * The metric report sent back to the frontend app and printed in the console. \ No newline at end of file