Skip to content

Commit

Permalink
feat(async-flow)!: require explicit wakeAll after flows redefined
Browse files Browse the repository at this point in the history
  • Loading branch information
mhofman committed Oct 3, 2024
1 parent d739b34 commit ee710ca
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 64 deletions.
16 changes: 7 additions & 9 deletions packages/async-flow/src/async-flow.js
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,10 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
getFailures() {
return failures.snapshot();
},
/**
* Must be called once on upgrade during the start crank, but after all
* async flows have been redefined
*/
wakeAll() {
wakeUpgradeGate();
// [...stuff.keys()] in order to snapshot before iterating
Expand All @@ -550,22 +554,16 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
}
},
getFlowForOutcomeVow(outcomeVow) {
return flowForOutcomeVowKey.get(toPassableCap(outcomeVow));
return /** @type {AsyncFlow} */ (
flowForOutcomeVowKey.get(toPassableCap(outcomeVow))
);
},
});

// Cannot call this until everything is prepared, so postpone to a later
// turn. (Ideally, we'd postpone to a later crank because prepares are
// allowed anytime in the first crank. But there's currently no pleasant
// way to postpone to a later crank.)
// See https://github.com/Agoric/agoric-sdk/issues/9377
const allWokenP = E.when(null, () => adminAsyncFlow.wakeAll());

return harden({
prepareAsyncFlowKit,
asyncFlow,
adminAsyncFlow,
allWokenP,
prepareEndowment,
});
};
Expand Down
2 changes: 2 additions & 0 deletions packages/async-flow/test/async-flow-early-completion.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ const testFirstPlay = async (t, zone) => {
};

const wrapperFunc = asyncFlow(zone, 'AsyncFlow1', guestMethod);
await adminAsyncFlow.wakeAll();

const outcomeV = zone.makeOnce('outcomeV', () => wrapperFunc(hOrch7, v1, v3));

