Skip to content

Commit

Permalink
R
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Mar 1, 2024
1 parent a6ef207 commit 094b5b6
Showing 1 changed file with 104 additions and 11 deletions.
115 changes: 104 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ In other words, LINQ is not for EveryThing, and we believe that the essence of R

To address the shortcomings of dotnet/reactive, we have made changes to the core interfaces. In recent years, Rx-like frameworks optimized for language features, such as [Kotlin Flow](https://kotlinlang.org/docs/flow.html) and [Swift Combine](https://developer.apple.com/documentation/combine), have been standardized. C# has also evolved significantly, now at C# 12, and we believe there is a need for an Rx that aligns with the latest C#.

Improving performance was also a theme in the reimplementation. For example, this is the result of the terrible performance of IScheudler and the performance difference caused by its removal.
Improving performance was also a theme in the reimplementation. For example, this is the result of the terrible performance of IScheduler and the performance difference caused by its removal.

![image](https://github.com/Cysharp/ZLogger/assets/46207/68a12664-a840-4725-a87c-8fdbb03b4a02)
`Observable.Range(1, 10000).Subscribe()`
Expand All @@ -39,7 +39,7 @@ This library is distributed via NuGet, supporting .NET Standard 2.0, .NET Standa
Some platforms(WPF, Avalonia, Unity, Godot) requires additional step to install. Please see [Platform Supports](#platform-supports) section in below.

R3 code is almostly same as standard Rx. Make the Observable via factory methods(Timer, Interval, FromEvent, Subject, etc...) and chain operator via LINQ methods. Therefore, your knowledge about Rx and documentation on Rx can be almost directly applied. If you are new to Rx, the [ReactiveX](https://reactivex.io/intro.html) website and [Introduction to Rx.NET](https://introtorx.com/) would be useful resources for reference.
R3 code is mostly the same as standard Rx. Make the Observable via factory methods(Timer, Interval, FromEvent, Subject, etc...) and chain operator via LINQ methods. Therefore, your knowledge about Rx and documentation on Rx can be almost directly applied. If you are new to Rx, the [ReactiveX](https://reactivex.io/intro.html) website and [Introduction to Rx.NET](https://introtorx.com/) would be useful resources for reference.

```csharp
using R3;
Expand Down Expand Up @@ -73,15 +73,15 @@ public abstract class Observer<T> : IDisposable
{
public void OnNext(T value);
public void OnErrorResume(Exception error);
public void OnCompleted(Result result); // Result is () | Exception
public void OnCompleted(Result result); // Result is (Success | Failure)
}
```

The biggest difference is that in normal Rx, when an exception occurs in the pipeline, it flows to `OnError` and the subscription is unsubscribed, but in R3, it flows to `OnErrorResume` and the subscription is not unsubscribed.

I consider the automatic unsubscription by OnError to be a bad design for event handling. It's very difficult and risky to resolve it within an operator like Retry, and it also led to poor performance (there are many questions and complex answers about stopping and resubscribing all over the world). Also, converting OnErrorResume to OnError(OnCompleted(Result.Failure)) is easy and does not degrade performance, but the reverse is impossible. Therefore, the design was changed to not stop by default and give users the choice to stop.

Since the original Rx contract was `OnError | OnCompleted`, it was changed to `OnCompleted(Result result)` to consolidate into one method. Result is a readonly struct with two states: `Failure(Exception) | Success()`.
Since the original Rx contract was `OnError | OnCompleted`, it was changed to `OnCompleted(Result result)` to consolidate into one method. Result is a readonly struct with two states: `Success() | Failure(Exception)`.

The reason for changing to an abstract class instead of an interface is that Rx has implicit complex contracts that interfaces do not guarantee. By making it an abstract class, we fully controlled the behavior of Subscribe, OnNext, and Dispose. This made it possible to manage the list of all subscriptions and prevent subscription leaks.

Expand Down Expand Up @@ -131,7 +131,7 @@ There are also several operators unique to frame-based processing.
Observable.EveryUpdate().Subscribe(x => Console.WriteLine(x));

// take value until next frame
_eventSoure.TakeUntil(Obserable.NextFrame()).Subscribe();
eventSoure.TakeUntil(Observable.NextFrame()).Subscribe();

// polling value changed
Observable.EveryValueChanged(this, x => x.Width).Subscribe(x => WidthText.Text = x.ToString());
Expand Down Expand Up @@ -345,13 +345,13 @@ public partial class MainWindow : Window

Additionally, there are other utilities for Disposables as follows.

```
```csharp
Disposable.Create(Action);
Disposable.Dispose(...);
SingleAssignmentDisposable
SingleAssignmentDisposableCore // struct
SerialDisposable
SerialDisposableCore// struct
SerialDisposableCore // struct
```

Subscription Management
Expand Down Expand Up @@ -387,7 +387,7 @@ ObservableTracker.ForEachActiveTask(x =>
});
```

```
```csharp
TrackingState { TrackingId = 1, FormattedType = Timer._Timer, AddTime = 2024/01/09 4:11:39, StackTrace =... }
TrackingState { TrackingId = 2, FormattedType = Where`1._Where<Unit>, AddTime = 2024/01/09 4:11:39, StackTrace =... }
TrackingState { TrackingId = 3, FormattedType = Take`1._Take<Unit>, AddTime = 2024/01/09 4:11:39, StackTrace =... }
Expand Down Expand Up @@ -561,21 +561,87 @@ button.OnClickAsObservable()

`cancelOnCompleted` lets you choose whether to cancel the ongoing asynchronous method (by setting CancellationToken to Cancel) when the `OnCompleted` event is received. The default is true, meaning it will be cancelled. If set to false, it waits for the completion of the asynchronous method before calling the subsequent `OnCompleted` (potentially after issuing OnNext, depending on the case).

Additionally, the following time-related filtering methods can also accept asynchronous methods.
Additionally, the following time-related filtering/aggregating methods can also accept asynchronous methods.

| Name | ReturnType |
| --- | --- |
| **Debounce**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` throttleDurationSelector, `Boolean` configureAwait = true) | `Observable<T>` |
| **ThrottleFirst**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` sampler, `Boolean` configureAwait = true) | `Observable<T>` |
| **ThrottleLast**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` sampler, `Boolean` configureAwait = true) | `Observable<T>` |
| **ThrottleFirstLast**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` sampler, `Boolean` configureAwait = true) | `Observable<T>` |
| **Chunk**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` asyncWindow, `Boolean` configureAwait = true) | `Observable<T[]>` |

For example, by using the asynchronous function version of Chunk, you can naturally and easily write complex processes such as generating chunks at random times instead of fixed times.

```csharp
Observable.Interval(TimeSpan.FromSeconds(1))
.Index()
.Chunk(async (_, ct) =>
{
await Task.Delay(TimeSpan.FromSeconds(Random.Shared.Next(0, 5)), ct);
})
.Subscribe(xs =>
{
Console.WriteLine(string.Join(", ", xs));
});
```
These asynchronous methods are immediately canceled when `OnCompleted` is issued, and the subsequent `OnCompleted` is executed.

By utilizing async/await for Retry-related operations, you can achieve better handling. For instance, whereas the previous version of Rx could only retry the entire pipeline, with R3, which accepts async/await, it is possible to retry on a per asynchronous method execution basis.

```csharp
button.OnClickAsObservable()
.SelectAwait(async (_, ct) =>
{
var retry = 0;
AGAIN:
try
{
var req = await UnityWebRequest.Get("https://google.com/").SendWebRequest().WithCancellation(ct);
return req.downloadHandler.text;
}
catch
{
if (retry++ < 3) goto AGAIN;
throw;
}
}, AwaitOperation.Drop)
```

Repeat can also be implemented in combination with async/await. In this case, handling complex conditions for Repeat might be easier than completing it with Rx alone.

```csharp
while (!ct.IsCancellationRequested)
{
await button.OnClickAsObservable()
.Take(1)
.ForEachAsync(_ =>
{
// do something
});
}
```

Concurrency Policy
---
The composition of operators is thread-safe, and it is expected that the values flowing through OnNext are on a single thread. In other words, if OnNext is issued on multiple threads, the operators may behave unexpectedly. This is the same as with dotnet/reactive.

For example, while Subject itself is thread-safe, the operators are not thread-safe.

```csharp
// dotnet/reactive
var subject = new System.Reactive.Subjects.Subject<int>();

// single execution shows 100 but actually 9* multiple times(broken)
subject.Take(100).Count().Subscribe(x => Console.WriteLine(x));

Parallel.For(0, 1000, new ParallelOptions { MaxDegreeOfParallelism = 10 }, x => subject.OnNext(x));
```

This means that the issuance of OnNext must always be done on a single thread. Also, ReactiveProperty, which corresponds to BehaviorSubject in dotnet/reactive, is not thread-safe itself, so updating the value (set Value or call OnNext) must always be done on a single thread.

For converting external inputs into Observables, such as with FromEvent, and when the source of input issues in a multi-threaded manner, it is necessary to synchronize using `Synchronize` to construct the correct operator chain.

ObservableCollections
---
As a special collection for monitoring changes in collections and handling them in R3, the [ObservableCollections](https://github.com/Cysharp/ObservableCollections)'s `ObservableCollections.R3` package is available.
Expand Down Expand Up @@ -779,6 +845,33 @@ public class CommandViewModel : IDisposable

![rpcommand](https://github.com/Cysharp/R3/assets/46207/c456829e-1493-446d-831b-425f05be5d05)

### INotifyPropertyChanged to Observable

To convert properties of `INotifyPropertyChanged` and `INotifyPropertyChanging` into Observables, you can use `ObservePropertyChanged` and `ObservePropertyChanging`.

```csharp
var person = new Person { Name = "foo" };

person.ObservePropertyChanged(x => x.Name)
.Subscribe(x => Console.WriteLine($"Changed:{x}"));

p.Name = "bar";
p.Name = "baz";
```

`Func<T, TProperty> propertySelector` only supports simple property name lambda. This is because, in R3, `CallerArgumentExpression` is used to extract, for example from `x => x.Name` to "Name".

### FromEvent

To convert existing events into Observables, use FromEvent. Because it requires the conversion of delegates and has a unique way of calling, please refer to the following sample.

```csharp
Observable.FromEvent<RoutedEventHandler, RoutedEventArgs>(
h => (sender, e) => h(e),
e => button.Click += e,
e => button.Click -= e);
```

Platform Supports
---
Even without adding specific platform support, it is possible to use only the core library. However, Rx becomes more user-friendly by replacing the standard `TimeProvider` and `FrameProvider` with those optimized for each platform. For example, while the standard `TimeProvider` is thread-based, using a UI thread-based `TimeProvider` for each platform can eliminate the need for dispatch through `ObserveOn`, enhancing usability. Additionally, since message loops differ across platforms, the use of individual `FrameProvider` is essential.
Expand Down Expand Up @@ -1158,7 +1251,7 @@ public static Observable<(T0 Arg0, T1 Arg1, T2 Arg2)> AsObservable<T0, T1, T2>(t
public static Observable<(T0 Arg0, T1 Arg1, T2 Arg2, T3 Arg3)> AsObservable<T0, T1, T2, T3>(this UnityEngine.Events.UnityEvent<T0, T1, T2, T3> unityEvent, CancellationToken cancellationToken = default)
```

Additionally, with extension methods for uGUI, uGUI events can be easily converted to Observables. OnValueChangedAsObservable starts the subscription by first emitting the latest value at the time of subscription. Andalso when the associated component is destroyed, it emits an OnCompleted event to ensure the subscription is reliably cancelled.
Additionally, with extension methods for uGUI, uGUI events can be easily converted to Observables. OnValueChangedAsObservable starts the subscription by first emitting the latest value at the time of subscription. Also when the associated component is destroyed, it emits an OnCompleted event to ensure the subscription is reliably cancelled.

```csharp
public static IDisposable SubscribeToText(this Observable<string> source, Text text)
Expand Down Expand Up @@ -1244,7 +1337,7 @@ public class NewBehaviourScript : MonoBehaviour

R3 can handle [MonoBehaviour messages](https://docs.unity3d.com/ScriptReference/MonoBehaviour.html) with R3.Triggers:

These can also be handled more easily by directly subscribing to observables returned by extension methods on Component/GameObject. These methods inject ObservableTrigger automaticaly.
These can also be handled more easily by directly subscribing to observables returned by extension methods on Component/GameObject. These methods inject ObservableTrigger automatically.

```csharp
using R3;
Expand Down

0 comments on commit 094b5b6

Please sign in to comment.