Skip to content

Commit

Permalink
Fixing #342?
Browse files Browse the repository at this point in the history
  • Loading branch information
darkl committed Mar 15, 2023
1 parent 1a5bef8 commit 5455b94
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,6 @@ public void Cancel(InterruptDetails details)
InterruptCalled = true;
}

public bool IsInvocationCompleted
{
get
{
return false;
}
}

public bool InterruptCalled { get; set; }
}
}
Expand Down
29 changes: 17 additions & 12 deletions src/netstandard/WampSharp/WAMP2/V2/Client/Rpc/WampCallee.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using SystemEx;
using WampSharp.Core.Listener;
Expand Down Expand Up @@ -159,9 +161,9 @@ private IWampRpcOperation TryGetOperation(long registrationId)
return null;
}

private IWampRawRpcOperationRouterCallback GetCallback(long requestId)
private IWampRawRpcOperationRouterCallback GetCallback(long requestId, ISubject<Unit> onCompleted)
{
return new ServerProxyCallback(mProxy, requestId, this);
return new ServerProxyCallback(mProxy, requestId, this, onCompleted);
}

public void Invocation(long requestId, long registrationId, InvocationDetails details)
Expand Down Expand Up @@ -200,7 +202,8 @@ private void InvocationPattern(long requestId, long registrationId, InvocationDe

if (operation != null)
{
IWampRawRpcOperationRouterCallback callback = GetCallback(requestId);
ReplaySubject<Unit> onOperationDone = new ReplaySubject<Unit>();
IWampRawRpcOperationRouterCallback callback = GetCallback(requestId, onOperationDone);

InvocationDetails modifiedDetails = new InvocationDetails(details)
{
Expand All @@ -215,16 +218,14 @@ private void InvocationPattern(long requestId, long registrationId, InvocationDe

lock (mLock)
{
if (!invocation.IsInvocationCompleted)
{
mRegistrationsToInvocations.Add(registrationId, requestId);
}
mRegistrationsToInvocations.Add(registrationId, requestId);
}

if (invocation.IsInvocationCompleted)
{
CleanupInvocationData(requestId);
}
onOperationDone.Subscribe(x =>
{
this.CleanupInvocationData(requestId);
onOperationDone.Dispose();
});
}
}
}
Expand Down Expand Up @@ -361,12 +362,15 @@ private class ServerProxyCallback : IWampRawRpcOperationRouterCallback
{
private readonly IWampServerProxy mProxy;
private readonly WampCallee<TMessage> mParent;
private readonly ISubject<Unit> mOnCompleted;

public ServerProxyCallback(IWampServerProxy proxy, long requestId, WampCallee<TMessage> parent)
public ServerProxyCallback(IWampServerProxy proxy, long requestId, WampCallee<TMessage> parent,
ISubject<Unit> onCompleted)
{
mProxy = proxy;
RequestId = requestId;
mParent = parent;
mOnCompleted = onCompleted;
}

public long RequestId { get; }
Expand All @@ -376,6 +380,7 @@ private void Cleanup(YieldOptions yieldOptions = null)
if (yieldOptions?.Progress != true)
{
mParent.CleanupInvocationData(RequestId);
mOnCompleted.OnNext(Unit.Default);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,5 @@ public void Cancel(InterruptDetails details)
{
mCancellationTokenSource.Cancel();
}

public bool IsInvocationCompleted => mTask.IsCompleted;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,5 @@ public void Cancel(InterruptDetails details)
{
Callee.Interrupt(RequestId, details);
}

public bool IsInvocationCompleted
{
get
{
return false;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,5 @@ namespace WampSharp.V2.Rpc
public interface IWampCancellableInvocation
{
void Cancel(InterruptDetails details);

bool IsInvocationCompleted { get; }
}
}

0 comments on commit 5455b94

Please sign in to comment.