Skip to content

Commit

Permalink
ZipLatest raise OnCompleted when can not produce values anymore
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jul 26, 2024
1 parent c11a7ba commit f055c0a
Show file tree
Hide file tree
Showing 6 changed files with 394 additions and 97 deletions.
32 changes: 28 additions & 4 deletions src/R3/Factories/ZipLatest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ sealed class _CombineLatest : IDisposable
readonly Observer<T[]> observer;
readonly Observable<T>[] sources;
readonly CombineLatestObserver[] observers;
int completedCount;

public _CombineLatest(Observer<T[]> observer, IEnumerable<Observable<T>> sources)
{
Expand Down Expand Up @@ -66,9 +65,14 @@ public IDisposable Run()

public void TryPublishOnNext()
{
var hasCompletedObserver = false;
foreach (var item in observers)
{
if (!item.HasValue) return;
if (item.IsCompleted)
{
hasCompletedObserver = true;
}
}

var values = new T[observers.Length];
Expand All @@ -77,9 +81,15 @@ public void TryPublishOnNext()
values[i] = observers[i].GetValue();
}
observer.OnNext(values);

if (hasCompletedObserver)
{
observer.OnCompleted();
Dispose();
}
}

public void TryPublishOnCompleted(Result result)
public void TryPublishOnCompleted(Result result, bool empty)
{
if (result.IsFailure)
{
Expand All @@ -88,14 +98,23 @@ public void TryPublishOnCompleted(Result result)
}
else
{
if (Interlocked.Increment(ref completedCount) == sources.Length)
if (empty || AllObserverIsCompleted())
{
observer.OnCompleted();
Dispose();
}
}
}

bool AllObserverIsCompleted()
{
foreach (var item in observers)
{
if (!item.IsCompleted) return false;
}
return true;
}