Expand Down Expand Up @@ -175,6 +176,7 @@ const testBadShortReplay = async (t, zone, rejection) => {
// invoked, that would be a *new* activation with a new outcome and
// flow, and would have nothing to do with the existing one.
asyncFlow(zone, 'AsyncFlow1', guestMethod);
await adminAsyncFlow.wakeAll();

const outcomeV = /** @type {Vow} */ (
zone.makeOnce('outcomeV', () => Fail`need outcomeV`)
Expand Down
96 changes: 47 additions & 49 deletions packages/async-flow/test/async-flow-wake.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,55 +122,27 @@ testAsyncLife(
},
);

testAsyncLife.failing(
'failed wake',
async (t, { zone, asyncFlow, makeVowKit, allWokenP }) => {
t.plan(2);
// Spend a bunch of turns to pretend any concurrent async operation has settled
// Triggers https://github.com/Agoric/agoric-sdk/issues/9377
for (let i = 0; i < 100; i += 1) {
await null;
}

makeTestKit({ zone, makeVowKit });
const guestFunc = async w => {
t.pass('not triggered - invocation cannot be awoken');
return w.wait('foo');
};
t.notThrows(() =>
asyncFlow(zone, 'guestFunc', guestFunc, {
// Next incarnation should not start eager
startEager: false,
}),
);

return { allWokenP };
},
async (t, { allWokenP }) => {
await t.notThrowsAsync(
() => allWokenP,
'will actually throw due to crank bug #9377',
);
},
);

testAsyncLife(
'failed wake redo',
async (t, { zone, asyncFlow, makeVowKit }) => {
t.plan(2);
makeTestKit({ zone, makeVowKit });
const guestFunc = async w => {
t.pass();
return w.wait('foo');
};
t.notThrows(() =>
asyncFlow(zone, 'guestFunc', guestFunc, {
// Next incarnation should not start eager
startEager: false,
}),
);
},
);
testAsyncLife('failed wake', async (t, { zone, asyncFlow, makeVowKit }) => {
t.plan(2);
// Spend a bunch of turns to pretend any concurrent async operation has settled
// Previously the async-flows would be awoken one turn after preparation of the tools
// which would prevent asynchronous definitions.
for (let i = 0; i < 100; i += 1) {
await null;
}

makeTestKit({ zone, makeVowKit });
const guestFunc = async w => {
t.pass();
return w.wait('foo');
};
t.notThrows(() =>
asyncFlow(zone, 'guestFunc', guestFunc, {
// Next incarnation should not start eager
startEager: false,
}),
);
});

testAsyncLife(
'not eager waker stay sleeping 3',
Expand Down Expand Up @@ -217,3 +189,29 @@ testAsyncLife(
t.is(guestCalled.tripped, true, 'flow woke up');
},
);

testAsyncLife(
'no wakeAll causes start failure',
async (t, { zone, asyncFlow, makeVowKit }) => {
// Not reconnected handler invoked
t.plan(2);
makeTestKit({ zone, makeVowKit });
const guestFunc = async _ => t.fail(`Should not restart`);

t.notThrows(() => asyncFlow(zone, 'guestFunc', guestFunc));
},
undefined,
{
skipWakeAll: true,
notAllKindsReconnectedHandler: (e, t) => {
t.throws(
() => {
throw e;
},
{
message: 'defineDurableKind not called for tags: [WakeGateSentinel]',
},
);
},
},
);
4 changes: 4 additions & 0 deletions packages/async-flow/test/async-flow.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ const testFirstPlay = async (t, zone) => {
};

const wrapperFunc = asyncFlow(zone, 'AsyncFlow1', guestMethod);
await adminAsyncFlow.wakeAll();

const outcomeV = zone.makeOnce('outcomeV', () => wrapperFunc(hOrch7, v1, v3));

Expand Down Expand Up @@ -174,6 +175,7 @@ const testBadReplay = async (t, zone) => {
// invoked, that would be a *new* activation with a new outcome and
// flow, and would have nothing to do with the existing one.
asyncFlow(zone, 'AsyncFlow1', guestMethod);
await adminAsyncFlow.wakeAll();

const outcomeV = /** @type {Vow} */ (
zone.makeOnce('outcomeV', () => Fail`need outcomeV`)
Expand Down Expand Up @@ -258,6 +260,7 @@ const testGoodReplay = async (t, zone) => {
// invoked, that would be a *new* activation with a new outcome and
// flow, and would have nothing to do with the existing one.
asyncFlow(zone, 'AsyncFlow1', guestMethod);
await adminAsyncFlow.wakeAll();

const outcomeV = /** @type {Vow} */ (
zone.makeOnce('outcomeV', () => Fail`need outcomeV`)
Expand Down Expand Up @@ -329,6 +332,7 @@ const testAfterPlay = async (t, zone) => {
// invoked, that would be a *new* activation with a new outcome and
// flow, and would have nothing to do with the existing one.
asyncFlow(zone, 'AsyncFlow1', guestMethod);
await adminAsyncFlow.wakeAll();

const outcomeV = /** @type {Vow} */ (
zone.makeOnce('outcomeV', () => Fail`need outcomeV`)
Expand Down
2 changes: 2 additions & 0 deletions packages/async-flow/test/bad-host.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ const testBadHostFirstPlay = async (t, zone) => {
};

const wrapperFunc = asyncFlow(zone, 'AsyncFlow1', guestMethod);
await adminAsyncFlow.wakeAll();

const badHost = zone.makeOnce('badHost', () => makeBadHost());

Expand Down Expand Up @@ -166,6 +167,7 @@ const testBadHostReplay1 = async (t, zone) => {
};

asyncFlow(zone, 'AsyncFlow1', guestMethod);
await adminAsyncFlow.wakeAll();

const badHost = zone.makeOnce('badHost', () => Fail`need badHost`);

Expand Down
5 changes: 5 additions & 0 deletions packages/async-flow/test/prepare-test-env-ava.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export const asyncFlowVerbose = () => {
* @typedef {Omit<Parameters<typeof startLife>[2], 'notAllKindsReconnectedHandler'> & {
* notAllKindsReconnectedHandler?: (e: Error, t: ExecutionContext) => void;
* panicHandler?: import('./_utils.js').TestAsyncFlowPanicHandler;
* skipWakeAll?: boolean;
* }} StartAsyncLifeOptions
*/

Expand All @@ -43,6 +44,7 @@ export const startAsyncLife = async (
run,
{
panicHandler,
skipWakeAll,
notAllKindsReconnectedHandler: notAllKindsReconnectedHandlerOriginal,
...startOptions
} = {},
Expand All @@ -59,6 +61,9 @@ export const startAsyncLife = async (
zone: rootZone.subZone('contract'),
...asyncFlowTools,
});
if (!skipWakeAll) {
await asyncFlowTools.adminAsyncFlow.wakeAll();
}
return tools;
},
run,
Expand Down
4 changes: 3 additions & 1 deletion packages/orchestration/src/utils/start-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ export const withOrchestration =
privateArgs,
privateArgs.marshaller,
);
return contractFn(zcf, privateArgs, zone, tools);
const result = await contractFn(zcf, privateArgs, zone, tools);
await tools.asyncFlowTools.adminAsyncFlow.wakeAll();
return result;
};
harden(withOrchestration);
6 changes: 6 additions & 0 deletions packages/orchestration/test/facade-durability.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ test.serial('chain info', async t => {
return orc.getChain('mock');
});

await orchKit.asyncFlowTools.adminAsyncFlow.wakeAll();

const result = (await vt.when(handle())) as Chain<any>;
t.deepEqual(await vt.when(result.getChainInfo()), mockChainInfo);
});
Expand Down Expand Up @@ -122,6 +124,8 @@ test.serial('faulty chain info', async t => {
return account;
});

await orchKit.asyncFlowTools.adminAsyncFlow.wakeAll();

await t.throwsAsync(vt.when(handle()), {
message: 'chain info lacks staking denom',
});
Expand Down Expand Up @@ -209,6 +213,8 @@ test.serial('asset / denom info', async t => {
},
);

await orchKit.asyncFlowTools.adminAsyncFlow.wakeAll();

await vt.when(handle());

chainHub.registerChain('anotherChain', mockChainInfo);
Expand Down
20 changes: 15 additions & 5 deletions packages/orchestration/test/facade.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable @jessie.js/safe-await-separator */
import { test as anyTest } from '@agoric/zoe/tools/prepare-test-env-ava.js';

import type { VowTools } from '@agoric/vow';
Expand All @@ -10,7 +9,12 @@ import type { OrchestrationFlow } from '../src/orchestration-api.js';
import { provideOrchestration } from '../src/utils/start-helper.js';
import { commonSetup } from './supports.js';

const test = anyTest as TestFn<{ vt: VowTools; orchestrateAll: any; zcf: ZCF }>;
const test = anyTest as TestFn<{
vt: VowTools;
orchestrateAll: any;
zcf: ZCF;
wakeAll: () => Promise<void>;
}>;

test.beforeEach(async t => {
const { facadeServices, commonPrivateArgs } = await commonSetup(t);
Expand All @@ -32,11 +36,13 @@ test.beforeEach(async t => {
);

const { orchestrateAll } = orchKit;
t.context = { vt, orchestrateAll, zcf };
const wakeAll = async () => orchKit.asyncFlowTools.adminAsyncFlow.wakeAll();

t.context = { vt, orchestrateAll, zcf, wakeAll };
});

test('calls between flows', async t => {
const { vt, orchestrateAll, zcf } = t.context;
const { vt, orchestrateAll, zcf, wakeAll } = t.context;

const flows = {
outer(orch, ctx, ...recipients) {
Expand All @@ -51,12 +57,14 @@ test('calls between flows', async t => {
peerFlows: flows,
});

await wakeAll();

t.deepEqual(await vt.when(inner('a', 'b', 'c')), 'a b c');
t.deepEqual(await vt.when(outer('a', 'b', 'c')), 'Hello a b c');
});

test('context mapping individual flows', async t => {
const { vt, orchestrateAll, zcf } = t.context;
const { vt, orchestrateAll, zcf, wakeAll } = t.context;

const flows = {
outer(orch, ctx, ...recipients) {
Expand All @@ -71,5 +79,7 @@ test('context mapping individual flows', async t => {
peerFlows: { inner: flows.inner },
});

await wakeAll();

t.deepEqual(await vt.when(outer('a', 'b', 'c')), 'Hello a b c');
});

0 comments on commit ee710ca

Please sign in to comment.