From 251f061f3ec64f9a7240cacdc00329c06b68f190 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 14:24:41 +0000 Subject: [PATCH] forcefulShutdown can be successful; channel errors separately --- src/index.ts | 4 +- src/interfaces.ts | 4 +- src/main.ts | 192 ++++++++++++++++++++++++++++++++-------------- 3 files changed, 138 insertions(+), 62 deletions(-) diff --git a/src/index.ts b/src/index.ts index 76628deb..641b018e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -372,11 +372,11 @@ declare global { poolGracefulShutdown( event: GraphileWorker.PoolGracefulShutdownEvent, - ): Promise; + ): ReturnType; poolForcefulShutdown( event: GraphileWorker.PoolForcefulShutdownEvent, - ): Promise; + ): ReturnType; } interface WorkerHooks { diff --git a/src/interfaces.ts b/src/interfaces.ts index e9dbea50..558d771d 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -517,7 +517,9 @@ export interface WorkerPool { /** @deprecated Use gracefulShutdown instead */ release: () => PromiseOrDirect; gracefulShutdown: (message?: string) => PromiseOrDirect; - forcefulShutdown: (message: string) => PromiseOrDirect; + forcefulShutdown: (message: string) => PromiseOrDirect<{ + forceFailedJobs: readonly Job[]; + }>; promise: Promise; /** Fires 'abort' when all running jobs should stop because worker is shutting down. @experimental */ abortSignal: AbortSignal; diff --git a/src/main.ts b/src/main.ts index 50a704e6..3bc28be9 100644 --- a/src/main.ts +++ b/src/main.ts @@ -589,6 +589,8 @@ export function _runTaskList( unregisterSignalHandlers = registerSignalHandlers(logger, events); } + /* Errors that should be raised from the workerPool.promise (i.e. _finPromise) */ + const _finErrors: Error[] = []; const _finPromise = defer(); let deactivatePromise: Promise | null = null; @@ -630,10 +632,13 @@ export function _runTaskList( } let terminated = false; - async function terminate(error?: Error) { + async function terminate() { if (!terminated) { terminated = true; + /* Errors that should be raised from terminate() itself */ + const terminateErrors: Error[] = []; + const releaseCompleteJobPromise = releaseCompleteJob?.(); const releaseFailJobPromise = releaseFailJob?.(); const [releaseCompleteJobResult, releaseFailJobResult] = @@ -641,9 +646,10 @@ export function _runTaskList( releaseCompleteJobPromise, releaseFailJobPromise, ]); - const errors: Error[] = error ? [error] : []; if (releaseCompleteJobResult.status === "rejected") { - errors.push(coerceError(releaseCompleteJobResult.reason)); + const error = coerceError(releaseCompleteJobResult.reason); + _finErrors.push(error); + terminateErrors.push(error); // Log but continue regardless logger.error( `Releasing complete job batcher failed: ${releaseCompleteJobResult.reason}`, @@ -653,7 +659,9 @@ export function _runTaskList( ); } if (releaseFailJobResult.status === "rejected") { - errors.push(coerceError(releaseFailJobResult.reason)); + const error = coerceError(releaseFailJobResult.reason); + _finErrors.push(error); + terminateErrors.push(error); // Log but continue regardless logger.error( `Releasing failed job batcher failed: ${releaseFailJobResult.reason}`, @@ -669,14 +677,15 @@ export function _runTaskList( try { await onTerminate?.(); } catch (e) { - errors.push(coerceError(e)); + _finErrors.push(coerceError(e)); + terminateErrors.push(coerceError(e)); } - if (errors.length === 1) { - throw errors[0]; - } else if (errors.length > 1) { + if (terminateErrors.length === 1) { + throw terminateErrors[0]; + } else if (terminateErrors.length > 1) { throw new AggregateError( - errors, + terminateErrors, "Errors occurred whilst terminating queue", ); } @@ -701,8 +710,12 @@ export function _runTaskList( // Make sure Node doesn't get upset about unhandled rejection abortPromise.then(null, () => /* noop */ void 0); - let gracefulShutdownPromise: Promise | null = null; - let forcefulShutdownPromise: Promise | null = null; + let gracefulShutdownPromise: ReturnType< + WorkerPool["gracefulShutdown"] + > | null = null; + let forcefulShutdownPromise: ReturnType< + WorkerPool["forcefulShutdown"] + > | null = null; let finished = false; const finWithError = (e: unknown) => { @@ -710,9 +723,13 @@ export function _runTaskList( return; } finished = true; - const error = e != null ? coerceError(e) : null; - if (error) { - _finPromise.reject(error); + if (e != null) { + _finErrors.push(coerceError(e)); + } + if (_finErrors.length === 1) { + _finPromise.reject(_finErrors[0]); + } else if (_finErrors.length > 1) { + _finPromise.reject(new AggregateError(_finErrors)); } else { _finPromise.resolve(); } @@ -758,18 +775,20 @@ export function _runTaskList( * progress to complete. */ gracefulShutdown(message = "Worker pool is shutting down gracefully") { - if (workerPool._forcefulShuttingDown) { - logger.error( - `gracefulShutdown called when forcefulShutdown is already in progress`, - ); - return forcefulShutdownPromise!; - } if (workerPool._shuttingDown) { logger.error( `gracefulShutdown called when gracefulShutdown is already in progress`, ); return gracefulShutdownPromise!; } + if (workerPool._forcefulShuttingDown) { + logger.error( + `gracefulShutdown called when forcefulShutdown is already in progress`, + ); + return Promise.resolve(forcefulShutdownPromise).then(() => { + throw new Error("Forceful shutdown already initiated"); + }); + } workerPool._shuttingDown = true; gracefulShutdownPromise = middleware.run( @@ -786,7 +805,7 @@ export function _runTaskList( // Stop new jobs being added const deactivatePromise = deactivate(); - const errors: Error[] = []; + const gracefulShutdownErrors: Error[] = []; // Remove all the workers - we're shutting them down manually const workers = [...workerPool._workers]; @@ -794,7 +813,9 @@ export function _runTaskList( const [deactivateResult, ...workerReleaseResults] = await Promise.allSettled([deactivatePromise, ...workerPromises]); if (deactivateResult.status === "rejected") { - errors.push(coerceError(deactivateResult.reason)); + const error = coerceError(deactivateResult.reason); + _finErrors.push(error); + gracefulShutdownErrors.push(error); // Log but continue regardless logger.error(`Deactivation failed: ${deactivateResult.reason}`, { error: deactivateResult.reason, @@ -851,18 +872,19 @@ export function _runTaskList( } if (this._forcefulShuttingDown) { - errors.push( + // Do _not_ add to _finErrors + gracefulShutdownErrors.push( new Error( "forcefulShutdown was initiated whilst gracefulShutdown was still executing.", ), ); } - if (errors.length === 1) { - throw errors[0]; - } else if (errors.length > 1) { + if (gracefulShutdownErrors.length === 1) { + throw gracefulShutdownErrors[0]; + } else if (gracefulShutdownErrors.length > 1) { throw new AggregateError( - errors, + gracefulShutdownErrors, "Errors occurred whilst shutting down worker", ); } @@ -883,13 +905,17 @@ export function _runTaskList( `Error occurred during graceful shutdown: ${message}`, { error: e }, ); - // NOTE: we now rely on forcefulShutdown to handle terminate() - if (this._forcefulShuttingDown) { + + const forcefulPromise = // Skip the warning about double shutdown - return forcefulShutdownPromise!; - } else { - return this.forcefulShutdown(message); - } + this._forcefulShuttingDown + ? forcefulShutdownPromise! + : this.forcefulShutdown(message); + + // NOTE: we now rely on forcefulShutdown to handle terminate() + return Promise.resolve(forcefulPromise).then(() => { + throw e; + }); } if (!terminated) { await terminate(); @@ -897,7 +923,7 @@ export function _runTaskList( }, ); - gracefulShutdownPromise.then(fin, finWithError); + Promise.resolve(gracefulShutdownPromise).then(fin, finWithError); const abortTimer = setTimeout(() => { abortController.abort(); @@ -938,18 +964,16 @@ export function _runTaskList( t.unref(); }); + const wasAlreadyDeactivating = deactivatePromise != null; // Stop new jobs being added // NOTE: deactivate() immediately stops getJob working, even if the // promise takes a while to resolve. - const deactivatePromise = Promise.race([deactivate(), timeout]); + const deactiveateOrTimeout = Promise.race([deactivate(), timeout]); - const errors: Error[] = []; + const forcefulShutdownErrors: Error[] = []; // Release all our workers' jobs const workers = [...workerPool._workers]; - const jobsInProgress: Array = workers - .map((worker) => worker.getActiveJob()) - .filter((job): job is Job => !!job); // Remove all the workers - we're shutting them down manually const workerPromises = workers.map((worker) => @@ -959,24 +983,54 @@ export function _runTaskList( Promise.race([worker.release(true), timeout]), ); // Ignore the results, we're shutting down anyway - const [deactivateResult, ..._ignoreWorkerReleaseResults] = - await Promise.allSettled([deactivatePromise, ...workerPromises]); + const [deactivateResult, ...workerReleaseResults] = + await Promise.allSettled([ + deactiveateOrTimeout, + ...workerPromises, + ]); if (deactivateResult.status === "rejected") { // Log but continue regardless logger.error(`Deactivation failed: ${deactivateResult.reason}`, { error: deactivateResult.reason, }); - errors.push(coerceError(deactivateResult.reason)); + const error = coerceError(deactivateResult.reason); + if (!wasAlreadyDeactivating) { + // Add this to _finErrors unless it's already there + _finErrors.push(error); + } + forcefulShutdownErrors.push(error); } - if (jobsInProgress.length > 0) { + const workerProblems = workers + .map((worker, i) => { + const result = workerReleaseResults[i]; + const activeJob = worker.getActiveJob(); + if (result.status === "rejected") { + return [ + worker, + coerceError(result.reason), + activeJob, + ] as const; + } else if (activeJob) { + return [worker, null, activeJob] as const; + } else { + return null; + } + }) + .filter((t: T | null): t is T => t != null); + + const forceFailedJobs = workerProblems + .map(([, , job]) => job) + .filter((job): job is Job => !!job); + + if (forceFailedJobs.length > 0) { const workerIds = workers.map((worker) => worker.workerId); logger.debug( - `Releasing the jobs ${jobsInProgress + `Releasing the jobs ${forceFailedJobs .map((j) => j.id) .join()} (workers: ${workerIds.join(", ")})`, { - jobs: jobsInProgress, + jobs: forceFailedJobs, workerIds, }, ); @@ -985,24 +1039,44 @@ export function _runTaskList( compiledSharedOptions, withPgClient, workerPool.id, - jobsInProgress, + forceFailedJobs, message, ); + logger.debug(`Cancelled ${cancelledJobs.length} jobs`, { cancelledJobs, }); } catch (e) { - errors.push(coerceError(e)); + const error = coerceError(e); + _finErrors.push(error); + forcefulShutdownErrors.push(error); } } else { logger.debug("No active jobs to release"); } - if (errors.length === 1) { - throw errors[0]; - } else if (errors.length > 1) { + for (const [worker, error, job] of workerProblems) { + // These are not a failure of forcefulShutdown, so do not go into + // forcefulShutdownErrors. + _finErrors.push( + new Error( + `Worker ${worker.workerId} ${ + job ? `with active job ${job.id}` : "" + } ${ + error + ? `failed to release, error: ${error})` + : `failed to stop working` + }`, + { cause: error }, + ), + ); + } + + if (forcefulShutdownErrors.length === 1) { + throw forcefulShutdownErrors[0]; + } else if (forcefulShutdownErrors.length > 1) { throw new AggregateError( - errors, + forcefulShutdownErrors, "Errors occurred whilst forcefully shutting down worker", ); } @@ -1012,6 +1086,9 @@ export function _runTaskList( workerPool, }); logger.debug("Forceful shutdown complete"); + return { + forceFailedJobs, + }; } catch (e) { events.emit("pool:forcefulShutdown:error", { pool: workerPool, @@ -1019,24 +1096,21 @@ export function _runTaskList( error: e, }); const error = coerceError(e); + _finErrors.push(error); logger.error( `Error occurred during forceful shutdown: ${error.message}`, { error: e }, ); + throw error; + } finally { if (!terminated) { - // Guaranteed to throw - await terminate(error); + await terminate(); } - throw error; - } - if (!terminated) { - // Guaranteed to throw - await terminate(new Error("Forceful shutdown")); } }, ); - forcefulShutdownPromise.then(fin, finWithError); + Promise.resolve(forcefulShutdownPromise).then(fin, finWithError); return forcefulShutdownPromise; },