Skip to content

Commit

Permalink
Merge pull request #58 from Cysharp/select-await
Browse files Browse the repository at this point in the history
Add SelectAwait, WhereAwait, SubscribeAwait
  • Loading branch information
neuecc authored Jan 22, 2024
2 parents 7a36cb9 + 8bfc29f commit 0d75fe1
Show file tree
Hide file tree
Showing 18 changed files with 1,388 additions and 130 deletions.
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,38 @@ Interoperability with `IObservable<T>`
* `public static Observable<T> ToObservable<T>(this IObservable<T> source)`
* `public static IObservable<T> AsSystemObservable<T>(this Observable<T> source)`

Interoperability with `async/await`
---
R3 has special integration with `async/await`.

| Name(Parameter) | ReturnType |
| --- | --- |
| **SelectAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<TResult>>` selector, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `Observable<TResult>` |
| **WhereAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<Boolean>>` predicate, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `Observable<T>` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Result>` onCompleted, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Exception>` onErrorResume, `Action<Result>` onCompleted, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `IDisposable` |

```csharp
public enum AwaitOperations
{
Queue,
Drop,
Parallel
}
```

```csharp
// for example...
button.OnClickAsObservable()
.SelectAwait(async (_, ct) =>
{
var req = await UnityWebRequest.Get("https://google.com/").SendWebRequest().WithCancellation(ct);
return req.downloadHandler.text;
}, AwaitOperations.Drop)
.SubscribeToText(text);
```

Concurrency Policy
---
TODO:
Expand Down Expand Up @@ -1350,6 +1382,7 @@ Operator methods are defined as extension methods to `Observable<T>` in the stat
| **Select**(this `Observable<T>` source, `Func<T, Int32, TResult>` selector) | `Observable<TResult>` |
| **Select**(this `Observable<T>` source, `TState` state, `Func<T, TState, TResult>` selector) | `Observable<TResult>` |
| **Select**(this `Observable<T>` source, `TState` state, `Func<T, Int32, TState, TResult>` selector) | `Observable<TResult>` |
| **SelectAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<TResult>>` selector, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Observable<TResult>>` selector) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Observable<TCollection>>` collectionSelector, `Func<TSource, TCollection, TResult>` resultSelector) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Int32, Observable<TResult>>` selector) | `Observable<TResult>` |
Expand All @@ -1376,6 +1409,9 @@ Operator methods are defined as extension methods to `Observable<T>` in the stat
| **SkipUntil**(this `Observable<T>` source, `Task` task) | `Observable<T>` |
| **SkipWhile**(this `Observable<T>` source, `Func<T, Boolean>` predicate) | `Observable<T>` |
| **SkipWhile**(this `Observable<T>` source, `Func<T, Int32, Boolean>` predicate) | `Observable<T>` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Result>` onCompleted, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Exception>` onErrorResume, `Action<Result>` onCompleted, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `IDisposable` |
| **SubscribeOn**(this `Observable<T>` source, `SynchronizationContext` synchronizationContext) | `Observable<T>` |
| **SubscribeOn**(this `Observable<T>` source, `TimeProvider` timeProvider) | `Observable<T>` |
| **SubscribeOn**(this `Observable<T>` source, `FrameProvider` frameProvider) | `Observable<T>` |
Expand Down Expand Up @@ -1443,6 +1479,7 @@ Operator methods are defined as extension methods to `Observable<T>` in the stat
| **Where**(this `Observable<T>` source, `Func<T, Int32, Boolean>` predicate) | `Observable<T>` |
| **Where**(this `Observable<T>` source, `TState` state, `Func<T, TState, Boolean>` predicate) | `Observable<T>` |
| **Where**(this `Observable<T>` source, `TState` state, `Func<T, Int32, TState, Boolean>` predicate) | `Observable<T>` |
| **WhereAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<Boolean>>` predicate, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `Observable<T>` |
| **WithLatestFrom**(this `Observable<TFirst>` first, `Observable<TSecond>` second, `Func<TFirst, TSecond, TResult>` resultSelector) | `Observable<TResult>` |
| **Zip**(this `Observable<T1>` source1, `Observable<T2>` source2, `Func<T1, T2, TResult>` resultSelector) | `Observable<TResult>` |
| **Zip**(this `Observable<T1>` source1, `Observable<T2>` source2, `Observable<T3>` source3, `Func<T1, T2, T3, TResult>` resultSelector) | `Observable<TResult>` |
Expand Down
5 changes: 5 additions & 0 deletions docs/reference_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
| **Select**(this `Observable<T>` source, `Func<T, Int32, TResult>` selector) | `Observable<TResult>` |
| **Select**(this `Observable<T>` source, `TState` state, `Func<T, TState, TResult>` selector) | `Observable<TResult>` |
| **Select**(this `Observable<T>` source, `TState` state, `Func<T, Int32, TState, TResult>` selector) | `Observable<TResult>` |
| **SelectAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<TResult>>` selector, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Observable<TResult>>` selector) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Observable<TCollection>>` collectionSelector, `Func<TSource, TCollection, TResult>` resultSelector) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Int32, Observable<TResult>>` selector) | `Observable<TResult>` |
Expand All @@ -174,6 +175,9 @@
| **SkipUntil**(this `Observable<T>` source, `Task` task) | `Observable<T>` |
| **SkipWhile**(this `Observable<T>` source, `Func<T, Boolean>` predicate) | `Observable<T>` |
| **SkipWhile**(this `Observable<T>` source, `Func<T, Int32, Boolean>` predicate) | `Observable<T>` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Result>` onCompleted, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Exception>` onErrorResume, `Action<Result>` onCompleted, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `IDisposable` |
| **SubscribeOn**(this `Observable<T>` source, `SynchronizationContext` synchronizationContext) | `Observable<T>` |
| **SubscribeOn**(this `Observable<T>` source, `TimeProvider` timeProvider) | `Observable<T>` |
| **SubscribeOn**(this `Observable<T>` source, `FrameProvider` frameProvider) | `Observable<T>` |
Expand Down Expand Up @@ -241,6 +245,7 @@
| **Where**(this `Observable<T>` source, `Func<T, Int32, Boolean>` predicate) | `Observable<T>` |
| **Where**(this `Observable<T>` source, `TState` state, `Func<T, TState, Boolean>` predicate) | `Observable<T>` |
| **Where**(this `Observable<T>` source, `TState` state, `Func<T, Int32, TState, Boolean>` predicate) | `Observable<T>` |
| **WhereAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<Boolean>>` predicate, `AwaitOperations` awaitOperations = Queue, `Boolean` configureAwait = True) | `Observable<T>` |
| **WithLatestFrom**(this `Observable<TFirst>` first, `Observable<TSecond>` second, `Func<TFirst, TSecond, TResult>` resultSelector) | `Observable<TResult>` |
| **Zip**(this `Observable<T1>` source1, `Observable<T2>` source2, `Func<T1, T2, TResult>` resultSelector) | `Observable<TResult>` |
| **Zip**(this `Observable<T1>` source1, `Observable<T2>` source2, `Observable<T3>` source3, `Func<T1, T2, T3, TResult>` resultSelector) | `Observable<TResult>` |
Expand Down
1 change: 1 addition & 0 deletions sandbox/ConsoleApp1/ConsoleApp1.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.0" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
<PackageReference Include="System.Reactive.Linq" Version="6.0.0" />
<PackageReference Include="ZLogger" Version="2.0.0" />
</ItemGroup>
Expand Down
93 changes: 9 additions & 84 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
@@ -1,97 +1,22 @@
using R3;
using System.ComponentModel.DataAnnotations;
using System.Reactive.Linq;


