Skip to content

Commit

Permalink
Add stream ref middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
Jubast committed Jun 3, 2021
1 parent 5f1096e commit f765897
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 28 deletions.
5 changes: 4 additions & 1 deletion Source/Orleankka/ActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ public abstract class ActorSystem : IActorSystem
readonly IServiceProvider serviceProvider;
readonly IGrainFactory grainFactory;
readonly IActorRefMiddleware actorRefMiddleware;
readonly IStreamRefMiddleware streamRefMiddleware;

protected ActorSystem(Assembly[] assemblies, IServiceProvider serviceProvider)
{
this.serviceProvider = serviceProvider;
this.grainFactory = serviceProvider.GetService<IGrainFactory>();
this.actorRefMiddleware = serviceProvider.GetService<IActorRefMiddleware>() ?? DefaultActorRefMiddleware.Instance;
this.streamRefMiddleware = serviceProvider.GetService<IStreamRefMiddleware>() ?? DefaultStreamRefMiddleware.Instance;

Register(assemblies);
}
Expand Down Expand Up @@ -97,7 +99,8 @@ public StreamRef<TItem> StreamOf<TItem>(StreamPath path)
throw new ArgumentException("Stream path is empty", nameof(path));

var provider = serviceProvider.GetServiceByName<IStreamProvider>(path.Provider);
return new StreamRef<TItem>(path, provider);

return new StreamRef<TItem>(path, provider, streamRefMiddleware);
}

/// <inheritdoc />
Expand Down
2 changes: 2 additions & 0 deletions Source/Orleankka/Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@
namespace Orleankka
{
public delegate Task<object> Receive(object message);

public delegate Task Receive<in TMessage>(TMessage message);
}
85 changes: 63 additions & 22 deletions Source/Orleankka/StreamRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
using Orleans.Streams;
using Orleans.Runtime;

using Microsoft.Extensions.DependencyInjection;

