Skip to content

Commit

Permalink
fix memory leak: undisposed intersected lifetime
Browse files Browse the repository at this point in the history
get rid of lifetime for IRdWireableDispatchHelper::dispatch to avoid mixing lifetimes
  • Loading branch information
Iliya-usov committed Oct 19, 2023
1 parent 3cdf2b8 commit 221bb5a
Show file tree
Hide file tree
Showing 10 changed files with 18 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class MessageBroker(queueMessages: Boolean = false) : IPrintable {
private val messageContext: ProtocolContexts.MessageContext
) : IRdWireableDispatchHelper {

override fun dispatch(lifetime: Lifetime, scheduler: IScheduler?, action: () -> Unit) {
doDispatch(lifetime.intersect(this.lifetime), scheduler ?: protocol.scheduler, action)
override fun dispatch(scheduler: IScheduler?, action: () -> Unit) {
doDispatch(lifetime, scheduler ?: protocol.scheduler, action)
}

private fun doDispatch(lifetime: Lifetime, scheduler: IScheduler, action: () -> Unit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ interface IRdWireableDispatchHelper {
val rdId: RdId
val lifetime: Lifetime

fun dispatch(lifetime: Lifetime = this.lifetime, scheduler: IScheduler? = null, action: () -> Unit)
fun dispatch(scheduler: IScheduler? = null, action: () -> Unit) = dispatch(lifetime, scheduler, action)
fun dispatch(scheduler: IScheduler? = null, action: () -> Unit)
}


Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ class AsyncRdMap<K : Any, V : Any> private constructor(
override val lifetime: Lifetime
get() = dispatchHelper.lifetime

override fun dispatch(lifetime: Lifetime, scheduler: IScheduler?, action: () -> Unit) {
dispatchHelper.dispatch(lifetime, SynchronousScheduler, action)
override fun dispatch(scheduler: IScheduler?, action: () -> Unit) {
dispatchHelper.dispatch(SynchronousScheduler, action)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ class AsyncRdSet<T : Any> private constructor(
override val lifetime: Lifetime
get() = dispatchHelper.lifetime

override fun dispatch(lifetime: Lifetime, scheduler: IScheduler?, action: () -> Unit) {
dispatchHelper.dispatch(lifetime, SynchronousScheduler, action)
override fun dispatch(scheduler: IScheduler?, action: () -> Unit) {
dispatchHelper.dispatch(SynchronousScheduler, action)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class CallSiteWiredRdTask<TReq, TRes>(
} else if (resultFromWire is RdTaskResult.Cancelled)
sendCancellation()

dispatchHelper.dispatch(outerLifetime, wireScheduler) {
dispatchHelper.dispatch(wireScheduler) {
if (!result.setIfEmpty(resultFromWire))
RdReactiveBase.logReceived.trace { "call `${call.location}` (${call.rdid}) response was dropped, task result is: ${result.valueOrNull}" }
}
Expand Down Expand Up @@ -192,7 +192,7 @@ class EndpointWiredRdTask<TReq, TRes>(
RdReactiveBase.logReceived.trace { "received cancellation" }
buffer.readVoid() //nothing just a void value

dispatchHelper.dispatch(lifetime, wireScheduler) {
dispatchHelper.dispatch(wireScheduler) {
val success = result.setIfEmpty(RdTaskResult.Cancelled())
val wireScheduler = call.protocol?.scheduler
if (success || wireScheduler == null)
Expand Down
11 changes: 3 additions & 8 deletions rd-net/RdFramework/Base/IRdBindable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,19 @@ public interface IRdWireableDispatchHelper
RdId RdId { get; }
Lifetime Lifetime { get; }

public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action);
public void Dispatch(IScheduler? scheduler, Action action);
}

public static class RdWireableDispatchHelperEx
{
public static void Dispatch(this IRdWireableDispatchHelper helper, Lifetime lifetime, Action action)
{
helper.Dispatch(lifetime, null, action);
}

public static void Dispatch(this IRdWireableDispatchHelper helper, IScheduler? scheduler, Action action)
{
helper.Dispatch(helper.Lifetime, scheduler, action);
helper.Dispatch(scheduler, action);
}

public static void Dispatch(this IRdWireableDispatchHelper helper, Action action)
{
helper.Dispatch(helper.Lifetime, null, action);
helper.Dispatch(null, action);
}
}

Expand Down
2 changes: 1 addition & 1 deletion rd-net/RdFramework/Impl/AsyncRdMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public DelegatingDispatchHelper(IRdWireableDispatchHelper dispatchHelper)
myDispatchHelper = dispatchHelper;
}

public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action)
public void Dispatch(IScheduler? scheduler, Action action)
{
myDispatchHelper.Dispatch(SynchronousScheduler.Instance, action);
}
Expand Down
2 changes: 1 addition & 1 deletion rd-net/RdFramework/Impl/AsyncRdSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public DelegatingDispatchHelper(IRdWireableDispatchHelper dispatchHelper)
myDispatchHelper = dispatchHelper;
}

public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action)
public void Dispatch(IScheduler? scheduler, Action action)
{
myDispatchHelper.Dispatch(SynchronousScheduler.Instance, action);
}
Expand Down
4 changes: 2 additions & 2 deletions rd-net/RdFramework/Impl/MessageBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ internal RdWireableDispatchHelper(Lifetime lifetime, ILog log, IRdWireable wirea
myMessageContext = messageContext;
}

public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action)
public void Dispatch(IScheduler? scheduler, Action action)
{
DoDispatch(lifetime.Intersect(Lifetime), scheduler ?? myProtocol.Scheduler, action);
DoDispatch(Lifetime, scheduler ?? myProtocol.Scheduler, action);
}

private void DoDispatch(Lifetime lifetime, IScheduler scheduler, Action action)
Expand Down
4 changes: 2 additions & 2 deletions rd-net/RdFramework/Tasks/WiredRdTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
SendCancellation();


dispatchHelper.Dispatch(myOuterLifetime, WireScheduler, () =>
dispatchHelper.Dispatch(WireScheduler, () =>
{
if (!ResultInternal.SetIfEmpty(taskResult))
Trace(RdReactiveBase.ourLogReceived, "response from wire was rejected because task already has result");
Expand Down Expand Up @@ -203,7 +203,7 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
Trace(RdReactiveBase.ourLogReceived, "received cancellation");
reader.ReadVoid(); //nothing just a void value

dispatchHelper.Dispatch(Lifetime, WireScheduler, () =>
dispatchHelper.Dispatch(WireScheduler, () =>
{
var success = ResultInternal.SetIfEmpty(RdTaskResult<TRes>.Cancelled());
var protocolScheduler = myCall.TryGetProto()?.Scheduler;
Expand Down

0 comments on commit 221bb5a

Please sign in to comment.