Console.WriteLine("hello");
// System.Linq.AsyncEnumerable.Range(1,10)

//Dump.Factory();

var hoge = new Hoge();

hoge.MyProperty1.ErrorsChanged += MyProperty1_ErrorsChanged;
hoge.MyProperty2.ErrorsChanged += MyProperty2_ErrorsChanged;

void MyProperty1_ErrorsChanged(object? sender, System.ComponentModel.DataErrorsChangedEventArgs e)
{
foreach (var item in hoge.MyProperty1.GetErrors(null!))
{
Console.WriteLine(item);
}
}
void MyProperty2_ErrorsChanged(object? sender, System.ComponentModel.DataErrorsChangedEventArgs e)
{
foreach (var item in hoge.MyProperty2.GetErrors(null!))
{
Console.WriteLine(item);
}
}
hoge.MyProperty1.Value = 30;
hoge.MyProperty2.Value = 40;


//ObservableTracker.EnableTracking = true;
//ObservableTracker.EnableStackTrace = true;

//using var factory = LoggerFactory.Create(x =>
//{
// x.SetMinimumLevel(LogLevel.Trace);
// x.AddZLoggerConsole();
//});
//// ObservableSystem.Logger = factory.CreateLogger<ObservableSystem>();
//var logger = factory.CreateLogger<Program>();




//var sw = Stopwatch.StartNew();
//var subject1 = new System.Reactive.Subjects.Subject<int>();
//var subject2 = new System.Reactive.Subjects.Subject<int>();
////subject1.WithLatestFrom(subject2.Finally(() => Console.WriteLine("finally subject2")), (x, y) => (x, y)).Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("end"));

//subject1.Scan((x, y) => x + y).Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("end"));


