Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(RecoverWith): add recoverWith Operator #260

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
57 changes: 35 additions & 22 deletions API.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
# API

- [Types](#types)
- [Subscription](#Subscription)
- [Scheduler](#Scheduler)
- [Observer](#Observer)
- [Observable](#Observable)
- [Subscription](#subscription)
- [Scheduler](#scheduler)
- [Observer](#observer)
- [Observable](#observable)
- [Sinks](#sinks)
- [forEach](#forEach)
- [toPromise](#toPromise)
- [toNodeStream](#toNodeStream)
- [forEach](#foreach)
- [toPromise](#topromise)
- [toNodeStream](#tonodestream)
- [Sources](#sources)
- [empty](#empty)
- [frames](#frames)
- [fromArray](#fromArray)
- [fromDOM](#fromDOM)
- [fromNodeStream](#fromNodeStream)
- [fromPromise](#fromPromise)
- [fromArray](#fromarray)
- [fromDOM](#fromdom)
- [fromNodeStream](#fromnodestream)
- [fromPromise](#frompromise)
- [interval](#interval)
- [just](#just)
- [never](#never)
Expand All @@ -24,30 +24,31 @@
- [Operators](#operators)
- [combine](#combine)
- [concat](#concat)
- [concatMap](#concatMap)
- [concatMap](#concatmap)
- [delay](#delay)
- [recoverWith](#recoverwith)
- [filter](#filter)
- [flatMap](#flatMap)
- [flatMap](#flatmap)
- [join](#join)
- [map](#map)
- [mapTo](#mapTo)
- [mapTo](#mapto)
- [merge](#merge)
- [mergeMap](#mergeMap)
- [mergeMap](#mergemap)
- [multicast](#multicast)
- [reduce](#reduce)
- [sample](#sample)
- [scan](#scan)
- [skipRepeats](#skipRepeats)
- [skipRepeats](#skiprepeats)
- [slice](#slice)
- [switchLatest](#switchLatest)
- [switchMap](#switchMap)
- [switchLatest](#switchlatest)
- [switchMap](#switchmap)
- [tap](#tap)
- [unique](#unique)
- [uniqueWith](#uniqueWith)
- [uniqueWith](#uniquewith)
- [Testing](#testing)
- [TestScheduler](#TestScheduler)
- [TestScheduler](#testscheduler)
- [marble](#marble)
- [toMarble](#toMarble)
- [toMarble](#tomarble)

>All this stuff in just 4kb!

Expand Down Expand Up @@ -376,6 +377,18 @@ Takes in two params viz `duration` and the `source` stream and delays each value
const $ = O.delay(100, O.of('hi')) // emits 'hi' after 100ms
```

## recoverWith
```ts
function recoverWith(fn: (err: any) => Observable, source: Observable): Observable
```
Transforms errors of an input stream into values.

**Example:**

```ts
const $ = O.recoverWith(err => O.of(1), O.of(new Error('err1'), 100)) // emits '1' and '100'
```


## filter
```ts
Expand Down Expand Up @@ -799,4 +812,4 @@ toMarble([

*/

```
```
1 change: 1 addition & 0 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ export {tap} from './operators/Map'
export {toPromise} from './sinks/ToPromise'
export {uniqueWith} from './operators/Unique'
export {unique} from './operators/Unique'
export {recoverWith} from './operators/recoverWith'
47 changes: 47 additions & 0 deletions src/operators/RecoverWith.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Created by srijan02420 on 12/10/18.
*/

import {CompleteMixin, NextMixin, Virgin} from '../internal/Mixins'
import {IObservable} from '../internal/Observable'
import {IObserver} from '../internal/Observer'
import {ISubscription} from '../internal/Subscription'
import {curry} from '../internal/Utils'
import {flatMap, just} from '../main'
import {IScheduler} from '../schedulers/Scheduler'

export type TSource<T> = IObservable<T>
export type TResult<R> = IObservable<R>
export type TMapper<T> = (err: Error) => IObservable<T>

class RecoverWithObserver<T> extends NextMixin(CompleteMixin(Virgin))
implements IObserver<T | Error> {
constructor(public sink: IObserver<T | Error>) {
super()
}

error(err: Error): void {
this.sink.next(err)
}
}

class RecoverWithObservable<T> implements TResult<T | Error> {
constructor(private source: IObservable<T>) {}

subscribe(observer: IObserver<T>, scheduler: IScheduler): ISubscription {
return this.source.subscribe(new RecoverWithObserver(observer), scheduler)
}
}

export const recoverWith = curry(function<T>(
mapFunction: TMapper<T>,
source: TSource<T>
) {
return flatMap(
val => (val instanceof Error ? mapFunction(val) : just(val)),
new RecoverWithObservable(source)
)
}) as {
<T>(mapper: TMapper<T>, source: TSource<T>): TResult<T>
<T>(): {mapper: TMapper<T>; (source: TSource<T>): TResult<T>}
}
52 changes: 52 additions & 0 deletions test/test.RecoverWith.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import * as t from 'assert'
import {EVENT} from '../src/internal/Events'
import {fromMarble} from '../src/internal/Marble'
import {just} from '../src/main'
import {recoverWith} from '../src/operators/RecoverWith'
import {createTestScheduler} from '../src/schedulers/TestScheduler'

const {next, complete, error} = EVENT

describe('recoverWith()', () => {
it('should emit errors as data', () => {
const sh = createTestScheduler()
const $ = sh.Cold<number>([
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need tests for when the subscription starts and when it ends.

next(200, 10),
error(201, new Error('err1')),
error(202, new Error('err2')),
next(203, 10),
complete(205)
])
const {results} = sh.start(() => recoverWith((err: Error) => just(-1), $))
t.deepEqual(results, [
next(401, 10),
next(402, -1),
next(403, -1),
next(404, 10),
complete(405)
])
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you write a marble test instead.

})
it('(marble): should emit errors as data', () => {
const SH = createTestScheduler()
const source$ = SH.Cold(fromMarble('a-b-#-c-#-d|'))
const o$ = SH.Cold(fromMarble('z-x-y-w-|'))
const {results} = SH.start(() => {
return recoverWith((err: Error) => o$, source$)
})
t.deepEqual(results, [
EVENT.next(401, 'a'),
EVENT.next(403, 'b'),
EVENT.next(407, 'c'),
EVENT.next(411, 'd'),
EVENT.next(604, 'z'),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As soon as an error is dispatched. The subscription should end. So even though the stream is firing c-#-d it should not be captured by the observer. This is a fundamental architectural assumption that is being made for all other operators.

EVENT.next(606, 'x'),
EVENT.next(608, 'y'),
EVENT.next(608, 'z'),
EVENT.next(610, 'w'),
EVENT.next(610, 'x'),
EVENT.next(612, 'y'),
EVENT.next(614, 'w'),
EVENT.complete(616)
])
})
})