public void Dispose()
{
foreach (var observer in observers)
Expand All @@ -108,6 +127,7 @@ sealed class CombineLatestObserver(_CombineLatest parent) : Observer<T>
{
T? value;
public bool HasValue { get; private set; }
public bool IsCompleted { get; private set; }

public T GetValue()
{
Expand All @@ -134,7 +154,11 @@ protected override void OnErrorResumeCore(Exception error)

protected override void OnCompletedCore(Result result)
{
parent.TryPublishOnCompleted(result);
lock (parent.observer)
{
IsCompleted = true;
parent.TryPublishOnCompleted(result, !HasValue);
}
}
}
}
Expand Down
28 changes: 14 additions & 14 deletions src/R3/Operators/Zip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public void TryPublishOnCompleted(Result result, bool empty)
}
else
{
if (empty || observer1.IsCompleted && observer2.IsCompleted)
if (empty || (observer1.IsCompleted && observer2.IsCompleted))
{
observer.OnCompleted();
Dispose();
Expand Down Expand Up @@ -430,7 +430,7 @@ public void TryPublishOnCompleted(Result result, bool empty)
}
else
{
if (empty || observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted)
if (empty || (observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted))
{
observer.OnCompleted();
Dispose();
Expand Down Expand Up @@ -572,7 +572,7 @@ public void TryPublishOnCompleted(Result result, bool empty)
}
else
{
if (empty || observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted)
if (empty || (observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted))
{
observer.OnCompleted();
Dispose();
Expand Down Expand Up @@ -722,7 +722,7 @@ public void TryPublishOnCompleted(Result result, bool empty)
}
else
{
if (empty || observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted)
if (empty || (observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted))
{
observer.OnCompleted();
Dispose();
Expand Down Expand Up @@ -880,7 +880,7 @@ public void TryPublishOnCompleted(Result result, bool empty)
}
else
{
if (empty || observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted)
if (empty || (observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted))
{
observer.OnCompleted();
Dispose();
Expand Down Expand Up @@ -1046,7 +1046,7 @@ public void TryPublishOnCompleted(Result result, bool empty)
}
else
{
if (empty || observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted)
if (empty || (observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted))
{
observer.OnCompleted();
Dispose();
Expand Down Expand Up @@ -1220,7 +1220,7 @@ public void TryPublishOnCompleted(Result result, bool empty)
}
else
{
if (empty || observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted)
if (empty || (observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted))
{
observer.OnCompleted();
Dispose();
Expand Down Expand Up @@ -1402,7 +1402,7 @@ public void TryPublishOnCompleted(Result result, bool empty)
}
else
{
if (empty || observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted && observer9.IsCompleted)
if (empty || (observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted && observer9.IsCompleted))
{
observer.OnCompleted();
Dispose();
Expand Down Expand Up @@ -1592,7 +1592,7 @@ public void TryPublishOnCompleted(Result result, bool empty)
}
else
{
if (empty || observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted && observer9.IsCompleted && observer10.IsCompleted)
if (empty || (observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted && observer9.IsCompleted && observer10.IsCompleted))
{
observer.OnCompleted();
Dispose();
Expand Down Expand Up @@ -1790,7 +1790,7 @@ public void TryPublishOnCompleted(Result result, bool empty)
}
else
{
if (empty || observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted && observer9.IsCompleted && observer10.IsCompleted && observer11.IsCompleted)
if (empty || (observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted && observer9.IsCompleted && observer10.IsCompleted && observer11.IsCompleted))
{
observer.OnCompleted();
Dispose();
Expand Down Expand Up @@ -1996,7 +1996,7 @@ public void TryPublishOnCompleted(Result result, bool empty)
}
else
{
if (empty || observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted && observer9.IsCompleted && observer10.IsCompleted && observer11.IsCompleted && observer12.IsCompleted)
if (empty || (observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted && observer9.IsCompleted && observer10.IsCompleted && observer11.IsCompleted && observer12.IsCompleted))
{
observer.OnCompleted();
Dispose();
Expand Down Expand Up @@ -2210,7 +2210,7 @@ public void TryPublishOnCompleted(Result result, bool empty)
}
else
{
if (empty || observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted && observer9.IsCompleted && observer10.IsCompleted && observer11.IsCompleted && observer12.IsCompleted && observer13.IsCompleted)
if (empty || (observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted && observer9.IsCompleted && observer10.IsCompleted && observer11.IsCompleted && observer12.IsCompleted && observer13.IsCompleted))
{
observer.OnCompleted();
Dispose();
Expand Down Expand Up @@ -2432,7 +2432,7 @@ public void TryPublishOnCompleted(Result result, bool empty)
}
else
{
if (empty || observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted && observer9.IsCompleted && observer10.IsCompleted && observer11.IsCompleted && observer12.IsCompleted && observer13.IsCompleted && observer14.IsCompleted)
if (empty || (observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted && observer9.IsCompleted && observer10.IsCompleted && observer11.IsCompleted && observer12.IsCompleted && observer13.IsCompleted && observer14.IsCompleted))
{
observer.OnCompleted();
Dispose();
Expand Down Expand Up @@ -2662,7 +2662,7 @@ public void TryPublishOnCompleted(Result result, bool empty)
}
else
{
if (empty || observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted && observer9.IsCompleted && observer10.IsCompleted && observer11.IsCompleted && observer12.IsCompleted && observer13.IsCompleted && observer14.IsCompleted && observer15.IsCompleted)
if (empty || (observer1.IsCompleted && observer2.IsCompleted && observer3.IsCompleted && observer4.IsCompleted && observer5.IsCompleted && observer6.IsCompleted && observer7.IsCompleted && observer8.IsCompleted && observer9.IsCompleted && observer10.IsCompleted && observer11.IsCompleted && observer12.IsCompleted && observer13.IsCompleted && observer14.IsCompleted && observer15.IsCompleted))
{
observer.OnCompleted();
Dispose();
Expand Down
2 changes: 1 addition & 1 deletion src/R3/Operators/Zip.tt
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ internal sealed class Zip<<#= generateT(i) #>, TResult>(
}
else
{
if (empty || <#= generateIsCompleted(i) #>)
if (empty || (<#= generateIsCompleted(i) #>))
{
observer.OnCompleted();
Dispose();
Expand Down
Loading

0 comments on commit f055c0a

Please sign in to comment.