Skip to content

Commit

Permalink
Merge pull request #7145 from Agoric/gibson-2023-03-kernel-event-queues
Browse files Browse the repository at this point in the history
chore(SwingSet): Consolidate logic for picking a queued event
  • Loading branch information
mergify[bot] authored Apr 10, 2023
2 parents 3bc27ff + 9b4f596 commit 5fcfe1a
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 75 deletions.
38 changes: 33 additions & 5 deletions packages/SwingSet/src/kernel/gc-actions.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,42 @@
import { assert } from '@agoric/assert';
import { Fail } from '@agoric/assert';
import { insistKernelType } from './parseKernelSlots.js';
import { insistVatID } from '../lib/id.js';

const typePriority = ['dropExport', 'retireExport', 'retireImport'];
/**
* @typedef {'dropExport' | 'retireExport' | 'retireImport'} GCActionType
* @typedef {'dropExports' | 'retireExports' | 'retireImports'} GCQueueEventType
*/

/**
* The list of GC action types by descending priority.
*
* @type {GCActionType[]}
*/
const actionTypePriorities = ['dropExport', 'retireExport', 'retireImport'];

/**
* A mapping of GC action type to queue event type.
*
* @type {Map<GCActionType, GCQueueEventType>}
*/
const queueTypeFromActionType = new Map([
['dropExport', 'dropExports'],
['retireExport', 'retireExports'],
['retireImport', 'retireImports'],
]);

function parseAction(s) {
const [vatID, type, kref] = s.split(' ');
insistVatID(vatID);
assert(typePriority.includes(type), `unknown type ${type}`);
queueTypeFromActionType.has(type) || Fail`unknown type ${type}`;
insistKernelType('object', kref);
return { vatID, type, kref };
}

