diff --git a/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/CancelTests.cs b/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/CancelTests.cs index ea6ff894a..5ac4733cb 100644 --- a/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/CancelTests.cs +++ b/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/CancelTests.cs @@ -241,14 +241,6 @@ public void Cancel(InterruptDetails details) InterruptCalled = true; } - public bool IsInvocationCompleted - { - get - { - return false; - } - } - public bool InterruptCalled { get; set; } } } diff --git a/src/netstandard/WampSharp/WAMP2/V2/Client/Rpc/WampCallee.cs b/src/netstandard/WampSharp/WAMP2/V2/Client/Rpc/WampCallee.cs index bd9328b85..c53270853 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Client/Rpc/WampCallee.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Client/Rpc/WampCallee.cs @@ -2,6 +2,8 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Reactive; +using System.Reactive.Subjects; using System.Threading.Tasks; using SystemEx; using WampSharp.Core.Listener; @@ -159,9 +161,9 @@ private IWampRpcOperation TryGetOperation(long registrationId) return null; } - private IWampRawRpcOperationRouterCallback GetCallback(long requestId) + private IWampRawRpcOperationRouterCallback GetCallback(long requestId, ISubject onCompleted) { - return new ServerProxyCallback(mProxy, requestId, this); + return new ServerProxyCallback(mProxy, requestId, this, onCompleted); } public void Invocation(long requestId, long registrationId, InvocationDetails details) @@ -200,7 +202,8 @@ private void InvocationPattern(long requestId, long registrationId, InvocationDe if (operation != null) { - IWampRawRpcOperationRouterCallback callback = GetCallback(requestId); + ReplaySubject onOperationDone = new ReplaySubject(); + IWampRawRpcOperationRouterCallback callback = GetCallback(requestId, onOperationDone); InvocationDetails modifiedDetails = new InvocationDetails(details) { @@ -215,16 +218,14 @@ private void InvocationPattern(long requestId, long registrationId, InvocationDe lock (mLock) { - if (!invocation.IsInvocationCompleted) - { - mRegistrationsToInvocations.Add(registrationId, requestId); - } + mRegistrationsToInvocations.Add(registrationId, requestId); } - if (invocation.IsInvocationCompleted) - { - CleanupInvocationData(requestId); - } + onOperationDone.Subscribe(x => + { + this.CleanupInvocationData(requestId); + onOperationDone.Dispose(); + }); } } } @@ -361,12 +362,15 @@ private class ServerProxyCallback : IWampRawRpcOperationRouterCallback { private readonly IWampServerProxy mProxy; private readonly WampCallee mParent; + private readonly ISubject mOnCompleted; - public ServerProxyCallback(IWampServerProxy proxy, long requestId, WampCallee parent) + public ServerProxyCallback(IWampServerProxy proxy, long requestId, WampCallee parent, + ISubject onCompleted) { mProxy = proxy; RequestId = requestId; mParent = parent; + mOnCompleted = onCompleted; } public long RequestId { get; } @@ -376,6 +380,7 @@ private void Cleanup(YieldOptions yieldOptions = null) if (yieldOptions?.Progress != true) { mParent.CleanupInvocationData(RequestId); + mOnCompleted.OnNext(Unit.Default); } } diff --git a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/CancellationTokenSourceInvocation.cs b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/CancellationTokenSourceInvocation.cs index d78508cb9..2dc58c5df 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/CancellationTokenSourceInvocation.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/CancellationTokenSourceInvocation.cs @@ -20,7 +20,5 @@ public void Cancel(InterruptDetails details) { mCancellationTokenSource.Cancel(); } - - public bool IsInvocationCompleted => mTask.IsCompleted; } } \ No newline at end of file diff --git a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Dealer/WampCalleeRpcInvocation.cs b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Dealer/WampCalleeRpcInvocation.cs index 669cd4c21..4d57bd1b3 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Dealer/WampCalleeRpcInvocation.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Dealer/WampCalleeRpcInvocation.cs @@ -17,13 +17,5 @@ public void Cancel(InterruptDetails details) { Callee.Interrupt(RequestId, details); } - - public bool IsInvocationCompleted - { - get - { - return false; - } - } } } \ No newline at end of file diff --git a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Interfaces/IWampCancellableInvocation.cs b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Interfaces/IWampCancellableInvocation.cs index 0a4e9cc64..59ac4df0f 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Interfaces/IWampCancellableInvocation.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Interfaces/IWampCancellableInvocation.cs @@ -5,7 +5,5 @@ namespace WampSharp.V2.Rpc public interface IWampCancellableInvocation { void Cancel(InterruptDetails details); - - bool IsInvocationCompleted { get; } } } \ No newline at end of file