Skip to content

Commit

Permalink
feat: vowTools.allSettled (#10077)
Browse files Browse the repository at this point in the history
## Description

- adds vowTools helper that mimics the behavior of `Promise.allSettled` to support #9902
- tests unstorable results path which resulted in a type guard change

### Security Considerations
n/a

### Scaling Considerations
n/a

### Documentation Considerations
Updates `vow/README.md` with information about `VowTools`

### Testing Considerations
Includes unit tests 

### Upgrade Considerations
n/a, library code
  • Loading branch information
mergify[bot] authored Sep 18, 2024
2 parents 9cdb01d + c3babe6 commit 6ee57ba
Show file tree
Hide file tree
Showing 6 changed files with 363 additions and 79 deletions.
63 changes: 62 additions & 1 deletion packages/vow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Here they are: {
```

You can use `heapVowE` exported from `@agoric/vow`, which converts a chain of
promises and vows to a promise for its final fulfilment, by unwrapping any
promises and vows to a promise for its final fulfillment, by unwrapping any
intermediate vows:

```js
Expand Down Expand Up @@ -77,6 +77,67 @@ const { watch, makeVowKit } = prepareVowTools(vowZone);
// Vows and resolvers you create can be saved in durable stores.
```

## VowTools

VowTools are a set of utility functions for working with Vows in Agoric smart contracts and vats. These tools help manage asynchronous operations in a way that's resilient to vat upgrades, ensuring your smart contract can handle long-running processes reliably.

### Usage

VowTools are typically prepared in the start function of a smart contract or vat and passed in as a power to exos.


```javascript
import { prepareVowTools } from '@agoric/vow/vat.js';
import { makeDurableZone } from '@agoric/zone/durable.js';

export const start = async (zcf, privateArgs, baggage) => {
const zone = makeDurableZone(baggage);
const vowTools = prepareVowTools(zone.subZone('vows'));

// Use vowTools here...
}
```

### Available Tools

#### `when(vowOrPromise)`
Returns a Promise for the fulfillment of the very end of the `vowOrPromise` chain. It can retry disconnections due to upgrades of other vats, but cannot survive the upgrade of the calling vat.

#### `watch(promiseOrVow, [watcher], [context])`
Watch a Vow and optionally provide a `watcher` with `onFulfilled`/`onRejected` handlers and a `context` value for the handlers. When handlers are not provided the fulfillment or rejection will simply pass through.

It also registers pending Promises, so if the current vat is upgraded, the watcher is rejected because the Promise was lost when the heap was reset.

#### `all(arrayOfPassables, [watcher], [context])`
Vow-tolerant implementation of Promise.all that takes an iterable of vows and other Passables and returns a single Vow. It resolves with an array of values when all of the input's promises or vows are fulfilled and rejects with the first rejection reason when any of the input's promises or vows are rejected.

#### `allSettled(arrayOfPassables, [watcher], [context])`
Vow-tolerant implementation of Promise.allSettled that takes an iterable of vows and other Passables and returns a single Vow. It resolves when all of the input's promises or vows are settled with an array of settled outcome objects.

#### `asVow(fn)`
Takes a function that might return synchronously, throw an Error, or return a Promise or Vow and returns a Vow.

#### `asPromise(vow)`
Converts a Vow back into a Promise.

### Example

```javascript
const { when, watch, all, allSettled } = vowTools;

// Using watch to create a Vow
const myVow = watch(someAsyncOperation());

// Using when to resolve a Vow
const result = await when(myVow);

// Using all
const results = await when(all([vow, vowForVow, promise]));

// Using allSettled
const outcomes = await when(allSettled([vow, vowForVow, promise]));
```

## Internals

The current "version 0" vow internals expose a `shorten()` method, returning a
Expand Down
29 changes: 26 additions & 3 deletions packages/vow/src/tools.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { makeWhen } from './when.js';

/**
* @import {Zone} from '@agoric/base-zone';
* @import {Passable} from '@endo/pass-style';
* @import {IsRetryableReason, AsPromiseFunction, EVow, Vow, ERef} from './types.js';
*/

Expand Down Expand Up @@ -52,11 +53,31 @@ export const prepareBasicVowTools = (zone, powers = {}) => {
};

/**
* Vow-tolerant implementation of Promise.all.
* Vow-tolerant implementation of Promise.all that takes an iterable of vows
* and other {@link Passable}s and returns a single {@link Vow}. It resolves
* with an array of values when all of the input's promises or vows are
* fulfilled and rejects when any of the input's promises or vows are
* rejected with the first rejection reason.
*
* @param {EVow<unknown>[]} maybeVows
* @param {unknown[]} maybeVows
*/
const allVows = maybeVows => watchUtils.all(maybeVows);
const all = maybeVows => watchUtils.all(maybeVows);

/**
* @param {unknown[]} maybeVows
* @deprecated use `vowTools.all`
*/
const allVows = all;

/**
* Vow-tolerant implementation of Promise.allSettled that takes an iterable
* of vows and other {@link Passable}s and returns a single {@link Vow}. It
* resolves when all of the input's promises or vows are settled with an
* array of settled outcome objects.
*
* @param {unknown[]} maybeVows
*/
const allSettled = maybeVows => watchUtils.allSettled(maybeVows);

/** @type {AsPromiseFunction} */
const asPromise = (specimenP, ...watcherArgs) =>
Expand All @@ -66,7 +87,9 @@ export const prepareBasicVowTools = (zone, powers = {}) => {
when,
watch,
makeVowKit,
all,
allVows,
allSettled,
asVow,
asPromise,
retriable,
Expand Down
1 change: 1 addition & 0 deletions packages/vow/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export {};
*/

/**
* Vows are objects that represent promises that can be stored durably.
* @template [T=any]
* @typedef {CopyTagged<'Vow', VowPayload<T>>} Vow
*/
Expand Down
140 changes: 100 additions & 40 deletions packages/vow/src/watch-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const { Fail, bare, details: X } = assert;
* @import {Zone} from '@agoric/base-zone';
* @import {Watch} from './watch.js';
* @import {When} from './when.js';
* @import {VowKit, AsPromiseFunction, IsRetryableReason, EVow} from './types.js';
* @import {VowKit, AsPromiseFunction, IsRetryableReason, Vow} from './types.js';
*/

const VowShape = M.tagged(
Expand Down Expand Up @@ -54,11 +54,16 @@ export const prepareWatchUtils = (
{
utils: M.interface('Utils', {
all: M.call(M.arrayOf(M.any())).returns(VowShape),
allSettled: M.call(M.arrayOf(M.any())).returns(VowShape),
asPromise: M.call(M.raw()).rest(M.raw()).returns(M.promise()),
}),
watcher: M.interface('Watcher', {
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
onRejected: M.call(M.any()).rest(M.any()).returns(M.any()),
onFulfilled: M.call(M.raw()).rest(M.raw()).returns(M.raw()),
onRejected: M.call(M.raw()).rest(M.raw()).returns(M.raw()),
}),
helper: M.interface('Helper', {
createVow: M.call(M.arrayOf(M.any()), M.boolean()).returns(VowShape),
processResult: M.call(M.raw()).rest(M.raw()).returns(M.undefined()),
}),
retryRejectionPromiseWatcher: PromiseWatcherI,
},
Expand All @@ -68,6 +73,7 @@ export const prepareWatchUtils = (
* @property {number} remaining
* @property {MapStore<number, any>} resultsMap
* @property {VowKit['resolver']} resolver
* @property {boolean} [isAllSettled]
*/
/** @type {MapStore<bigint, VowState>} */
const idToVowState = detached.mapStore('idToVowState');
Expand All @@ -79,32 +85,83 @@ export const prepareWatchUtils = (
},
{
utils: {
/** @param {unknown[]} specimens */
all(specimens) {
return this.facets.helper.createVow(specimens, false);
},
/** @param {unknown[]} specimens */
allSettled(specimens) {
return /** @type {Vow<({status: 'fulfilled', value: any} | {status: 'rejected', reason: any})[]>} */ (
this.facets.helper.createVow(specimens, true)
);
},
/** @type {AsPromiseFunction} */
asPromise(specimenP, ...watcherArgs) {
// Watch the specimen in case it is an ephemeral promise.
const vow = watch(specimenP, ...watcherArgs);
const promise = when(vow);
// Watch the ephemeral result promise to ensure that if its settlement is
// lost due to upgrade of this incarnation, we will at least cause an
// unhandled rejection in the new incarnation.
zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher);

return promise;
},
},
watcher: {
/**
* @param {EVow<unknown>[]} vows
* @param {unknown} value
* @param {object} ctx
* @param {bigint} ctx.id
* @param {number} ctx.index
* @param {number} ctx.numResults
* @param {boolean} ctx.isAllSettled
*/
all(vows) {
onFulfilled(value, ctx) {
this.facets.helper.processResult(value, ctx, 'fulfilled');
},
/**
* @param {unknown} reason
* @param {object} ctx
* @param {bigint} ctx.id
* @param {number} ctx.index
* @param {number} ctx.numResults
* @param {boolean} ctx.isAllSettled
*/
onRejected(reason, ctx) {
this.facets.helper.processResult(reason, ctx, 'rejected');
},
},
helper: {
/**
* @param {unknown[]} specimens
* @param {boolean} isAllSettled
*/
createVow(specimens, isAllSettled) {
const { nextId: id, idToVowState } = this.state;
/** @type {VowKit<any[]>} */
const kit = makeVowKit();

// Preserve the order of the vow results.
for (let index = 0; index < vows.length; index += 1) {
watch(vows[index], this.facets.watcher, {
// Preserve the order of the results.
for (let index = 0; index < specimens.length; index += 1) {
watch(specimens[index], this.facets.watcher, {
id,
index,
numResults: vows.length,
numResults: specimens.length,
isAllSettled,
});
}

if (vows.length > 0) {
if (specimens.length > 0) {
// Save the state until rejection or all fulfilled.
this.state.nextId += 1n;
idToVowState.init(
id,
harden({
resolver: kit.resolver,
remaining: vows.length,
remaining: specimens.length,
resultsMap: detached.mapStore('resultsMap'),
isAllSettled,
}),
);
const idToNonStorableResults = provideLazyMap(
Expand All @@ -119,27 +176,36 @@ export const prepareWatchUtils = (
}
return kit.vow;
},
/** @type {AsPromiseFunction} */
asPromise(specimenP, ...watcherArgs) {
// Watch the specimen in case it is an ephemeral promise.
const vow = watch(specimenP, ...watcherArgs);
const promise = when(vow);
// Watch the ephemeral result promise to ensure that if its settlement is
// lost due to upgrade of this incarnation, we will at least cause an
// unhandled rejection in the new incarnation.
zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher);

return promise;
},
},
watcher: {
onFulfilled(value, { id, index, numResults }) {
/**
* @param {unknown} result
* @param {object} ctx
* @param {bigint} ctx.id
* @param {number} ctx.index
* @param {number} ctx.numResults
* @param {boolean} ctx.isAllSettled
* @param {'fulfilled' | 'rejected'} status
*/
processResult(result, { id, index, numResults, isAllSettled }, status) {
const { idToVowState } = this.state;
if (!idToVowState.has(id)) {
// Resolution of the returned vow happened already.
return;
}
const { remaining, resultsMap, resolver } = idToVowState.get(id);
if (!isAllSettled && status === 'rejected') {
// For 'all', we reject immediately on the first rejection
idToVowState.delete(id);
resolver.reject(result);
return;
}

const possiblyWrappedResult = isAllSettled
? harden({
status,
[status === 'fulfilled' ? 'value' : 'reason']: result,
})
: result;

const idToNonStorableResults = provideLazyMap(
utilsToNonStorableResults,
this.facets.utils,
Expand All @@ -152,15 +218,16 @@ export const prepareWatchUtils = (
);

// Capture the fulfilled value.
if (zone.isStorable(value)) {
resultsMap.init(index, value);
if (zone.isStorable(possiblyWrappedResult)) {
resultsMap.init(index, possiblyWrappedResult);
} else {
nonStorableResults.set(index, value);
nonStorableResults.set(index, possiblyWrappedResult);
}
const vowState = harden({
remaining: remaining - 1,
resultsMap,
resolver,
isAllSettled,
});
if (vowState.remaining > 0) {
idToVowState.set(id, vowState);
Expand All @@ -177,26 +244,19 @@ export const prepareWatchUtils = (
results[i] = resultsMap.get(i);
} else {
numLost += 1;
results[i] = isAllSettled
? { status: 'rejected', reason: 'Unstorable result was lost' }
: undefined;
}
}
if (numLost > 0) {
if (numLost > 0 && !isAllSettled) {
resolver.reject(
assert.error(X`${numLost} unstorable results were lost`),
);
} else {
resolver.resolve(harden(results));
}
},
onRejected(value, { id, index: _index, numResults: _numResults }) {
const { idToVowState } = this.state;
if (!idToVowState.has(id)) {
// First rejection wins.
return;
}
const { resolver } = idToVowState.get(id);
idToVowState.delete(id);
resolver.reject(value);
},
},
retryRejectionPromiseWatcher: {
onFulfilled(_result) {},
Expand Down
15 changes: 15 additions & 0 deletions packages/vow/test/types.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,18 @@ expectType<(p1: number, p2: string) => Vow<{ someValue: 'bar' }>>(
Promise.resolve({ someValue: 'bar' } as const),
),
);

expectType<
Vow<
(
| { status: 'fulfilled'; value: any }
| { status: 'rejected'; reason: any }
)[]
>
>(
vt.allSettled([
Promise.resolve(1),
Promise.reject(new Error('test')),
Promise.resolve('hello'),
]),
);
Loading

0 comments on commit 6ee57ba

Please sign in to comment.