Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ClusterClient sample project #185

Merged
merged 4 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace ClusterClientSample.FrontEnd.Actors;

// This class will only be used as a Type marker to retrieve the `ClusterClient` actor from the `ActorRegistry`.
// It is not meant to be instantiated in any way.
public sealed class GatewayClusterClientActor
{
private GatewayClusterClientActor(){ }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Event;
using Akka.Hosting;
using ClusterClientSample.Shared;

namespace ClusterClientSample.FrontEnd.Actors;

/// <summary>
/// This actor will:
/// * periodically send a `BatchedWork` request message to the backend app "/user/worker-manager"
/// service via the `ClusterClient` actor
/// * receive the `Result` message for each work performed and logs them
/// </summary>
public class BatchedWorkRequester: ReceiveActor, IWithTimers
{
private const string BatchKey = nameof(BatchKey);

private readonly Random _random = new();

public BatchedWorkRequester(IRequiredActor<GatewayClusterClientActor> clusterClientActor)
{
var log = Context.GetLogger();
var clusterClient = clusterClientActor.ActorRef;

Receive<BatchedWork>(msg =>
{
log.Info("Requesting a batched work to the other cluster. Count: {0}", msg.Size);
clusterClient.Tell(new ClusterClient.Send("/user/worker-manager", msg, true));
Timers.StartSingleTimer(BatchKey, GetBatch(), TimeSpan.FromSeconds(10));
});

Receive<Result>(msg =>
{
log.Info("[ID:{0}] Work result: {1}", msg.Id, msg.Value);
});
}

protected override void PreStart()
{
base.PreStart();
Timers.StartSingleTimer(BatchKey, GetBatch(), TimeSpan.FromSeconds(3));
}

public ITimerScheduler Timers { get; set; } = null!;

private BatchedWork GetBatch() => new (_random.Next(5) + 5);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Event;
using Akka.Hosting;
using ClusterClientSample.Shared;

namespace ClusterClientSample.FrontEnd.Actors;

/// <summary>
/// This actor will:
/// * periodically publish a `SendReport` message to the backend app "report" pub-sub topic via the `ClusterClient` actor
/// * receive the `Report` message from the workload metric actor and logs them
/// </summary>
public class WorkReportCollector: ReceiveActor, IWithTimers
{
private class GetReport
{
public static readonly GetReport Instance = new();
private GetReport() { }
}

private const string ReportKey = nameof(ReportKey);

public WorkReportCollector(IRequiredActor<GatewayClusterClientActor> clusterClientActor)
{
var log = Context.GetLogger();
var clusterClient = clusterClientActor.ActorRef;

Receive<Report>(report =>
{
foreach (var (actor, count) in report.Counts)
{
log.Info("Worker {0} has done {1} works", actor, count);
}
});

Receive<GetReport>(_ =>
{
log.Info("Requesting work report metrics from the other cluster.");
clusterClient.Tell(new ClusterClient.Publish("report", SendReport.Instance));
});
}

public ITimerScheduler Timers { get; set; } = null!;

protected override void PreStart()
{
base.PreStart();
Timers.StartPeriodicTimer(ReportKey, GetReport.Instance, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka.Cluster.Hosting" />
<PackageReference Include="Akka.Serialization.Hyperion" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\ClusterClientSample.Shared\ClusterClientSample.Shared.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using Akka.Cluster.Hosting;
using Akka.Hosting;
using Akka.Remote.Hosting;
using ClusterClientSample.FrontEnd.Actors;
using Microsoft.Extensions.Hosting;

const string gatewaySystemAddress = "akka.tcp://cluster-system@localhost:12552";
const string systemName = "remote-cluster-system";
const string hostName = "localhost";
const int port = 12553;

Console.Title = "Frontend Gateway Node";

var host = new HostBuilder()
.ConfigureServices((context, services) =>
{
services.AddAkka(systemName, builder =>
{
builder
// Setup remoting
.WithRemoting(configure: options =>
{
options.Port = port;
options.HostName = hostName;
})

// Setup `ClusterClient` actor to connect to another actor system
.WithClusterClient<GatewayClusterClientActor>([ $"{gatewaySystemAddress}/system/receptionist" ])

// Setup required actors and startup code
.WithActors((system, registry, resolver) =>
{
var requesterActor = system.ActorOf(resolver.Props(typeof(BatchedWorkRequester)), "work-batch-requester");
registry.Register<BatchedWorkRequester>(requesterActor);

var reportCollectorActor = system.ActorOf(resolver.Props(typeof(WorkReportCollector)), "work-report-collector");
registry.Register<WorkReportCollector>(reportCollectorActor);
});
});
}).Build();

await host.StartAsync();
await host.WaitForShutdownAsync();
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Akka.Actor;
using ClusterClientSample.Shared;

namespace ClusterClientSample.Gateway.Actors;

public class MetricCounterActor : ReceiveActor
{
public MetricCounterActor()
{
var counts = new Dictionary<IActorRef, int>();

Receive<WorkComplete>(_ =>
{
if (counts.TryGetValue(Sender, out var count))
counts[Sender] = ++count;
else
counts.Add(Sender, 1);
});

Receive<SendReport>(_ => Sender.Tell(new Report(
counts.ToDictionary(kvp => kvp.Key.Path.ToString(), kvp => kvp.Value))));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using Akka.Actor;
using Akka.Cluster.Routing;
using Akka.Event;
using Akka.Hosting;
using Akka.Routing;
using ClusterClientSample.Shared;

namespace ClusterClientSample.Gateway.Actors;

public class WorkerManagerActor : ReceiveActor
{
private static int _nextId = 1;

public WorkerManagerActor(IRequiredActor<MetricCounterActor> counterActor)
{
var log = Context.GetLogger();
var counter = counterActor.ActorRef;
var workerRouter = GetWorkerRouter(counter);

Receive<BatchedWork>(batch =>
{
log.Info("Generating a work batch of size {0}", batch.Size);
for (var i = 0; i < batch.Size; i++)
{
// forward the work request as if it was sent by the original sender so that the work result can be
// sent back to the original sender by the worker
workerRouter.Forward(new Work(_nextId++));
}
});
}

private static IActorRef GetWorkerRouter(IActorRef counter)
{
// Creates a cluster router pool of 10 workers for each cluster node with the role "worker"
// that joins the cluster.
//
// The router will use a round-robin strategy to distribute messages amongst the worker actors
var props = new ClusterRouterPool(
local: new RoundRobinPool(10),
settings: new ClusterRouterPoolSettings(30, 10, true, "worker"))
.Props(Props.Create(() => new WorkerActor(counter)));

return Context.ActorOf(props);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka.Cluster.Hosting" />
<PackageReference Include="Akka.Serialization.Hyperion" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\ClusterClientSample.Shared\ClusterClientSample.Shared.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using Akka.Actor;
using Akka.Cluster.Hosting;
using Akka.Cluster.Tools.Client;
using Akka.Hosting;
using Akka.Remote.Hosting;
using ClusterClientSample.Gateway.Actors;
using Microsoft.Extensions.Hosting;

const string systemName = "cluster-system";
const string hostName = "localhost";
const string port = "12552";
const string selfAddress = $"akka.tcp://{systemName}@{hostName}:{port}";

// This node acts as a known contact point for all external actor systems to connect via `ClusterClient` by setting up
// the `ClusterClientReceptionist`
Console.Title = "Backend Gateway Node";

var host = new HostBuilder()
.ConfigureServices((context, services) =>
{
services.AddAkka(systemName, builder =>
{
builder
// Setup remoting and clustering
.WithRemoting(configure: options =>
{
options.Port = int.Parse(port);
options.HostName = hostName;
})
.WithClustering(options: new ClusterOptions
{
// Note that we do not assign the role/tag "worker" to this node, no worker actors will be
// deployed in this node.
//
// If we want the gateway to also work double duty as worker node, we can add the "worker" role/tag
// to the Roles array.
Roles = [ "gateway" ],
SeedNodes = [ selfAddress ]
})

// Setup `ClusterClientReceptionist` to only deploy on nodes with "gateway" role
.WithClusterClientReceptionist(role: "gateway")

// Setup required actors and startup code
.WithActors((system, registry, resolver) =>
{
// The name of this actor ("worker-manager") is required, because its absolute path
// ("/user/worker-manager") will be used as a service path by ClusterClientReceptionist.
//
// This name has to be unique for all actor names living in this actor system.
var workerManagerActor = system.ActorOf(resolver.Props(typeof(WorkerManagerActor)), "worker-manager");
registry.Register<WorkerManagerActor>(workerManagerActor);

// The name of this actor ("metric-workload-counter") is optional as it leverages the
// distributed pub-sub system. External actor systems does not need to know the actor path to
// query the workload metric.
var workLoadCounterActor = system.ActorOf(Props.Create(() => new MetricCounterActor()), "metric-workload-counter");
registry.Register<MetricCounterActor>(workLoadCounterActor);
})
.AddStartup((system, registry) =>
{
var receptionist = ClusterClientReceptionist.Get(system);

// Register the worker manager actor as a service,
// this can be accessed through "user/worker-manager"
var workerManagerActor = registry.Get<WorkerManagerActor>();
receptionist.RegisterService(workerManagerActor);

// Register the workload counter metric actor as a topic listener, it will subscribe to the
// "report" topic
var workloadCounterActor = registry.Get<MetricCounterActor>();
receptionist.RegisterSubscriber("report", workloadCounterActor);
});
});
}).Build();

await host.StartAsync();
await host.WaitForShutdownAsync();
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\ClusterClientSample.Shared\ClusterClientSample.Shared.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Akka.Cluster.Hosting" />
<PackageReference Include="Akka.Serialization.Hyperion" />
</ItemGroup>

</Project>
Loading
Loading