/**
* @param {*} kernelKeeper
* @returns {import('../types-internal.js').RunQueueEvent | undefined}
*/
export function processGCActionSet(kernelKeeper) {
const allActionsSet = kernelKeeper.getGCActions();
let actionSetUpdated = false;
Expand Down Expand Up @@ -114,7 +139,7 @@ export function processGCActionSet(kernelKeeper) {
for (const vatID of vatIDs) {
const forVat = grouped.get(vatID);
// find the highest-priority type of work to do within this vat
for (const type of typePriority) {
for (const type of actionTypePriorities) {
if (forVat.has(type)) {
const actions = forVat.get(type);
const krefs = filterActions(vatID, actions);
Expand All @@ -123,7 +148,10 @@ export function processGCActionSet(kernelKeeper) {
krefs.sort();
// remove the work we're about to do from the durable set
kernelKeeper.setGCActions(allActionsSet);
return harden({ type: `${type}s`, vatID, krefs });
const queueType = /** @type {GCQueueEventType} */ (
queueTypeFromActionType.get(type)
);
return harden({ type: queueType, vatID, krefs });
}
}
}
Expand Down
93 changes: 40 additions & 53 deletions packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -1116,28 +1116,18 @@ export default function buildKernel(
* @typedef { import('../types-internal.js').VatID } VatID
* @typedef { import('../types-internal.js').InternalDynamicVatOptions } InternalDynamicVatOptions
*
* @typedef { { type: 'notify', vatID: VatID, kpid: string } } RunQueueEventNotify
* @typedef { { type: 'send', target: string, msg: Message }} RunQueueEventSend
* @typedef { { type: 'create-vat', vatID: VatID,
* source: { bundle: Bundle } | { bundleID: BundleID },
* vatParameters: SwingSetCapData,
* dynamicOptions: InternalDynamicVatOptions }
* } RunQueueEventCreateVat
* @typedef { { type: 'upgrade-vat', vatID: VatID, upgradeID: string,
* bundleID: BundleID, vatParameters: SwingSetCapData,
* upgradeMessage: string } } RunQueueEventUpgradeVat
* @typedef { { type: 'changeVatOptions', vatID: VatID, options: Record<string, unknown> } } RunQueueEventChangeVatOptions
* @typedef { { type: 'startVat', vatID: VatID, vatParameters: SwingSetCapData } } RunQueueEventStartVat
* @typedef { { type: 'dropExports', vatID: VatID, krefs: string[] } } RunQueueEventDropExports
* @typedef { { type: 'retireExports', vatID: VatID, krefs: string[] } } RunQueueEventRetireExports
* @typedef { { type: 'retireImports', vatID: VatID, krefs: string[] } } RunQueueEventRetireImports
* @typedef { { type: 'negated-gc-action', vatID: VatID } } RunQueueEventNegatedGCAction
* @typedef { { type: 'bringOutYourDead', vatID: VatID } } RunQueueEventBringOutYourDead
* @typedef { RunQueueEventNotify | RunQueueEventSend | RunQueueEventCreateVat |
* RunQueueEventUpgradeVat | RunQueueEventChangeVatOptions | RunQueueEventStartVat |
* RunQueueEventDropExports | RunQueueEventRetireExports | RunQueueEventRetireImports |
* RunQueueEventNegatedGCAction | RunQueueEventBringOutYourDead
* } RunQueueEvent
* @typedef { import('../types-internal.js').RunQueueEventNotify } RunQueueEventNotify
* @typedef { import('../types-internal.js').RunQueueEventSend } RunQueueEventSend
* @typedef { import('../types-internal.js').RunQueueEventCreateVat } RunQueueEventCreateVat
* @typedef { import('../types-internal.js').RunQueueEventUpgradeVat } RunQueueEventUpgradeVat
* @typedef { import('../types-internal.js').RunQueueEventChangeVatOptions } RunQueueEventChangeVatOptions
* @typedef { import('../types-internal.js').RunQueueEventStartVat } RunQueueEventStartVat
* @typedef { import('../types-internal.js').RunQueueEventDropExports } RunQueueEventDropExports
* @typedef { import('../types-internal.js').RunQueueEventRetireExports } RunQueueEventRetireExports
* @typedef { import('../types-internal.js').RunQueueEventRetireImports } RunQueueEventRetireImports
* @typedef { import('../types-internal.js').RunQueueEventNegatedGCAction } RunQueueEventNegatedGCAction
* @typedef { import('../types-internal.js').RunQueueEventBringOutYourDead } RunQueueEventBringOutYourDead
* @typedef { import('../types-internal.js').RunQueueEvent } RunQueueEvent
*/

/**
Expand Down Expand Up @@ -1216,6 +1206,10 @@ export default function buildKernel(
return results;
}

/**
* @param {RunQueueEvent} message
* @returns {Promise<PolicyInput>}
*/
async function processDeliveryMessage(message) {
kdebug('');
kdebug(`processQ ${JSON.stringify(message)}`);
Expand Down Expand Up @@ -1369,6 +1363,10 @@ export default function buildKernel(
}
}

/**
* @param {RunQueueEvent} message
* @returns {Promise<PolicyInput>}
*/
async function processAcceptanceMessage(message) {
kdebug('');
kdebug(`processAcceptanceQ ${JSON.stringify(message)}`);
Expand Down Expand Up @@ -1707,39 +1705,28 @@ export default function buildKernel(
}
}

function getNextDeliveryMessage() {
const gcMessage = processGCActionSet(kernelKeeper);
if (gcMessage) {
return gcMessage;
}
const reapMessage = kernelKeeper.nextReapAction();
if (reapMessage) {
return reapMessage;
}

if (!kernelKeeper.isRunQueueEmpty()) {
return kernelKeeper.getNextRunQueueMsg();
}
return undefined;
}

function getNextAcceptanceMessage() {
if (!kernelKeeper.isAcceptanceQueueEmpty()) {
return kernelKeeper.getNextAcceptanceQueueMsg();
}
return undefined;
}

/**
* Pulls the next message from the highest-priority queue and returns it
* along with a corresponding processor.
*
* @returns {{
* message: RunQueueEvent | undefined,
* processor: (message: RunQueueEvent) => Promise<PolicyInput>,
* }}
*/
function getNextMessageAndProcessor() {
let message = getNextAcceptanceMessage();
/** @type {(message:any) => Promise<PolicyInput>} */
let processor = processAcceptanceMessage;
if (!message) {
message = getNextDeliveryMessage();
processor = processDeliveryMessage;
const acceptanceMessage = kernelKeeper.getNextAcceptanceQueueMsg();
if (acceptanceMessage) {
return {
message: acceptanceMessage,
processor: processAcceptanceMessage,
};
}

return { message, processor };
const message =
processGCActionSet(kernelKeeper) ||
kernelKeeper.nextReapAction() ||
kernelKeeper.getNextRunQueueMsg();
return { message, processor: processDeliveryMessage };
}

function changeKernelOptions(options) {
Expand Down
10 changes: 0 additions & 10 deletions packages/SwingSet/src/kernel/state/kernelKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -958,10 +958,6 @@ export default function makeKernelKeeper(kernelStorage, kernelSlog) {
enqueue('runQueue', msg);
}

function isRunQueueEmpty() {
return queueLength('runQueue') <= 0;
}

function getRunQueueLength() {
return queueLength('runQueue');
}
Expand All @@ -974,10 +970,6 @@ export default function makeKernelKeeper(kernelStorage, kernelSlog) {
enqueue('acceptanceQueue', msg);
}

function isAcceptanceQueueEmpty() {
return queueLength('acceptanceQueue') <= 0;
}

function getAcceptanceQueueLength() {
return queueLength('acceptanceQueue');
}
Expand Down Expand Up @@ -1605,12 +1597,10 @@ export default function makeKernelKeeper(kernelStorage, kernelSlog) {
enumerateNonDurableObjectExports,

addToRunQueue,
isRunQueueEmpty,
getRunQueueLength,
getNextRunQueueMsg,

addToAcceptanceQueue,
isAcceptanceQueueEmpty,
getAcceptanceQueueLength,
getNextAcceptanceQueueMsg,

Expand Down
25 changes: 25 additions & 0 deletions packages/SwingSet/src/types-internal.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,28 @@ export {};
*
* @typedef {(problem: unknown, err?: Error) => void } KernelPanic
*/

/**
* @typedef { { type: 'notify', vatID: VatID, kpid: string } } RunQueueEventNotify
* @typedef { { type: 'send', target: string, msg: Message }} RunQueueEventSend
* @typedef { { type: 'create-vat', vatID: VatID,
* source: { bundle: Bundle } | { bundleID: BundleID },
* vatParameters: SwingSetCapData,
* dynamicOptions: InternalDynamicVatOptions }
* } RunQueueEventCreateVat
* @typedef { { type: 'upgrade-vat', vatID: VatID, upgradeID: string,
* bundleID: BundleID, vatParameters: SwingSetCapData,
* upgradeMessage: string } } RunQueueEventUpgradeVat
* @typedef { { type: 'changeVatOptions', vatID: VatID, options: Record<string, unknown> } } RunQueueEventChangeVatOptions
* @typedef { { type: 'startVat', vatID: VatID, vatParameters: SwingSetCapData } } RunQueueEventStartVat
* @typedef { { type: 'dropExports', vatID: VatID, krefs: string[] } } RunQueueEventDropExports
* @typedef { { type: 'retireExports', vatID: VatID, krefs: string[] } } RunQueueEventRetireExports
* @typedef { { type: 'retireImports', vatID: VatID, krefs: string[] } } RunQueueEventRetireImports
* @typedef { { type: 'negated-gc-action', vatID?: VatID } } RunQueueEventNegatedGCAction
* @typedef { { type: 'bringOutYourDead', vatID: VatID } } RunQueueEventBringOutYourDead
* @typedef { RunQueueEventNotify | RunQueueEventSend | RunQueueEventCreateVat |
* RunQueueEventUpgradeVat | RunQueueEventChangeVatOptions | RunQueueEventStartVat |
* RunQueueEventDropExports | RunQueueEventRetireExports | RunQueueEventRetireImports |
* RunQueueEventNegatedGCAction | RunQueueEventBringOutYourDead
* } RunQueueEvent
*/
10 changes: 3 additions & 7 deletions packages/SwingSet/test/test-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -308,41 +308,37 @@ test('kernelKeeper runQueue', async t => {
const k = makeKernelKeeper(store, null);
k.createStartingKernelState({ defaultManagerType: 'local' });

t.truthy(k.isRunQueueEmpty());
t.is(k.getRunQueueLength(), 0);
t.is(k.getNextRunQueueMsg(), undefined);

k.addToRunQueue({ type: 'send', stuff: 'awesome' });
t.falsy(k.isRunQueueEmpty());
t.is(k.getRunQueueLength(), 1);

k.addToRunQueue({ type: 'notify', stuff: 'notifawesome' });
t.falsy(k.isRunQueueEmpty());
t.is(k.getRunQueueLength(), 2);

k.emitCrankHashes();
const k2 = duplicateKeeper(store.serialize);

t.deepEqual(k.getNextRunQueueMsg(), { type: 'send', stuff: 'awesome' });
t.falsy(k.isRunQueueEmpty());
t.is(k.getRunQueueLength(), 1);

t.deepEqual(k.getNextRunQueueMsg(), {
type: 'notify',
stuff: 'notifawesome',
});
t.truthy(k.isRunQueueEmpty());
t.is(k.getRunQueueLength(), 0);
t.is(k.getNextRunQueueMsg(), undefined);

t.deepEqual(k2.getNextRunQueueMsg(), { type: 'send', stuff: 'awesome' });
t.falsy(k2.isRunQueueEmpty());
t.is(k2.getRunQueueLength(), 1);

t.deepEqual(k2.getNextRunQueueMsg(), {
type: 'notify',
stuff: 'notifawesome',
});
t.truthy(k2.isRunQueueEmpty());
t.is(k2.getRunQueueLength(), 0);
t.is(k2.getNextRunQueueMsg(), undefined);
});

test('kernelKeeper promises', async t => {
Expand Down

0 comments on commit 5fcfe1a

Please sign in to comment.