namespace Orleankka
{
using Utility;
Expand All @@ -18,13 +20,20 @@ namespace Orleankka
[DebuggerDisplay("s->{ToString()}")]
public class StreamRef<TItem> : IEquatable<StreamRef<TItem>>, IEquatable<StreamPath>
{
[NonSerialized]
readonly IStreamProvider provider;
[NonSerialized] readonly IStreamProvider provider;
[NonSerialized] readonly IStreamRefMiddleware middleware;

protected internal StreamRef(StreamPath path, IStreamProvider provider = null)
protected StreamRef(StreamPath path)
{
Path = path;
}

internal StreamRef(StreamPath path, IStreamProvider provider, IStreamRefMiddleware middleware)
: this(path)
{

this.provider = provider;
this.middleware = middleware;
}

[NonSerialized]
Expand Down Expand Up @@ -88,16 +97,28 @@ public virtual async Task Publish<TMessage>(TMessage message) where TMessage : P
switch (message)
{
case NextItem<TItem> next:
await Endpoint.OnNextAsync(next.Item, next.Token);
await middleware.Publish(Path, next, async x =>
{
await Endpoint.OnNextAsync(x.Item, x.Token);
});
break;
case NextItemBatch<TItem> next:
await Endpoint.OnNextBatchAsync(next.Items, next.Token);
await middleware.Publish(Path, next, async x =>
{
await Endpoint.OnNextBatchAsync(x.Items, x.Token);
});
break;
case NotifyStreamError error:
await Endpoint.OnErrorAsync(error.Exception);
await middleware.Publish(Path, error, async x =>
{
await Endpoint.OnErrorAsync(x.Exception);
});
break;
case NotifyStreamCompleted _:
await Endpoint.OnCompletedAsync();
case NotifyStreamCompleted completed:
await middleware.Publish(Path, completed, async _ =>
{
await Endpoint.OnCompletedAsync();
});
break;
default:
throw new ArgumentOutOfRangeException(nameof(message), $"Unsupported type of publish message: '{message.GetType()}'");
Expand Down Expand Up @@ -130,7 +151,7 @@ public virtual async Task<StreamSubscription<TItem>> Subscribe<TOptions>(Func<St

async Task<StreamSubscription<TItem>> Subscribe(SubscribeReceiveItem o)
{
var observer = new Observer(this, callback);
var observer = CreateObserver(callback);

var predicate = o.Filter != null
? StreamFilter.Internal.Predicate
Expand All @@ -142,7 +163,7 @@ async Task<StreamSubscription<TItem>> Subscribe(SubscribeReceiveItem o)

async Task<StreamSubscription<TItem>> SubscribeBatch(SubscribeReceiveBatch o)
{
var observer = new BatchObserver(this, callback);
var observer = CreateBatchObserver(callback);
var handle = await Endpoint.SubscribeAsync(observer, o.Token);
return new StreamSubscription<TItem>(this, handle);
}
Expand Down Expand Up @@ -198,46 +219,66 @@ static object Deserialize(Type t, IDeserializationContext context)
{
var reader = context.StreamReader;
var path = StreamPath.Parse(reader.ReadString());
var provider = context.ServiceProvider.GetServiceByName<IStreamProvider>(path.Provider);
return new StreamRef<TItem>(path, provider);
var system = context.ServiceProvider.GetRequiredService<IActorSystem>();
return system.StreamOf<TItem>(path);
}

#endregion

internal BatchObserver CreateBatchObserver(Func<StreamMessage, Task> callback)
{
return new BatchObserver(this, callback, middleware);
}

internal Observer CreateObserver(Func<StreamMessage, Task> callback)
{
return new Observer(this, callback, middleware);
}

internal class BatchObserver : IAsyncBatchObserver<TItem>
{
readonly StreamRef<TItem> stream;
readonly Func<StreamMessage, Task> callback;
readonly IStreamRefMiddleware middleware;

public BatchObserver(StreamRef<TItem> stream, Func<StreamMessage, Task> callback)
public BatchObserver(StreamRef<TItem> stream, Func<StreamMessage, Task> callback, IStreamRefMiddleware middleware)
{
this.stream = stream;
this.callback = callback;
this.middleware = middleware;
}

public Task OnNextAsync(IList<SequentialItem<TItem>> items) =>
callback(new StreamItemBatch<TItem>(stream, items));
public Task OnNextAsync(IList<SequentialItem<TItem>> items) =>
middleware.Receive(stream.Path, new StreamItemBatch<TItem>(stream, items), x => callback(x));

public Task OnCompletedAsync() =>
middleware.Receive(stream.Path, new StreamCompleted(stream), x => callback(x));

public Task OnCompletedAsync() => callback(new StreamCompleted(stream));
public Task OnErrorAsync(Exception ex) => callback(new StreamError(stream, ex));
public Task OnErrorAsync(Exception ex) =>
middleware.Receive(stream.Path, new StreamError(stream, ex), x => callback(x));
}

internal class Observer : IAsyncObserver<TItem>
{
readonly StreamRef<TItem> stream;
readonly Func<StreamMessage, Task> callback;
readonly IStreamRefMiddleware middleware;

public Observer(StreamRef<TItem> stream, Func<StreamMessage, Task> callback)
public Observer(StreamRef<TItem> stream, Func<StreamMessage, Task> callback, IStreamRefMiddleware middleware)
{
this.stream = stream;
this.callback = callback;
this.middleware = middleware;
}

public Task OnNextAsync(TItem item, StreamSequenceToken token = null) =>
callback(new StreamItem<TItem>(stream, item, token));
public Task OnNextAsync(TItem item, StreamSequenceToken token = null) =>
middleware.Receive(stream.Path, new StreamItem<TItem>(stream, item, token), x => callback(x));

public Task OnCompletedAsync() =>
middleware.Receive(stream.Path, new StreamCompleted(stream), x => callback(x));

public Task OnCompletedAsync() => callback(new StreamCompleted(stream));
public Task OnErrorAsync(Exception ex) => callback(new StreamError(stream, ex));
public Task OnErrorAsync(Exception ex) =>
middleware.Receive(stream.Path, new StreamError(stream, ex), x => callback(x));
}
}
}
35 changes: 35 additions & 0 deletions Source/Orleankka/StreamRefMiddleware.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System.Threading.Tasks;

namespace Orleankka
{
public interface IStreamRefMiddleware
{
Task Publish<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver) where TMessage : PublishMessage;
Task Receive<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver) where TMessage : StreamMessage;
}

public abstract class StreamRefMiddleware : IStreamRefMiddleware
{
readonly IStreamRefMiddleware next;

protected StreamRefMiddleware(IStreamRefMiddleware next = null) =>
this.next = next ?? DefaultStreamRefMiddleware.Instance;

public virtual Task Publish<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver) where TMessage : PublishMessage =>
next.Publish(path, message, receiver);

public virtual Task Receive<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver) where TMessage : StreamMessage =>
next.Receive(path, message, receiver);
}

class DefaultStreamRefMiddleware : IStreamRefMiddleware
{
public static readonly DefaultStreamRefMiddleware Instance = new DefaultStreamRefMiddleware();

public Task Publish<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver) where TMessage : PublishMessage =>
receiver(message);

