diff --git a/packages/backend/src/errors/retriable-error.ts b/packages/backend/src/errors/retriable-error.ts index 118fa59d7..61d1da965 100644 --- a/packages/backend/src/errors/retriable-error.ts +++ b/packages/backend/src/errors/retriable-error.ts @@ -14,13 +14,13 @@ interface RetriableErrorParams { * * The `delayType` param allows for configuring how we should delay before * retrying: - * - `queue`: The step is re-queued, and the ENTIRE queue is paused until the + * - `queue`: The step is re-queued, and the ENTIRE queue is delayed until the * delay period is over. This is generally used in per-app queues * (e.g. the app requires us to pause all API calls if they return a * 429). This will not work if the step came from the default action * queue. * - `group`: The step is re-queued, and the group associated with the step is - * paused until the delay period is over. Generally used in per-app + * delayed until the delay period is over. Generally used in per-app * queues (e.g. the app rate limits by connection ID, and the group * ID is set to the connection ID). * - `step`: The step is re-queued with delay set to the delay period. The other diff --git a/packages/backend/src/helpers/__tests__/actions.test.ts b/packages/backend/src/helpers/__tests__/actions.test.ts index e71e65d4d..c3c529b43 100644 --- a/packages/backend/src/helpers/__tests__/actions.test.ts +++ b/packages/backend/src/helpers/__tests__/actions.test.ts @@ -24,7 +24,7 @@ const mocks = vi.hoisted(() => ({ })) const MOCK_CONTEXT = { - isQueuePausable: false, + isQueueDelayable: false, span: { addTags: mocks.addSpanTags, } as unknown as Span, @@ -240,7 +240,7 @@ describe('action helper functions', () => { }) describe('RetriableError handling', () => { - it('pauses the queue if delayType is queue and the queue is pausable', () => { + it('delays the queue if delayType is queue and the queue is delayable', () => { try { handleFailedStepAndThrow({ errorDetails: {}, @@ -251,7 +251,7 @@ describe('action helper functions', () => { }), context: { ...MOCK_CONTEXT, - isQueuePausable: true, + isQueueDelayable: true, }, }) } catch (e) { @@ -263,7 +263,7 @@ describe('action helper functions', () => { } }) - it('does not pause the queue if delayType is queue but queue is not pausable', () => { + it('does not delay the queue if delayType is queue but queue is not delayable', () => { expect(() => handleFailedStepAndThrow({ errorDetails: {}, @@ -280,7 +280,7 @@ describe('action helper functions', () => { expect(mocks.workerRateLimitGroup).not.toBeCalled() }) - it("pauses the job's group if delayType is group", () => { + it("delays the job's group if delayType is group", () => { const job = { opts: { group: { @@ -311,7 +311,7 @@ describe('action helper functions', () => { } }) - it('does not pause the group if delayType is group but job does not have a group', () => { + it('does not delay the group if delayType is group but job does not have a group', () => { expect(() => handleFailedStepAndThrow({ errorDetails: {}, diff --git a/packages/backend/src/helpers/__tests__/queues.test.ts b/packages/backend/src/helpers/__tests__/queues.test.ts index a0c744d00..bd3878d3e 100644 --- a/packages/backend/src/helpers/__tests__/queues.test.ts +++ b/packages/backend/src/helpers/__tests__/queues.test.ts @@ -88,7 +88,7 @@ describe('Queue helper functions', () => { it('creates a worker for the specified queue name', () => { makeActionWorker({ queueName: '{test-app-queue}', - queueConfig: { isQueuePausable: false }, + queueConfig: { isQueueDelayable: false }, }) expect(mocks.workerConstructor).toHaveBeenCalledWith( '{test-app-queue}', @@ -104,7 +104,7 @@ describe('Queue helper functions', () => { makeActionWorker({ queueName: 'some-queue', redisConnectionPrefix: '{test}', - queueConfig: { isQueuePausable: false }, + queueConfig: { isQueueDelayable: false }, }) expect(mocks.workerConstructor).toHaveBeenCalledWith( 'some-queue', @@ -119,7 +119,7 @@ describe('Queue helper functions', () => { { appQueueConfig: { getGroupConfigForJob: vi.fn(), - isQueuePausable: false, + isQueueDelayable: false, groupLimits: { type: 'concurrency' as const, concurrency: 2, @@ -134,7 +134,7 @@ describe('Queue helper functions', () => { { appQueueConfig: { getGroupConfigForJob: vi.fn(), - isQueuePausable: true, + isQueueDelayable: true, groupLimits: { type: 'rate-limit' as const, limit: { @@ -155,7 +155,7 @@ describe('Queue helper functions', () => { { appQueueConfig: { getGroupConfigForJob: vi.fn(), - isQueuePausable: true, + isQueueDelayable: true, groupLimits: { type: 'concurrency' as const, concurrency: 2, @@ -177,7 +177,7 @@ describe('Queue helper functions', () => { }, { appQueueConfig: { - isQueuePausable: false, + isQueueDelayable: false, queueRateLimit: { max: 1, duration: 5000, diff --git a/packages/backend/src/helpers/actions/handle-failed-step-and-throw.ts b/packages/backend/src/helpers/actions/handle-failed-step-and-throw.ts index 4941bd1b3..2c43f1f20 100644 --- a/packages/backend/src/helpers/actions/handle-failed-step-and-throw.ts +++ b/packages/backend/src/helpers/actions/handle-failed-step-and-throw.ts @@ -17,11 +17,11 @@ function handleRetriableError( context: HandleFailedStepAndThrowParams['context'], ): never { const { delayType, delayInMs } = executionError - const { worker, job, isQueuePausable } = context + const { worker, job, isQueueDelayable } = context switch (delayType) { case 'queue': - if (isQueuePausable) { + if (isQueueDelayable) { worker.rateLimit(delayInMs) throw WorkerPro.RateLimitError() } @@ -139,7 +139,7 @@ interface HandleFailedStepAndThrowParams { executionError: unknown context: { - isQueuePausable: boolean + isQueueDelayable: boolean span: Span worker: WorkerPro job: JobPro diff --git a/packages/backend/src/helpers/queues/make-action-worker.ts b/packages/backend/src/helpers/queues/make-action-worker.ts index b652beff8..fa79bf102 100644 --- a/packages/backend/src/helpers/queues/make-action-worker.ts +++ b/packages/backend/src/helpers/queues/make-action-worker.ts @@ -31,7 +31,7 @@ function convertParamsToBullMqOptions( params: MakeActionWorkerParams, ) /* inferred type */ { const { queueName, redisConnectionPrefix, queueConfig } = params - const { isQueuePausable, queueRateLimit } = queueConfig + const { isQueueDelayable, queueRateLimit } = queueConfig const workerOptions: WorkerProOptions = { connection: createRedisClient(), @@ -69,7 +69,7 @@ function convertParamsToBullMqOptions( return { queueName, workerOptions, - isQueuePausable, + isQueueDelayable, } } @@ -88,7 +88,7 @@ interface MakeActionWorkerParams { export function makeActionWorker( params: MakeActionWorkerParams, ): WorkerPro { - const { queueName, workerOptions, isQueuePausable } = + const { queueName, workerOptions, isQueueDelayable } = convertParamsToBullMqOptions(params) const worker: WorkerPro = new WorkerPro( queueName, @@ -138,7 +138,7 @@ export function makeActionWorker( errorDetails: executionStep.errorDetails, executionError, context: { - isQueuePausable, + isQueueDelayable, worker, span, job, diff --git a/packages/backend/src/workers/__tests__/action.test.ts b/packages/backend/src/workers/__tests__/action.test.ts index 87b704f9c..2ecb503fc 100644 --- a/packages/backend/src/workers/__tests__/action.test.ts +++ b/packages/backend/src/workers/__tests__/action.test.ts @@ -42,12 +42,12 @@ describe('action workers', () => { vi.restoreAllMocks() }) - it('creates the worker for the main action queue and makes it unpausable', () => { + it('creates the worker for the main action queue and makes it undelayable', () => { expect(mocks.makeActionWorker).toHaveBeenCalledWith({ queueName: MAIN_ACTION_QUEUE_NAME, redisConnectionPrefix: MAIN_ACTION_QUEUE_REDIS_CONNECTION_PREFIX, queueConfig: { - isQueuePausable: false, + isQueueDelayable: false, }, }) }) diff --git a/packages/backend/src/workers/action.ts b/packages/backend/src/workers/action.ts index 566f9ae24..6f56eab87 100644 --- a/packages/backend/src/workers/action.ts +++ b/packages/backend/src/workers/action.ts @@ -17,7 +17,7 @@ export const mainActionWorker = makeActionWorker({ queueName: MAIN_ACTION_QUEUE_NAME, redisConnectionPrefix: MAIN_ACTION_QUEUE_REDIS_CONNECTION_PREFIX, queueConfig: { - isQueuePausable: false, + isQueueDelayable: false, }, }) diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index 6f66a8291..81e021e2d 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -371,13 +371,21 @@ export interface IAppQueue { queueRateLimit?: WorkerProOptions['limiter'] /** - * Configures if we are allowed to pause or rate limit the queue. + * Configures if we are allowed to delay or rate limit the entire queue. * * Concretely speaking, if this is true, RetriableErrors with delayType set * to `queue` will pause the entire queue for the delay period via a call to * `worker.rateLimit()`. + * + * This is a safety mechanism to ensure that we don't accidentally delay + * queues if reused code / helper functions throw RetriableErrors with + * `delayType` == `queue`. + * + * Note that BullMQ allows delaying the entire queue only if a queue rate + * limit is set. Thus, if you configure this to true, you must also configure + * the queueRateLimit. */ - isQueuePausable: boolean + isQueueDelayable: boolean } export interface IApp {