Skip to content

Commit

Permalink
refactor(cosmic-swingset): Prepare launch-chain.js for slow vat termi…
Browse files Browse the repository at this point in the history
…nation
  • Loading branch information
gibson042 committed Oct 29, 2024
1 parent bbb3e8c commit 5da6b90
Showing 1 changed file with 57 additions and 27 deletions.
84 changes: 57 additions & 27 deletions packages/cosmic-swingset/src/launch-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,20 @@ const parseUpgradePlanInfo = (upgradePlan, prefix = '') => {
* `chainStorageEntries: [ ['c.o', '"top"'], ['c.o.i'], ['c.o.i.n', '42'], ['c.o.w', '"moo"'] ]`).
*/

/**
* @typedef {'leftover' | 'forced' | 'high-priority' | 'intermission' | 'queued'} CrankerPhase
* - leftover: work from a previous block
* - forced: work that claims the entirety of the current block
* - high-priority: queued work the precedes timer advancement
* - intermission: needed to note state exports and update consistency hashes
* - queued: queued work the follows timer advancement
*/

/**
* @typedef {(phase: CrankerPhase) => Promise<boolean>} Cranker runs the kernel
* and reports if it is time to stop
*/

/**
* Return the key in the reserved "host.*" section of the swing-store
*
Expand Down Expand Up @@ -259,25 +273,26 @@ export async function buildSwingset(
*/

/**
* @param {BeansPerUnit} beansPerUnit
* @param {object} params
* @param {BeansPerUnit} params.beansPerUnit
* @param {boolean} [ignoreBlockLimit]
* @returns {ChainRunPolicy}
*/
function computronCounter(
{
{ beansPerUnit },
ignoreBlockLimit = false,
) {
const {
[BeansPerBlockComputeLimit]: blockComputeLimit,
[BeansPerVatCreation]: vatCreation,
[BeansPerXsnapComputron]: xsnapComputron,
},
ignoreBlockLimit = false,
) {
} = beansPerUnit;
assert.typeof(blockComputeLimit, 'bigint');
assert.typeof(vatCreation, 'bigint');
assert.typeof(xsnapComputron, 'bigint');

let totalBeans = 0n;
const shouldRun = () => ignoreBlockLimit || totalBeans < blockComputeLimit;
const remainingBeans = () =>
ignoreBlockLimit ? undefined : blockComputeLimit - totalBeans;

const policy = harden({
vatCreated() {
Expand All @@ -303,11 +318,11 @@ function computronCounter(
emptyCrank() {
return shouldRun();
},

shouldRun,
remainingBeans,
totalBeans() {
return totalBeans;
},
remainingBeans: () =>
ignoreBlockLimit ? undefined : blockComputeLimit - totalBeans,
totalBeans: () => totalBeans,
});
return policy;
}
Expand Down Expand Up @@ -450,10 +465,11 @@ export async function launch({
/**
* @param {number} blockHeight
* @param {ChainRunPolicy} runPolicy
* @returns {Cranker}
*/
function makeRunSwingset(blockHeight, runPolicy) {
let runNum = 0;
async function runSwingset() {
async function runSwingset(_phase) {
const startBeans = runPolicy.totalBeans();
controller.writeSlogObject({
type: 'cosmic-swingset-run-start',
Expand Down Expand Up @@ -492,10 +508,10 @@ export async function launch({
timer.poll(blockTime);
// This is before the initial block, we need to finish processing the
// entire bootstrap before opening for business.
const runPolicy = computronCounter(params.beansPerUnit, true);
const runPolicy = computronCounter(params, true);
const runSwingset = makeRunSwingset(blockHeight, runPolicy);

await runSwingset();
await runSwingset('queued');
}

async function saveChainState() {
Expand Down Expand Up @@ -687,15 +703,16 @@ export async function launch({
* newly added, running the kernel to completion after each.
*
* @param {InboundQueue} inboundQueue
* @param {ReturnType<typeof makeRunSwingset>} runSwingset
* @param {Cranker} runSwingset
* @param {CrankerPhase} phase
*/
async function processActions(inboundQueue, runSwingset) {
async function processActions(inboundQueue, runSwingset, phase) {
let keepGoing = true;
for await (const { action, context } of inboundQueue.consumeAll()) {
const inboundNum = `${context.blockHeight}-${context.txHash}-${context.msgIdx}`;
inboundQueueMetrics.decStat();
await performAction(action, inboundNum);
keepGoing = await runSwingset();
keepGoing = await runSwingset(phase);
if (!keepGoing) {
// any leftover actions will remain on the inbound queue for possible
// processing in the next block
Expand All @@ -705,20 +722,32 @@ export async function launch({
return keepGoing;
}

async function runKernel(runSwingset, blockHeight, blockTime) {
/**
* Trigger the Swingset runs for this block, stopping when out of relevant
* work or when instructed to (whichever comes first).
*
* @param {Cranker} runSwingset
* @param {number} blockHeight
* @param {number} blockTime seconds since the POSIX epoch
*/
async function processBlockActions(runSwingset, blockHeight, blockTime) {
// First, complete leftover work, if any
let keepGoing = await runSwingset();
let keepGoing = await runSwingset('leftover');
if (!keepGoing) return;

// Then, if we have anything in the special runThisBlock queue, process
// it and do no further work.
if (runThisBlock.size()) {
await processActions(runThisBlock, runSwingset);
await processActions(runThisBlock, runSwingset, 'forced');
return;
}

// Then, process as much as we can from the priorityQueue.
keepGoing = await processActions(highPriorityQueue, runSwingset);
keepGoing = await processActions(
highPriorityQueue,
runSwingset,
'high-priority',
);
if (!keepGoing) return;

// Then, update the timer device with the new external time, which might
Expand All @@ -737,11 +766,11 @@ export async function launch({
// We must run the kernel even if nothing was added since the kernel
// only notes state exports and updates consistency hashes when attempting
// to perform a crank.
keepGoing = await runSwingset();
keepGoing = await runSwingset('intermission');
if (!keepGoing) return;

// Finally, process as much as we can from the actionQueue.
await processActions(actionQueue, runSwingset);
await processActions(actionQueue, runSwingset, 'queued');
}

async function endBlock(blockHeight, blockTime, params) {
Expand All @@ -759,11 +788,11 @@ export async function launch({
// It will also run to completion any work that swingset still had pending.
const neverStop = runThisBlock.size() > 0;

// make a runPolicy that will be shared across all cycles
const runPolicy = computronCounter(params.beansPerUnit, neverStop);
// Process the work for this block using a dedicated Cranker with a stateful
// run policy.
const runPolicy = computronCounter(params, neverStop);
const runSwingset = makeRunSwingset(blockHeight, runPolicy);

await runKernel(runSwingset, blockHeight, blockTime);
await processBlockActions(runSwingset, blockHeight, blockTime);

if (END_BLOCK_SPIN_MS) {
// Introduce a busy-wait to artificially put load on the chain.
Expand Down Expand Up @@ -950,6 +979,7 @@ export async function launch({
case ActionType.AG_COSMOS_INIT: {
allowExportCallback = true; // cleared by saveOutsideState in COMMIT_BLOCK
const { blockHeight, isBootstrap, upgradeDetails } = action;
// TODO: parseParams(action.params), for validation?

if (!blockNeedsExecution(blockHeight)) {
return true;
Expand Down

0 comments on commit 5da6b90

Please sign in to comment.