public Task Receive<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver) where TMessage : StreamMessage =>
receiver(message);
}
}
4 changes: 2 additions & 2 deletions Source/Orleankka/StreamSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ public virtual async Task<StreamSubscription<TItem>> Resume<TOptions>(Func<Strea

async Task<StreamSubscription<TItem>> Resume(ResumeReceiveItem o)
{
var observer = new StreamRef<TItem>.Observer(Stream, callback);
var observer = Stream.CreateObserver(callback);
return new StreamSubscription<TItem>(Stream, await handle.ResumeAsync(observer, o.Token));
}

async Task<StreamSubscription<TItem>> ResumeBatch(ResumeReceiveBatch o)
{
var observer = new StreamRef<TItem>.BatchObserver(Stream, callback);
var observer = Stream.CreateBatchObserver(callback);
return new StreamSubscription<TItem>(Stream, await handle.ResumeAsync(observer, o.Token));
}
}
Expand Down
4 changes: 2 additions & 2 deletions Tests/Orleankka.Tests/Checks/StreamRefFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ public void Equatable_by_path()
{
var path = StreamPath.From("sms", "42");

var ref1 = new StreamRef<string>(path, null);
var ref2 = new StreamRef<string>(path, null);
var ref1 = new StreamRef<string>(path, null, null);
var ref2 = new StreamRef<string>(path, null, null);

Assert.True(ref1 == ref2);
Assert.True(ref1.Equals(ref2));
Expand Down
55 changes: 54 additions & 1 deletion Tests/Orleankka.Tests/Features/Intercepting_requests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,29 @@ public override Task<object> Receive(ActorPath actor, object message, Receive re
}
}

public class TestStreamRefMiddleware : StreamRefMiddleware
{
public override Task Publish<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver)
{
if (message is NextItem<ItemData> nextItem && nextItem.Item.Text == "StreamRefMiddlewarePublishTest")
{
nextItem.Item.Text += " - it works!";
}

return base.Publish(path, message, receiver);
}

public override Task Receive<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver)
{
if (message is StreamItem<ItemData> streamItem && streamItem.Item.Text == "StreamRefMiddlewareSubscribeTest")
{
streamItem.Item.Text += " - it works!";
}

return base.Receive(path, message, receiver);
}
}

[TestFixture]
[RequiresSilo]
public class Tests
Expand Down Expand Up @@ -177,7 +200,7 @@ public async Task Intercepting_stream_messages()

await stream.Publish(new ItemData {Text = "foo"});
await Task.Delay(TimeSpan.FromMilliseconds(10));

var received = await actor.Ask(new GetReceivedFromStream());
Assert.That(received.Count, Is.EqualTo(1));
Assert.That(received[0], Is.EqualTo("foo.intercepted"));
Expand All @@ -190,6 +213,36 @@ public async Task Intercepting_actor_ref()
var result = await actor.Ask<string>(new CheckRef());
Assert.That(result, Is.EqualTo("it works!"));
}

[Test]
public async Task Intercepting_stream_ref_publish()
{
var stream = system.StreamOf<ItemData>("sms", "test-stream-ref-publish-interception");

var received = new List<ItemData>();
await stream.Subscribe((item, _) => received.Add(item));

await stream.Publish(new ItemData { Text = "StreamRefMiddlewarePublishTest" });
await Task.Delay(TimeSpan.FromMilliseconds(10));

Assert.That(received.Count, Is.EqualTo(1));
Assert.That(received[0].Text, Is.EqualTo("StreamRefMiddlewarePublishTest - it works!"));
}

[Test]
public async Task Intercepting_stream_ref_subscribe()
{
var stream = system.StreamOf<ItemData>("sms", "test-stream-ref-subscribe-interception");

var received = new List<ItemData>();
await stream.Subscribe((item, _) => received.Add(item));

await stream.Publish(new ItemData { Text = "StreamRefMiddlewareSubscribeTest" });
await Task.Delay(TimeSpan.FromMilliseconds(10));

Assert.That(received.Count, Is.EqualTo(1));
Assert.That(received[0].Text, Is.EqualTo("StreamRefMiddlewareSubscribeTest - it works!"));
}
}
}
}
1 change: 1 addition & 0 deletions Tests/Orleankka.Tests/Testing/TestActions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public override void BeforeTest(ITest test)

services.AddSingleton<IActorRefMiddleware>(s => new TestActorRefMiddleware());
services.AddSingleton<IActorMiddleware>(s => new TestActorMiddleware());
services.AddSingleton<IStreamRefMiddleware>(s => new TestStreamRefMiddleware());
})
.ConfigureApplicationParts(x => x
.AddApplicationPart(GetType().Assembly)
Expand Down

0 comments on commit f765897

Please sign in to comment.