Skip to content

Commit

Permalink
Fix TakeLast causes runtime exception #242
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Aug 13, 2024
1 parent 2798494 commit ed773af
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 18 deletions.
24 changes: 13 additions & 11 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@
using System.Xml.Serialization;


var a = new Subject<int>();
var b = new Subject<int>();

a.Zip(b, (x, y) => (x, y)).Subscribe(x => Console.WriteLine(x), _ => Console.WriteLine("complete"));
var current = ObservableSystem.GetUnhandledExceptionHandler();


a.OnNext(1);
b.OnNext(2);
a.OnNext(3);

a.OnCompleted();
b.OnNext(4);
var status = Observable.Interval(TimeSpan.FromMilliseconds(100)).Index();
var doSomething = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5);

status.TakeUntil(doSomething.TakeLast(1)).Subscribe(_ => { }, ex =>
{
Console.WriteLine("E" + ex);
}, r =>
{
Console.WriteLine("R" + r);
});

b.OnNext(5);
b.OnNext(6);
b.OnNext(7);

Console.ReadLine()
;
39 changes: 33 additions & 6 deletions src/R3/Operators/TakeLast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,20 @@ protected override IDisposable SubscribeCore(Observer<T> observer)
sealed class _TakeLast(Observer<T> observer, int count) : Observer<T>, IDisposable
{
Queue<T> queue = new Queue<T>(count);
bool takeCompleted = false;

protected override void OnNextCore(T value)
{
if (queue.Count == count && queue.Count != 0)
lock (queue)
{
queue.Dequeue();
if (takeCompleted) return;

if (queue.Count == count && queue.Count != 0)
{
queue.Dequeue();
}
queue.Enqueue(value);
}
queue.Enqueue(value);
}

protected override void OnErrorResumeCore(Exception error)
Expand All @@ -66,16 +72,26 @@ protected override void OnCompletedCore(Result result)
return;
}

foreach (var item in queue)
lock (queue)
{
observer.OnNext(item);
takeCompleted = true;

foreach (var item in queue)
{
observer.OnNext(item);
if (IsDisposed) return; // sometimes called Clear during iterating
}
}

observer.OnCompleted();
}

protected override void DisposeCore()
{
queue.Clear();
lock (queue)
{
queue.Clear();
}
}
}
}
Expand All @@ -94,6 +110,7 @@ sealed class _TakeLastTime : Observer<T>, IDisposable
readonly Queue<(long timestamp, T value)> queue = new();
readonly TimeSpan duration;
readonly TimeProvider timeProvider;
bool takeCompleted = false;

public _TakeLastTime(Observer<T> observer, TimeSpan duration, TimeProvider timeProvider)
{
Expand All @@ -106,6 +123,7 @@ protected override void OnNextCore(T value)
{
lock (gate)
{
if(takeCompleted) return;
var current = timeProvider.GetTimestamp();
queue.Enqueue((current, value));
Trim(current);
Expand All @@ -124,6 +142,8 @@ protected override void OnCompletedCore(Result result)
{
lock (gate)
{
takeCompleted = true;

if (result.IsFailure)
{
observer.OnCompleted(result);
Expand All @@ -134,6 +154,7 @@ protected override void OnCompletedCore(Result result)
foreach (var item in queue)
{
observer.OnNext(item.value);
if (IsDisposed) return; // sometimes called Clear during iterating
}
observer.OnCompleted();
}
Expand Down Expand Up @@ -171,6 +192,7 @@ sealed class _TakeLastFrame : Observer<T>, IDisposable
readonly Queue<(long frameCount, T value)> queue = new();
readonly int frameCount;
readonly FrameProvider frameProvider;
bool takeCompleted = false;

public _TakeLastFrame(Observer<T> observer, int frameCount, FrameProvider frameProvider)
{
Expand All @@ -183,6 +205,8 @@ protected override void OnNextCore(T value)
{
lock (gate)
{
if (takeCompleted) return;

var current = frameProvider.GetFrameCount();
queue.Enqueue((current, value));
Trim(current);
Expand All @@ -201,6 +225,8 @@ protected override void OnCompletedCore(Result result)
{
lock (gate)
{
takeCompleted = true;

if (result.IsFailure)
{
observer.OnCompleted(result);
Expand All @@ -211,6 +237,7 @@ protected override void OnCompletedCore(Result result)
foreach (var item in queue)
{
observer.OnNext(item.value);
if (IsDisposed) return; // sometimes called Clear during iterating
}
observer.OnCompleted();
}
Expand Down
31 changes: 30 additions & 1 deletion tests/R3.Tests/OperatorTests/TakeLastTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace R3.Tests.OperatorTests;

public class TakeLastTest
public class TakeLastTest(ITestOutputHelper helper)
{
[Fact]
public async Task Take()
Expand Down Expand Up @@ -59,4 +59,33 @@ public void TakeFrame2()
list.AssertEqual([3, 4, 5]);
list.AssertIsCompleted();
}

[Fact]
public async Task DisposeQueue()
{
var defaultHandler = ObservableSystem.GetUnhandledExceptionHandler();
Exception? exception = null;
ObservableSystem.RegisterUnhandledExceptionHandler(ex =>
{
exception = ex;
});
try
{
var status = Observable.Interval(TimeSpan.FromMilliseconds(100)).Index();
var doSomething = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5);

var end = new TaskCompletionSource();
status.TakeUntil(doSomething.TakeLast(1)).Subscribe(_ => end.TrySetResult());

await end.Task;
await Task.Delay(TimeSpan.FromMilliseconds(500));

exception!.Should().BeNull();
// helper.WriteLine(exception!.Message);
}
finally
{
ObservableSystem.RegisterUnhandledExceptionHandler(defaultHandler);
}
}
}

0 comments on commit ed773af

Please sign in to comment.