Skip to content

Commit

Permalink
Added previous steps results as arguments in pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
guillemcordoba committed Oct 16, 2023
1 parent fc5f438 commit 84120f0
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 47 deletions.
2 changes: 1 addition & 1 deletion packages/stores/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@holochain-open-dev/stores",
"version": "0.7.3",
"version": "0.7.4",
"description": "Re-export of svelte/store, with additional utilities to build reusable holochain-open-dev modules",
"author": "[email protected]",
"main": "dist/index.js",
Expand Down
130 changes: 85 additions & 45 deletions packages/stores/src/pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,21 @@ export type PipeStep<T> = AsyncReadable<T> | Readable<T> | Promise<T> | T;

function pipeStep<T, U>(
store: AsyncReadable<T>,
stepFn: (arg: T) => PipeStep<U>
stepFn: (arg: T, ...args: any[]) => PipeStep<U>,
previousStores: Array<AsyncReadable<any>>
): AsyncReadable<U> {
return derived(store, (value, set) => {
return derived([store, ...previousStores], (values, set) => {
const value = values[0];
if (value.status === "error") set(value);
else if (value.status === "pending") set(value);
else {
const v = stepFn(value.value);
const v = stepFn(
value.value,
...values
.slice(1)
.map((v) => (v as any).value)
.reverse()
);

if ((v as Readable<any>).subscribe) {
return (v as Readable<any>).subscribe((value) => {
Expand Down Expand Up @@ -48,7 +56,9 @@ function pipeStep<T, U>(

/**
* Takes an AsyncReadable store and derives it with the given functions
* Each step may return an `AsyncReadable`, `Readable`, `Promise` or just a raw value
* - Each step may return an `AsyncReadable`, `Readable`, `Promise` or just a raw value
* - Each step receives the results of all the previous steps, normally you'll only need
* the result for the latest one
*
* ```js
* const asyncReadableStore = lazyLoad(async () => {
Expand All @@ -58,16 +68,16 @@ function pipeStep<T, U>(
* const pipeStore = pipe(
* asyncReadableStore,
* (n1) =>
* lazyLoad(async () => { // Step with `AsyncReadable`
* lazyLoad(async () => { // Step with `AsyncReadable`
* await sleep(1);
* return n1 + 1;
* }),
* (n2) => readable(n2 + 1), // Step with `Readable`
* async (n3) => { // Step with `Promise`
* (n2) => readable(n2 + 1), // Step with `Readable`
* async (n3, n2, n1) => { // Step with `Promise`
* await sleep(1);
* return n3 + 1;
* },
* (n4) => n4 + 1 // Step with raw value
* (n4, n3, n2, n1) => n4 + 1 // Step with raw value
* );
* pipeStore.subscribe(value => console.log(value)); // Use like any other store, will print "5" after 3 milliseconds
* ```
Expand All @@ -79,64 +89,94 @@ export function pipe<T, U>(
export function pipe<T, U, V>(
store: AsyncReadable<T>,
fn1: (arg: T) => PipeStep<U>,
fn2: (arg: U) => PipeStep<V>
fn2: (arg: U, prevArg0: T) => PipeStep<V>
): AsyncReadable<V>;
export function pipe<T, U, V, W>(
store: AsyncReadable<T>,
fn1: (arg: T) => PipeStep<U>,
fn2: (arg: U) => PipeStep<V>,
fn3: (arg: V) => PipeStep<W>
fn2: (arg: U, prevArg: T) => PipeStep<V>,
fn3: (arg: V, prevArg0: U, prevArg1: T) => PipeStep<W>
): AsyncReadable<W>;
export function pipe<T, U, V, W, X>(
store: AsyncReadable<T>,
fn1: (arg: T) => PipeStep<U>,
fn2: (arg: U) => PipeStep<V>,
fn3: (arg: V) => PipeStep<W>,
fn4: (arg: W) => PipeStep<X>
fn2: (arg: U, prevArg: T) => PipeStep<V>,
fn3: (arg: V, prevArg0: U, prevArg1: T) => PipeStep<W>,
fn4: (arg: W, prevArg0: V, prevArg1: U, prevArg2: T) => PipeStep<X>
): AsyncReadable<X>;
export function pipe<T, U, V, W, X, Y>(
store: AsyncReadable<T>,
fn1: (arg: T) => PipeStep<U>,
fn2: (arg: U) => PipeStep<V>,
fn3: (arg: V) => PipeStep<W>,
fn4: (arg: W) => PipeStep<X>,
fn5: (arg: X) => PipeStep<Y>
fn2: (arg: U, prevArg: T) => PipeStep<V>,
fn3: (arg: V, prevArg0: U, prevArg1: T) => PipeStep<W>,
fn4: (arg: W, prevArg0: V, prevArg1: U, prevArg2: T) => PipeStep<X>,
fn5: (
arg: X,
prevArg0: W,
prevArg1: V,
prevArg2: U,
prevArg3: T
) => PipeStep<Y>
): AsyncReadable<Y>;
export function pipe<T, U, V, W, X, Y, Z>(
store: AsyncReadable<T>,
fn1: (arg: T) => PipeStep<U>,
fn2: (arg: U) => PipeStep<V>,
fn3: (arg: V) => PipeStep<W>,
fn4: (arg: W) => PipeStep<X>,
fn5: (arg: X) => PipeStep<Y>,
fn6: (arg: Y) => PipeStep<Z>
fn2: (arg: U, prevArg: T) => PipeStep<V>,
fn3: (arg: V, prevArg0: U, prevArg1: T) => PipeStep<W>,
fn4: (arg: W, prevArg0: V, prevArg1: U, prevArg2: T) => PipeStep<X>,
fn5: (
arg: X,
prevArg0: W,
prevArg1: V,
prevArg2: U,
prevArg3: T
) => PipeStep<Y>,
fn6: (
arg: Y,
prevArg0: X,
prevArg1: W,
prevArg2: V,
prevArg3: U,
prevArg4: T
) => PipeStep<Z>
): AsyncReadable<Z>;
export function pipe<T, U, V, W, X, Y, Z>(
store: AsyncReadable<T>,
fn1: (arg: T) => PipeStep<U>,
fn2?: (arg: U) => PipeStep<V>,
fn3?: (arg: V) => PipeStep<W>,
fn4?: (arg: W) => PipeStep<X>,
fn5?: (arg: X) => PipeStep<Y>,
fn6?: (arg: Y) => PipeStep<Z>
fn2?: (arg: U, prevArg: T) => PipeStep<V>,
fn3?: (arg: V, prevArg0: U, prevArg1: T) => PipeStep<W>,
fn4?: (arg: W, prevArg0: V, prevArg1: U, prevArg2: T) => PipeStep<X>,
fn5?: (
arg: X,
prevArg0: W,
prevArg1: V,
prevArg2: U,
prevArg3: T
) => PipeStep<Y>,
fn6?: (
arg: Y,
prevArg0: X,
prevArg1: W,
prevArg2: V,
prevArg3: U,
prevArg4: T
) => PipeStep<Z>
): AsyncReadable<Z> {
let s: AsyncReadable<any> = pipeStep(store, fn1);
const s1: AsyncReadable<any> = pipeStep(store, fn1, []);

if (fn2) {
s = pipeStep(s, fn2);
}
if (fn3) {
s = pipeStep(s, fn3);
}
if (fn4) {
s = pipeStep(s, fn4);
}
if (fn5) {
s = pipeStep(s, fn5);
}
if (fn6) {
s = pipeStep(s, fn6);
}
if (!fn2) return s1;
const s2 = pipeStep(s1, fn2, [store]);

return s;
if (!fn3) return s2 as any;
const s3 = pipeStep(s2, fn3, [store, s1]);

if (!fn4) return s3 as any;
const s4 = pipeStep(s3, fn4, [store, s1, s2]);

if (!fn5) return s4 as any;
const s5 = pipeStep(s4, fn5, [store, s1, s2, s3]);
if (!fn6) return s5 as any;
const s6 = pipeStep(s5, fn6, [store, s1, s2, s3, s4]);

return s6;
}
57 changes: 56 additions & 1 deletion packages/stores/tests/pipe.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ it("pipe with promise", async () => {
const subscriber = pipeStore.subscribe(() => {});

expect(get(pipeStore)).to.deep.equal({ status: "pending" });
await sleep(20);
await sleep(30);

expect(get(pipeStore)).to.deep.equal({
status: "complete",
Expand Down Expand Up @@ -109,3 +109,58 @@ it("pipe with all types", async () => {
value: "hihihihihi",
});
});

it("pipe yield the results for every step", async () => {
const asyncReadableStore = lazyLoad(async () => {
await sleep(10);
return 1;
});
const pipeStore = pipe(
asyncReadableStore,
(s1) => s1 + 1,
(s2, s1) => {
expect(s1).to.equal(1);
expect(s2).to.equal(2);
return s1 + s2;
},
(s3, s2, s1) => {
expect(s1).to.equal(1);
expect(s2).to.equal(2);
expect(s3).to.equal(3);
return s1 + s2 + s3;
},
(s4, s3, s2, s1) => {
expect(s1).to.equal(1);
expect(s2).to.equal(2);
expect(s3).to.equal(3);
expect(s4).to.equal(6);
return s1 + s2 + s3 + s4;
},
(s5, s4, s3, s2, s1) => {
expect(s1).to.equal(1);
expect(s2).to.equal(2);
expect(s3).to.equal(3);
expect(s4).to.equal(6);
expect(s5).to.equal(12);
return s1 + s2 + s3 + s4 + s5;
},
(s6, s5, s4, s3, s2, s1) => {
expect(s1).to.equal(1);
expect(s2).to.equal(2);
expect(s3).to.equal(3);
expect(s4).to.equal(6);
expect(s5).to.equal(12);
expect(s6).to.equal(24);
return s1 + s2 + s3 + s4 + s5 + s6;
}
);
const subscriber = pipeStore.subscribe(() => {});

expect(get(pipeStore)).to.deep.equal({ status: "pending" });
await sleep(30);

expect(get(pipeStore)).to.deep.equal({
status: "complete",
value: 48,
});
});

0 comments on commit 84120f0

Please sign in to comment.