Skip to content

Commit

Permalink
Merge pull request #6866 from Agoric/mfig-notifier-housecleaning
Browse files Browse the repository at this point in the history
feat(notifier)!: housecleaning
  • Loading branch information
mergify[bot] authored Mar 5, 2023
2 parents 9ceb479 + 6466071 commit 1a66ff6
Show file tree
Hide file tree
Showing 38 changed files with 562 additions and 611 deletions.
8 changes: 6 additions & 2 deletions packages/SwingSet/test/metering/vat-load-dynamic.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export function buildRootObject(vatPowers) {
const { testLog: log } = vatPowers;
let service;
let control;
const notifierToUpdateCount = new WeakMap();

return Far('root', {
async bootstrap(vats, devices) {
Expand Down Expand Up @@ -33,8 +34,11 @@ export function buildRootObject(vatPowers) {

async whenMeterNotifiesNext(meter) {
const notifier = await E(meter).getNotifier();
const initial = await E(notifier).getUpdateSince();
return E(notifier).getUpdateSince(initial);
const update = await E(notifier).getUpdateSince(
notifierToUpdateCount.get(notifier),
);
notifierToUpdateCount.set(notifier, update.updateCount);
return update;
},

async createVat(name, dynamicOptions) {
Expand Down
4 changes: 2 additions & 2 deletions packages/assert/src/assert.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ if (globalAssert === undefined) {
);
}

const missing = [
const missing = /** @type {const} */ ([
'fail',
'equal',
'typeof',
Expand All @@ -43,7 +43,7 @@ const missing = [
'Fail',
'quote',
'makeAssert',
].filter(name => globalAssert[name] === undefined);
]).filter(name => globalAssert[name] === undefined);
if (missing.length > 0) {
throw new Error(
`Cannot initialize @agoric/assert, missing globalThis.assert methods ${missing.join(
Expand Down
12 changes: 6 additions & 6 deletions packages/casting/src/follower.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Far } from '@endo/far';
import {
mapAsyncIterable,
makeNotifierIterable,
makeSubscriptionIterable,
subscribeEach,
subscribeLatest,
} from './iterable.js';
import { makeCosmjsFollower } from './follower-cosmjs.js';
import { makeCastingSpec } from './casting-spec.js';
Expand All @@ -20,10 +20,10 @@ const makeSubscriptionFollower = spec => {
const { notifier, subscription } = await spec;
let ai;
if (notifier) {
ai = makeNotifierIterable(notifier);
ai = subscribeLatest(notifier);
} else {
assert(subscription);
ai = makeSubscriptionIterable(subscription);
ai = subscribeEach(subscription);
}
return mapAsyncIterable(ai, transform);
},
Expand All @@ -32,10 +32,10 @@ const makeSubscriptionFollower = spec => {
const { notifier, subscription } = await spec;
let ai;
if (subscription) {
ai = makeSubscriptionIterable(subscription);
ai = subscribeEach(subscription);
} else {
assert(notifier);
ai = makeNotifierIterable(notifier);
ai = subscribeLatest(notifier);
}
return mapAsyncIterable(ai, transform);
},
Expand Down
43 changes: 1 addition & 42 deletions packages/casting/src/iterable.js
Original file line number Diff line number Diff line change
@@ -1,47 +1,6 @@
import { E, Far } from '@endo/far';
import { makeNotifier } from '@agoric/notifier';

/**
* @template T
* @param {ERef<Notifier<T>>} notifier
* @returns {ConsistentAsyncIterable<T>}
*/
export const makeNotifierIterable = notifier =>
makeNotifier(E(notifier).getSharableNotifierInternals());

/**
* TODO: Remove this function when we have an @endo/publish-kit that suppports pull topics
*
* @template T
* @param {ERef<PublicationRecord<T>>} tailP
* @returns {AsyncIterator<T>}
*/
const makeSubscriptionIterator = tailP => {
// To understand the implementation, start with
// https://web.archive.org/web/20160404122250/http://wiki.ecmascript.org/doku.php?id=strawman:concurrency#infinite_queue
return Far('SubscriptionIterator', {
next: async () => {
const resultP = E.get(tailP).head;
tailP = E.get(tailP).tail;
return resultP;
},
});
};

/**
* TODO: Remove this function when we have an @endo/publish-kit that suppports pull topics
*
* @template T
* @param {ERef<Subscription<T>>} subscription
* @returns {ConsistentAsyncIterable<T>}
*/
export const makeSubscriptionIterable = subscription =>
harden({
[Symbol.asyncIterator]: () =>
makeSubscriptionIterator(
E(subscription).getSharableSubscriptionInternals(),
),
});
export { subscribeEach, subscribeLatest } from '@agoric/notifier/subscribe.js';

/**
* @template TIn
Expand Down
5 changes: 3 additions & 2 deletions packages/inter-protocol/src/vaultFactory/vault.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { AmountMath, AmountShape } from '@agoric/ertp';
import { makeTracer } from '@agoric/internal';
import { StorageNodeShape } from '@agoric/notifier/src/typeGuards.js';
import { makeTracer, makeTypeGuards } from '@agoric/internal';
import { M, prepareExoClassKit } from '@agoric/vat-data';
import {
assertProposalShape,
Expand All @@ -19,6 +18,8 @@ import { calculateLoanCosts } from './math.js';

const { quote: q, Fail } = assert;

const { StorageNodeShape } = makeTypeGuards(M);

const trace = makeTracer('IV', false);

/** @typedef {import('./storeUtils.js').NormalizedDebt} NormalizedDebt */
Expand Down
8 changes: 4 additions & 4 deletions packages/inter-protocol/test/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import { makeTracer } from '@agoric/internal';
const trace = makeTracer('TestMetrics', false);

/**
* @param {import('ava').ExecutionContext} t
* @param {ConsistentAsyncIterable<N>} subscription
* @template {object} N
* @param {import('ava').ExecutionContext} t
* @param {AsyncIterable<N, N>} subscription
*/
export const subscriptionTracker = async (t, subscription) => {
const metrics = makeNotifierFromAsyncIterable(subscription);
Expand Down Expand Up @@ -44,9 +44,9 @@ export const subscriptionTracker = async (t, subscription) => {
/**
* For public facets that have a `getMetrics` method.
*
* @param {import('ava').ExecutionContext} t
* @param {ERef<{getMetrics?: () => Subscriber<unknown>}>} publicFacet
* @template {object} N
* @param {import('ava').ExecutionContext} t
* @param {ERef<{getMetrics?: () => Subscriber<N>}>} publicFacet
*/
export const metricsTracker = async (t, publicFacet) => {
const metricsSub = await E(publicFacet).getMetrics();
Expand Down
1 change: 1 addition & 0 deletions packages/internal/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ export * from './config.js';
export * from './debug.js';
export * from './utils.js';
export * from './method-tools.js';
export * from './typeGuards.js';
8 changes: 8 additions & 0 deletions packages/internal/src/typeGuards.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/**
* @param {typeof import('@agoric/store').M} M
*/
export const makeTypeGuards = M =>
harden({
StorageNodeShape: M.remotable('StorageNode'),
});
harden(makeTypeGuards);
21 changes: 10 additions & 11 deletions packages/notifier/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,14 @@ An iteration’s *suffix subset* is defined by its *starting point* in the orig
The values published using the publication define the original iteration. Each subscription has a starting
point in that iteration and provides access to a suffix subset of that iteration starting at that starting
point. The initial subscription created by the `makeSubscriptionKit()` call provides the entire iteration.
Each subscription is a kind of `AsyncIterable` which produces any number of `AsyncIterators`, each of which
advance independently starting with that subscription's starting point. These `AsyncIterators`
are `SubsciptionIterators` which also have a `subscribe()` method. Calling a `SubscriptionIterator`'s `subscribe()`
method makes a `Subscription` whose starting point is that `SubscriptionIterator`'s current position at that time.
Each subscription is an `ForkableAsyncIterable` capable of producing any number of `ForkableAsyncIterator`s,
each of which advances independently from the subscription's starting point.
Each produced `ForkableAsyncIterator` is an `AsyncIterator`,
with a `fork()` method that when called
produces a new `ForkableAsyncIterator`
whose starting point is the current position of its parent `ForkableAsyncIterator`.

Neither Alice nor Bob are good starting points to construct an example of `subscribe()` since their code uses
only a `Subscription`, not a `SubscriptionIterator`. Carol's code is like Bob's except lower level, using
a `SubscriptionIterator` directly. Where Bob uses `observeIteration` which takes an `AsyncIterable`, Carol's
uses the lower level `observeIterator` which takes an `AsyncIterator`.
Carol's code is like Bob's except lower level, using the `ForkableAsyncIterable` interface directly.

```js
import { makePromiseKit } from '@agoric/promiseKit';
Expand All @@ -243,7 +242,7 @@ const { promise: afterA, resolve: afterAResolve } = makePromiseKit();
const observer = harden({
updateState: val => {
if (val === 'a') {
afterAResolve(subscriptionIterator.subscribe());
afterAResolve(subscriptionIterator.fork());
}
console.log('non-final', val);
},
Expand All @@ -257,8 +256,8 @@ observeIterator(subscriptionIterator, observer);
// non-final b
// finished done

// afterA is an ERef<Subscription> so we use observeIteration on it.
observeIteration(afterA, observer);
// afterA is a Promise<ForkableAsyncIterator> so we use observeIterator on it.
observeIterator(afterA, observer);
// eventually prints
// non-final b
// finished done
Expand Down
5 changes: 0 additions & 5 deletions packages/notifier/exports.js

This file was deleted.

1 change: 1 addition & 0 deletions packages/notifier/jsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"extends": "../../tsconfig.json",
"include": [
"src/**/*.js",
"*.js",
"test/**/*.js",
"tools/**/*.js",
],
Expand Down
12 changes: 9 additions & 3 deletions packages/notifier/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"@agoric/swingset-vat": "^0.30.2",
"@agoric/swing-store": "^0.8.1",
"@agoric/vat-data": "^0.4.3",
"@endo/eventual-send": "^0.16.8",
"@endo/far": "^0.2.14",
"@endo/marshal": "^0.8.1",
"@endo/promise-kit": "^0.2.52"
},
Expand All @@ -49,10 +49,16 @@
"ava": "^5.1.0",
"c8": "^7.12.0"
},
"exports": {
".": "./src/index.js",
"./exported.js": "./exported.js",
"./subscribe.js": "./subscribe.js",
"./tools/testSupports.js": "./tools/testSupports.js"
},
"files": [
"src/",
"tools",
"exported.js",
"tools/",
"*.js",
"NEWS.md"
],
"eslintIgnore": [
Expand Down
84 changes: 7 additions & 77 deletions packages/notifier/src/asyncIterableAdaptor.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
/// <reference types="ses"/>

import { E } from '@endo/eventual-send';
import { Far } from '@endo/marshal';
import { E } from '@endo/far';
import { subscribeLatest } from './subscribe.js';

import './types-ambient.js';

/**
* @deprecated Use `subscribeLatest` from `@agoric/notifier/subscribe.js` instead.
*
* Adaptor from a notifierP to an async iterable.
* The notifierP can be any object that has an eventually invocable
* `getUpdateSince` method that behaves according to the notifier
Expand All @@ -32,61 +34,9 @@ import './types-ambient.js';
*
* @template T
* @param {ERef<BaseNotifier<T>>} notifierP
* @returns {ConsistentAsyncIterable<T>}
* @returns {ForkableAsyncIterable<T>}
*/
export const makeAsyncIterableFromNotifier = notifierP => {
return Far('asyncIterableFromNotifier', {
[Symbol.asyncIterator]: () => {
/** @type {UpdateCount} */
let localUpdateCount;
/** @type {Promise<{value: T, done: boolean}> | undefined} */
let myIterationResultP;
return Far('asyncIteratorFromNotifier', {
next: () => {
if (!myIterationResultP) {
// In this adaptor, once `next()` is called and returns an
// unresolved promise, `myIterationResultP`, and until
// `myIterationResultP` is fulfilled with an
// iteration result, further `next()` calls will return the same
// `myIterationResultP` promise again without asking the notifier
// for more updates. If there's already an unanswered ask in the
// air, all further asks should just reuse the result of that one.
//
// This reuse behavior is only needed for code that uses the async
// iterator protocol explicitly. When this async iterator is
// consumed by a for/await/of loop, `next()` will only be called
// after the promise for the previous iteration result has
// fulfilled. If it fulfills with `done: true`, the for/await/of
// loop will never call `next()` again.
//
// See
// https://2ality.com/2016/10/asynchronous-iteration.html#queuing-next()-invocations
// for an explicit use that sends `next()` without waiting.
myIterationResultP = E(notifierP)
.getUpdateSince(localUpdateCount)
.then(({ value, updateCount }) => {
localUpdateCount = updateCount;
const done = localUpdateCount === undefined;
if (!done) {
// Once the outstanding question has been answered, stop
// using that answer, so any further `next()` questions
// cause a new `getUpdateSince` request.
//
// But only if more answers are expected. Once the notifier
// is `done`, that was the last answer so reuse it forever.
myIterationResultP = undefined;
}
return harden({ value, done });
});
}
// xxx hint for type checker
assert(myIterationResultP);
return myIterationResultP;
},
});
},
});
};
export const makeAsyncIterableFromNotifier = subscribeLatest;

/**
* This advances `asyncIteratorP` updating `iterationObserver` with each
Expand Down Expand Up @@ -140,16 +90,6 @@ export const observeIteration = (asyncIterableP, iterationObserver) => {
return observeIterator(iteratorP, iterationObserver);
};

/**
* @deprecated Use `observeIteration` instead
* @template T
* @param {Partial<IterationObserver<T>>} iterationObserver
* @param {ERef<AsyncIterable<T>>} asyncIterableP
* @returns {Promise<undefined>}
*/
export const updateFromIterable = (iterationObserver, asyncIterableP) =>
observeIteration(asyncIterableP, iterationObserver);

/**
* As updates come in from the possibly remote `notifierP`, update
* the local `updater`. Since the updates come from a notifier, they
Expand All @@ -162,14 +102,4 @@ export const updateFromIterable = (iterationObserver, asyncIterableP) =>
* @returns {Promise<undefined>}
*/
export const observeNotifier = (notifierP, iterationObserver) =>
observeIteration(makeAsyncIterableFromNotifier(notifierP), iterationObserver);

/**
* @deprecated Use 'observeNotifier` instead.
* @template T
* @param {Partial<IterationObserver<T>>} iterationObserver
* @param {ERef<Notifier<T>>} notifierP
* @returns {Promise<undefined>}
*/
export const updateFromNotifier = (iterationObserver, notifierP) =>
observeIteration(makeAsyncIterableFromNotifier(notifierP), iterationObserver);
observeIteration(subscribeLatest(notifierP), iterationObserver);
Loading

0 comments on commit 1a66ff6

Please sign in to comment.