//subject1.OnNext(1);
//subject1.OnNext(10);
////subject1.OnNext(10);
////subject1.OnNext(100);

//// subject1.SequenceEqual(


//// System.Reactive.Linq.Observable.Switch(

//Dump.Factory();

//public static class Extensions
//{
// public static IDisposable WriteLine<T>(this Observable<T> source)
// {
// return source.Subscribe(x => Console.WriteLine(x), x => Console.WriteLine(x));
// }
//}
//System.Reactive.Linq.Observable.Range(1,10).SelectMany(

//var onClick = new Subject<Unit>();
//var httpClient = new HttpClient();


//class TestDisposable : IDisposable
//onClick.SelectAwait(async x =>
//{
// public int CalledCount = 0;

// public void Dispose()
// {
// CalledCount += 1;
// }
//}


public class Hoge
{
[Range(1, 10)]
public BindableReactiveProperty<int> MyProperty1 { get; set; } = new BindableReactiveProperty<int>().EnableValidation<Hoge>();

[Range(1, 10)]
public BindableReactiveProperty<int> MyProperty2 { get; set; }

public Hoge()
{
MyProperty2 = new BindableReactiveProperty<int>().EnableValidation(() => MyProperty2);
}
}
//});
22 changes: 17 additions & 5 deletions src/R3.Unity/Assets/Scenes/NewBehaviourScript.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using R3;
using System;
using UnityEngine;
using UnityEngine.Networking;
using UnityEngine.UI;
using Cysharp.Threading.Tasks;

public class NewBehaviourScript : MonoBehaviour
{
Expand All @@ -26,14 +29,23 @@ public class NewBehaviourScript : MonoBehaviour
public SerializableReactiveProperty<FruitEnum> rpEnum;
public SerializableReactiveProperty<FruitFlagsEnum> rpFlagsEnum;


Button button;
Text text;

void Start()
{
Observable.Interval(TimeSpan.FromSeconds(1)).Timeout(TimeSpan.FromSeconds(5))
.Subscribe(x =>
{
Debug.Log("Time:" + Time.time);
});
//button.OnClickAsObservable()
// .SelectAwait(async (_, ct) =>
// {
// var req = await UnityWebRequest.Get("https://google.com/").SendWebRequest().WithCancellation(ct);
// return req.downloadHandler.text;
// }, AwaitOperations.Drop)
// .SubscribeToText(text);
}



}


Expand Down
1 change: 1 addition & 0 deletions src/R3.Unity/Packages/manifest.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"dependencies": {
"com.cysharp.r3.internal": "file:../../R3/bin/Debug/netstandard2.0",
"com.cysharp.unitask": "https://github.com/Cysharp/UniTask.git?path=src/UniTask/Assets/Plugins/UniTask",
"com.github-glitchenzo.nugetforunity": "https://github.com/GlitchEnzo/NuGetForUnity.git?path=/src/NuGetForUnity",
"com.unity.ide.visualstudio": "2.0.22",
"com.unity.ugui": "1.0.0",
Expand Down
7 changes: 7 additions & 0 deletions src/R3.Unity/Packages/packages-lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
"source": "local",
"dependencies": {}
},
"com.cysharp.unitask": {
"version": "https://github.com/Cysharp/UniTask.git?path=src/UniTask/Assets/Plugins/UniTask",
"depth": 0,
"source": "git",
"dependencies": {},
"hash": "66de0d3a58b256b6e0194f285cc530fd0c08407b"
},
"com.github-glitchenzo.nugetforunity": {
"version": "https://github.com/GlitchEnzo/NuGetForUnity.git?path=/src/NuGetForUnity",
"depth": 0,
Expand Down
8 changes: 8 additions & 0 deletions src/R3/AwaitOperations.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace R3;

public enum AwaitOperations
{
Queue,
Drop,
Parallel
}
18 changes: 18 additions & 0 deletions src/R3/Internal/ChannelUtility.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System.Threading.Channels;

namespace R3.Internal;

internal static class ChannelUtility
{
static readonly UnboundedChannelOptions options = new UnboundedChannelOptions
{
SingleWriter = true, // in Rx operator, OnNext gurantees synchronous
SingleReader = true, // almostly uses single reader loop
AllowSynchronousContinuations = true // if false, uses TaskCreationOptions.RunContinuationsAsynchronously so avoid it.
};

internal static Channel<T> CreateSingleReadeWriterUnbounded<T>()
{
return Channel.CreateUnbounded<T>(options);
}
}
6 changes: 5 additions & 1 deletion src/R3/Operators/Select.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
namespace R3;
using System.Security.AccessControl;
using System.Threading;
using System.Threading.Channels;

namespace R3;

public static partial class ObservableExtensions
{
Expand Down
Loading

0 comments on commit 0d75fe1

Please sign in to comment.