Skip to content

Commit

Permalink
Backpressure example
Browse files Browse the repository at this point in the history
  • Loading branch information
Emil Koutanov committed Jan 29, 2024
1 parent c0acb7f commit fe13fdc
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 0 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"cSpell.words": [
"Backpresure",
"Schedulable"
]
}
71 changes: 71 additions & 0 deletions Examples/Backpressure/Backpressure.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using Actors;

namespace Examples.Backpresure;

/// <summary>
/// A scenario where some unbounded source is submitting work to a sink, which takes time to process items. We want to prevent
/// the source from submitting work beyond the sink's processing ability.
///
/// The example demonstrates the use of a <c>TaskCompletionSource</c> to communicate completion out of an actor.
/// </summary>
public class Example
{
/// <summary>
/// Instruction to perform some background work.
/// </summary>
/// <param name="id">Unique identifier of the work item.</param>
class WorkItem(int id)
{
internal TaskCompletionSource Completion { get; } = new();

internal int Id { get; } = id;
}

class SinkActor : Actor<WorkItem>
{
protected override async Task Perform(Inbox inbox)
{
var workItem = inbox.Receive();
Console.WriteLine("processing work item {0}", workItem.Id);
await Task.Delay(1);
workItem.Completion.SetResult();
}
}

public static async Task RunAsync()
{
Console.WriteLine("---\nRunning backpressure example");

const int Messages = 100;
const int MaxPending = 10;

var sinkActor = new SinkActor();
var pendingWork = new Dictionary<int, Task>();
for (int i = 0; i < Messages; i++)
{
var workItem = new WorkItem(i);
await FreeCapacityAsync(pendingWork, MaxPending);
pendingWork[workItem.Id] = workItem.Completion.Task;
Console.WriteLine("submitting work item {0}", workItem.Id);
sinkActor.Send(workItem);
}
await sinkActor.Drain();
}

public static async Task FreeCapacityAsync(Dictionary<int, Task> pendingWork, int maxPending)
{
if (pendingWork.Count == maxPending)
{
Console.WriteLine("waiting for backlog to subside");
await Task.WhenAny(pendingWork.Values);
foreach (var entry in pendingWork)
{
if (entry.Value.IsCompleted)
{
pendingWork.Remove(entry.Key);
}
}
Console.WriteLine("reduced to {0}", pendingWork.Count);
}
}
}
1 change: 1 addition & 0 deletions Examples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ static async Task Main(string[] args)
{ "Throttle", () => Examples.Throttle.Example.RunAsync() },
{ "Counter", () => Examples.Counter.Example.RunAsync() },
{ "Batch", () => Examples.Batch.Example.RunAsync() },
{ "Backpressure", () => Examples.Backpresure.Example.RunAsync() },
};

if (args.Length == 1) // run a specific example
Expand Down

0 comments on commit fe13fdc

Please sign in to comment.