Skip to content

Commit

Permalink
Add Select/Where/SubscribeAwait(int maxConcurrent) overload
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 31, 2024
1 parent a1d7d7d commit b79b07f
Show file tree
Hide file tree
Showing 10 changed files with 694 additions and 35 deletions.
15 changes: 10 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1344,6 +1344,7 @@ Operator methods are defined as extension methods to `Observable<T>` in the stat
| **CountAsync**(this `Observable<T>` source, `Func<T, Boolean>` predicate, `CancellationToken` cancellationToken = default) | `Task<Int32>` |
| **Debounce**(this `Observable<T>` source, `TimeSpan` timeSpan) | `Observable<T>` |
| **Debounce**(this `Observable<T>` source, `TimeSpan` timeSpan, `TimeProvider` timeProvider) | `Observable<T>` |
| **Debounce**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` throttleDurationSelector, `Boolean` configureAwait = False) | `Observable<T>` |
| **DebounceFrame**(this `Observable<T>` source, `Int32` frameCount) | `Observable<T>` |
| **DebounceFrame**(this `Observable<T>` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable<T>` |
| **DefaultIfEmpty**(this `Observable<T>` source) | `Observable<T>` |
Expand Down Expand Up @@ -1436,7 +1437,7 @@ Operator methods are defined as extension methods to `Observable<T>` in the stat
| **Select**(this `Observable<T>` source, `Func<T, Int32, TResult>` selector) | `Observable<TResult>` |
| **Select**(this `Observable<T>` source, `TState` state, `Func<T, TState, TResult>` selector) | `Observable<TResult>` |
| **Select**(this `Observable<T>` source, `TState` state, `Func<T, Int32, TState, TResult>` selector) | `Observable<TResult>` |
| **SelectAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<TResult>>` selector, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False) | `Observable<TResult>` |
| **SelectAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<TResult>>` selector, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False, `Int32` maxConcurrent = -1) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Observable<TResult>>` selector) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Observable<TCollection>>` collectionSelector, `Func<TSource, TCollection, TResult>` resultSelector) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Int32, Observable<TResult>>` selector) | `Observable<TResult>` |
Expand All @@ -1463,9 +1464,9 @@ Operator methods are defined as extension methods to `Observable<T>` in the stat
| **SkipUntil**(this `Observable<T>` source, `Task` task) | `Observable<T>` |
| **SkipWhile**(this `Observable<T>` source, `Func<T, Boolean>` predicate) | `Observable<T>` |
| **SkipWhile**(this `Observable<T>` source, `Func<T, Int32, Boolean>` predicate) | `Observable<T>` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Result>` onCompleted, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Exception>` onErrorResume, `Action<Result>` onCompleted, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False, `Int32` maxConcurrent = -1) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Result>` onCompleted, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False, `Int32` maxConcurrent = -1) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Exception>` onErrorResume, `Action<Result>` onCompleted, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False, `Int32` maxConcurrent = -1) | `IDisposable` |
| **SubscribeOn**(this `Observable<T>` source, `SynchronizationContext` synchronizationContext) | `Observable<T>` |
| **SubscribeOn**(this `Observable<T>` source, `TimeProvider` timeProvider) | `Observable<T>` |
| **SubscribeOn**(this `Observable<T>` source, `FrameProvider` frameProvider) | `Observable<T>` |
Expand Down Expand Up @@ -1503,10 +1504,14 @@ Operator methods are defined as extension methods to `Observable<T>` in the stat
| **TakeWhile**(this `Observable<T>` source, `Func<T, Int32, Boolean>` predicate) | `Observable<T>` |
| **ThrottleFirst**(this `Observable<T>` source, `TimeSpan` timeSpan) | `Observable<T>` |
| **ThrottleFirst**(this `Observable<T>` source, `TimeSpan` timeSpan, `TimeProvider` timeProvider) | `Observable<T>` |
| **ThrottleFirst**(this `Observable<T>` source, `Observable<TSample>` sampler) | `Observable<T>` |
| **ThrottleFirst**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` sampler, `Boolean` configureAwait = False) | `Observable<T>` |
| **ThrottleFirstFrame**(this `Observable<T>` source, `Int32` frameCount) | `Observable<T>` |
| **ThrottleFirstFrame**(this `Observable<T>` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable<T>` |
| **ThrottleLast**(this `Observable<T>` source, `TimeSpan` timeSpan) | `Observable<T>` |
| **ThrottleLast**(this `Observable<T>` source, `TimeSpan` timeSpan, `TimeProvider` timeProvider) | `Observable<T>` |
| **ThrottleLast**(this `Observable<T>` source, `Observable<TSample>` sampler) | `Observable<T>` |
| **ThrottleLast**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` sampler, `Boolean` configureAwait = False) | `Observable<T>` |
| **ThrottleLastFrame**(this `Observable<T>` source, `Int32` frameCount) | `Observable<T>` |
| **ThrottleLastFrame**(this `Observable<T>` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable<T>` |
| **Timeout**(this `Observable<T>` source, `TimeSpan` dueTime) | `Observable<T>` |
Expand All @@ -1533,7 +1538,7 @@ Operator methods are defined as extension methods to `Observable<T>` in the stat
| **Where**(this `Observable<T>` source, `Func<T, Int32, Boolean>` predicate) | `Observable<T>` |
| **Where**(this `Observable<T>` source, `TState` state, `Func<T, TState, Boolean>` predicate) | `Observable<T>` |
| **Where**(this `Observable<T>` source, `TState` state, `Func<T, Int32, TState, Boolean>` predicate) | `Observable<T>` |
| **WhereAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<Boolean>>` predicate, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False) | `Observable<T>` |
| **WhereAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<Boolean>>` predicate, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False, `Int32` maxConcurrent = -1) | `Observable<T>` |
| **WithLatestFrom**(this `Observable<TFirst>` first, `Observable<TSecond>` second, `Func<TFirst, TSecond, TResult>` resultSelector) | `Observable<TResult>` |
| **Zip**(this `Observable<T1>` source1, `Observable<T2>` source2, `Func<T1, T2, TResult>` resultSelector) | `Observable<TResult>` |
| **Zip**(this `Observable<T1>` source1, `Observable<T2>` source2, `Observable<T3>` source3, `Func<T1, T2, T3, TResult>` resultSelector) | `Observable<TResult>` |
Expand Down
15 changes: 10 additions & 5 deletions docs/reference_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
| **CountAsync**(this `Observable<T>` source, `Func<T, Boolean>` predicate, `CancellationToken` cancellationToken = default) | `Task<Int32>` |
| **Debounce**(this `Observable<T>` source, `TimeSpan` timeSpan) | `Observable<T>` |
| **Debounce**(this `Observable<T>` source, `TimeSpan` timeSpan, `TimeProvider` timeProvider) | `Observable<T>` |
| **Debounce**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` throttleDurationSelector, `Boolean` configureAwait = False) | `Observable<T>` |
| **DebounceFrame**(this `Observable<T>` source, `Int32` frameCount) | `Observable<T>` |
| **DebounceFrame**(this `Observable<T>` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable<T>` |
| **DefaultIfEmpty**(this `Observable<T>` source) | `Observable<T>` |
Expand Down Expand Up @@ -150,7 +151,7 @@
| **Select**(this `Observable<T>` source, `Func<T, Int32, TResult>` selector) | `Observable<TResult>` |
| **Select**(this `Observable<T>` source, `TState` state, `Func<T, TState, TResult>` selector) | `Observable<TResult>` |
| **Select**(this `Observable<T>` source, `TState` state, `Func<T, Int32, TState, TResult>` selector) | `Observable<TResult>` |
| **SelectAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<TResult>>` selector, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False) | `Observable<TResult>` |
| **SelectAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<TResult>>` selector, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False, `Int32` maxConcurrent = -1) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Observable<TResult>>` selector) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Observable<TCollection>>` collectionSelector, `Func<TSource, TCollection, TResult>` resultSelector) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Int32, Observable<TResult>>` selector) | `Observable<TResult>` |
Expand All @@ -177,9 +178,9 @@
| **SkipUntil**(this `Observable<T>` source, `Task` task) | `Observable<T>` |
| **SkipWhile**(this `Observable<T>` source, `Func<T, Boolean>` predicate) | `Observable<T>` |
| **SkipWhile**(this `Observable<T>` source, `Func<T, Int32, Boolean>` predicate) | `Observable<T>` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Result>` onCompleted, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Exception>` onErrorResume, `Action<Result>` onCompleted, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False, `Int32` maxConcurrent = -1) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Result>` onCompleted, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False, `Int32` maxConcurrent = -1) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Exception>` onErrorResume, `Action<Result>` onCompleted, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False, `Int32` maxConcurrent = -1) | `IDisposable` |
| **SubscribeOn**(this `Observable<T>` source, `SynchronizationContext` synchronizationContext) | `Observable<T>` |
| **SubscribeOn**(this `Observable<T>` source, `TimeProvider` timeProvider) | `Observable<T>` |
| **SubscribeOn**(this `Observable<T>` source, `FrameProvider` frameProvider) | `Observable<T>` |
Expand Down Expand Up @@ -217,10 +218,14 @@
| **TakeWhile**(this `Observable<T>` source, `Func<T, Int32, Boolean>` predicate) | `Observable<T>` |
| **ThrottleFirst**(this `Observable<T>` source, `TimeSpan` timeSpan) | `Observable<T>` |
| **ThrottleFirst**(this `Observable<T>` source, `TimeSpan` timeSpan, `TimeProvider` timeProvider) | `Observable<T>` |
| **ThrottleFirst**(this `Observable<T>` source, `Observable<TSample>` sampler) | `Observable<T>` |
| **ThrottleFirst**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` sampler, `Boolean` configureAwait = False) | `Observable<T>` |
| **ThrottleFirstFrame**(this `Observable<T>` source, `Int32` frameCount) | `Observable<T>` |
| **ThrottleFirstFrame**(this `Observable<T>` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable<T>` |
| **ThrottleLast**(this `Observable<T>` source, `TimeSpan` timeSpan) | `Observable<T>` |
| **ThrottleLast**(this `Observable<T>` source, `TimeSpan` timeSpan, `TimeProvider` timeProvider) | `Observable<T>` |
| **ThrottleLast**(this `Observable<T>` source, `Observable<TSample>` sampler) | `Observable<T>` |
| **ThrottleLast**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` sampler, `Boolean` configureAwait = False) | `Observable<T>` |
| **ThrottleLastFrame**(this `Observable<T>` source, `Int32` frameCount) | `Observable<T>` |
| **ThrottleLastFrame**(this `Observable<T>` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable<T>` |
| **Timeout**(this `Observable<T>` source, `TimeSpan` dueTime) | `Observable<T>` |
Expand All @@ -247,7 +252,7 @@
| **Where**(this `Observable<T>` source, `Func<T, Int32, Boolean>` predicate) | `Observable<T>` |
| **Where**(this `Observable<T>` source, `TState` state, `Func<T, TState, Boolean>` predicate) | `Observable<T>` |
| **Where**(this `Observable<T>` source, `TState` state, `Func<T, Int32, TState, Boolean>` predicate) | `Observable<T>` |
| **WhereAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<Boolean>>` predicate, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False) | `Observable<T>` |
| **WhereAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<Boolean>>` predicate, `AwaitOperation` awaitOperations = Sequential, `Boolean` configureAwait = False, `Int32` maxConcurrent = -1) | `Observable<T>` |
| **WithLatestFrom**(this `Observable<TFirst>` first, `Observable<TSecond>` second, `Func<TFirst, TSecond, TResult>` resultSelector) | `Observable<TResult>` |
| **Zip**(this `Observable<T1>` source1, `Observable<T2>` source2, `Func<T1, T2, TResult>` resultSelector) | `Observable<TResult>` |
| **Zip**(this `Observable<T1>` source1, `Observable<T2>` source2, `Observable<T3>` source3, `Func<T1, T2, T3, TResult>` resultSelector) | `Observable<TResult>` |
Expand Down
2 changes: 2 additions & 0 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

Console.WriteLine("hello");

new ParallelOptions { MaxDegreeOfParallelism = -10 };

// System.Reactive.Linq.Observable.Sample(

//JsonSerializerOptions.Default.TypeInfoResolver
Expand Down
Loading

0 comments on commit b79b07f

Please sign in to comment.