From fe13fdc5ea3b3449362925e00ea14b2e98fb07a1 Mon Sep 17 00:00:00 2001 From: Emil Koutanov Date: Mon, 29 Jan 2024 11:26:01 +1100 Subject: [PATCH] Backpressure example --- .vscode/settings.json | 1 + Examples/Backpressure/Backpressure.cs | 71 +++++++++++++++++++++++++++ Examples/Program.cs | 1 + 3 files changed, 73 insertions(+) create mode 100644 Examples/Backpressure/Backpressure.cs diff --git a/.vscode/settings.json b/.vscode/settings.json index ce509bb..fb64ded 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,5 +1,6 @@ { "cSpell.words": [ + "Backpresure", "Schedulable" ] } \ No newline at end of file diff --git a/Examples/Backpressure/Backpressure.cs b/Examples/Backpressure/Backpressure.cs new file mode 100644 index 0000000..d8e215f --- /dev/null +++ b/Examples/Backpressure/Backpressure.cs @@ -0,0 +1,71 @@ +using Actors; + +namespace Examples.Backpresure; + +/// +/// A scenario where some unbounded source is submitting work to a sink, which takes time to process items. We want to prevent +/// the source from submitting work beyond the sink's processing ability. +/// +/// The example demonstrates the use of a TaskCompletionSource to communicate completion out of an actor. +/// +public class Example +{ + /// + /// Instruction to perform some background work. + /// + /// Unique identifier of the work item. + class WorkItem(int id) + { + internal TaskCompletionSource Completion { get; } = new(); + + internal int Id { get; } = id; + } + + class SinkActor : Actor + { + protected override async Task Perform(Inbox inbox) + { + var workItem = inbox.Receive(); + Console.WriteLine("processing work item {0}", workItem.Id); + await Task.Delay(1); + workItem.Completion.SetResult(); + } + } + + public static async Task RunAsync() + { + Console.WriteLine("---\nRunning backpressure example"); + + const int Messages = 100; + const int MaxPending = 10; + + var sinkActor = new SinkActor(); + var pendingWork = new Dictionary(); + for (int i = 0; i < Messages; i++) + { + var workItem = new WorkItem(i); + await FreeCapacityAsync(pendingWork, MaxPending); + pendingWork[workItem.Id] = workItem.Completion.Task; + Console.WriteLine("submitting work item {0}", workItem.Id); + sinkActor.Send(workItem); + } + await sinkActor.Drain(); + } + + public static async Task FreeCapacityAsync(Dictionary pendingWork, int maxPending) + { + if (pendingWork.Count == maxPending) + { + Console.WriteLine("waiting for backlog to subside"); + await Task.WhenAny(pendingWork.Values); + foreach (var entry in pendingWork) + { + if (entry.Value.IsCompleted) + { + pendingWork.Remove(entry.Key); + } + } + Console.WriteLine("reduced to {0}", pendingWork.Count); + } + } +} \ No newline at end of file diff --git a/Examples/Program.cs b/Examples/Program.cs index 9266f5b..0628cc6 100644 --- a/Examples/Program.cs +++ b/Examples/Program.cs @@ -9,6 +9,7 @@ static async Task Main(string[] args) { "Throttle", () => Examples.Throttle.Example.RunAsync() }, { "Counter", () => Examples.Counter.Example.RunAsync() }, { "Batch", () => Examples.Batch.Example.RunAsync() }, + { "Backpressure", () => Examples.Backpresure.Example.RunAsync() }, }; if (args.Length == 1) // run a specific example