From ed773af7c5738f6e4bd8cdcffb681fad84a0021c Mon Sep 17 00:00:00 2001 From: neuecc Date: Tue, 13 Aug 2024 16:44:21 +0900 Subject: [PATCH] Fix TakeLast causes runtime exception #242 --- sandbox/ConsoleApp1/Program.cs | 24 ++++++------ src/R3/Operators/TakeLast.cs | 39 +++++++++++++++++--- tests/R3.Tests/OperatorTests/TakeLastTest.cs | 31 +++++++++++++++- 3 files changed, 76 insertions(+), 18 deletions(-) diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 2debbdce..cbbc26c3 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -3,20 +3,22 @@ using System.Xml.Serialization; -var a = new Subject(); -var b = new Subject(); -a.Zip(b, (x, y) => (x, y)).Subscribe(x => Console.WriteLine(x), _ => Console.WriteLine("complete")); +var current = ObservableSystem.GetUnhandledExceptionHandler(); -a.OnNext(1); -b.OnNext(2); -a.OnNext(3); -a.OnCompleted(); -b.OnNext(4); +var status = Observable.Interval(TimeSpan.FromMilliseconds(100)).Index(); +var doSomething = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5); +status.TakeUntil(doSomething.TakeLast(1)).Subscribe(_ => { }, ex => +{ + Console.WriteLine("E" + ex); +}, r => +{ + Console.WriteLine("R" + r); +}); -b.OnNext(5); -b.OnNext(6); -b.OnNext(7); + +Console.ReadLine() +; diff --git a/src/R3/Operators/TakeLast.cs b/src/R3/Operators/TakeLast.cs index 2206b255..0f5c5f57 100644 --- a/src/R3/Operators/TakeLast.cs +++ b/src/R3/Operators/TakeLast.cs @@ -43,14 +43,20 @@ protected override IDisposable SubscribeCore(Observer observer) sealed class _TakeLast(Observer observer, int count) : Observer, IDisposable { Queue queue = new Queue(count); + bool takeCompleted = false; protected override void OnNextCore(T value) { - if (queue.Count == count && queue.Count != 0) + lock (queue) { - queue.Dequeue(); + if (takeCompleted) return; + + if (queue.Count == count && queue.Count != 0) + { + queue.Dequeue(); + } + queue.Enqueue(value); } - queue.Enqueue(value); } protected override void OnErrorResumeCore(Exception error) @@ -66,16 +72,26 @@ protected override void OnCompletedCore(Result result) return; } - foreach (var item in queue) + lock (queue) { - observer.OnNext(item); + takeCompleted = true; + + foreach (var item in queue) + { + observer.OnNext(item); + if (IsDisposed) return; // sometimes called Clear during iterating + } } + observer.OnCompleted(); } protected override void DisposeCore() { - queue.Clear(); + lock (queue) + { + queue.Clear(); + } } } } @@ -94,6 +110,7 @@ sealed class _TakeLastTime : Observer, IDisposable readonly Queue<(long timestamp, T value)> queue = new(); readonly TimeSpan duration; readonly TimeProvider timeProvider; + bool takeCompleted = false; public _TakeLastTime(Observer observer, TimeSpan duration, TimeProvider timeProvider) { @@ -106,6 +123,7 @@ protected override void OnNextCore(T value) { lock (gate) { + if(takeCompleted) return; var current = timeProvider.GetTimestamp(); queue.Enqueue((current, value)); Trim(current); @@ -124,6 +142,8 @@ protected override void OnCompletedCore(Result result) { lock (gate) { + takeCompleted = true; + if (result.IsFailure) { observer.OnCompleted(result); @@ -134,6 +154,7 @@ protected override void OnCompletedCore(Result result) foreach (var item in queue) { observer.OnNext(item.value); + if (IsDisposed) return; // sometimes called Clear during iterating } observer.OnCompleted(); } @@ -171,6 +192,7 @@ sealed class _TakeLastFrame : Observer, IDisposable readonly Queue<(long frameCount, T value)> queue = new(); readonly int frameCount; readonly FrameProvider frameProvider; + bool takeCompleted = false; public _TakeLastFrame(Observer observer, int frameCount, FrameProvider frameProvider) { @@ -183,6 +205,8 @@ protected override void OnNextCore(T value) { lock (gate) { + if (takeCompleted) return; + var current = frameProvider.GetFrameCount(); queue.Enqueue((current, value)); Trim(current); @@ -201,6 +225,8 @@ protected override void OnCompletedCore(Result result) { lock (gate) { + takeCompleted = true; + if (result.IsFailure) { observer.OnCompleted(result); @@ -211,6 +237,7 @@ protected override void OnCompletedCore(Result result) foreach (var item in queue) { observer.OnNext(item.value); + if (IsDisposed) return; // sometimes called Clear during iterating } observer.OnCompleted(); } diff --git a/tests/R3.Tests/OperatorTests/TakeLastTest.cs b/tests/R3.Tests/OperatorTests/TakeLastTest.cs index 71e72284..6bde6f4c 100644 --- a/tests/R3.Tests/OperatorTests/TakeLastTest.cs +++ b/tests/R3.Tests/OperatorTests/TakeLastTest.cs @@ -2,7 +2,7 @@ namespace R3.Tests.OperatorTests; -public class TakeLastTest +public class TakeLastTest(ITestOutputHelper helper) { [Fact] public async Task Take() @@ -59,4 +59,33 @@ public void TakeFrame2() list.AssertEqual([3, 4, 5]); list.AssertIsCompleted(); } + + [Fact] + public async Task DisposeQueue() + { + var defaultHandler = ObservableSystem.GetUnhandledExceptionHandler(); + Exception? exception = null; + ObservableSystem.RegisterUnhandledExceptionHandler(ex => + { + exception = ex; + }); + try + { + var status = Observable.Interval(TimeSpan.FromMilliseconds(100)).Index(); + var doSomething = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5); + + var end = new TaskCompletionSource(); + status.TakeUntil(doSomething.TakeLast(1)).Subscribe(_ => end.TrySetResult()); + + await end.Task; + await Task.Delay(TimeSpan.FromMilliseconds(500)); + + exception!.Should().BeNull(); + // helper.WriteLine(exception!.Message); + } + finally + { + ObservableSystem.RegisterUnhandledExceptionHandler(defaultHandler); + } + } }