Skip to content

Commit

Permalink
chore: rename isQueuePausable
Browse files Browse the repository at this point in the history
  • Loading branch information
ogp-weeloong committed May 7, 2024
1 parent 96a0a4b commit 929b5e3
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 26 deletions.
4 changes: 2 additions & 2 deletions packages/backend/src/errors/retriable-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ interface RetriableErrorParams {
*
* The `delayType` param allows for configuring how we should delay before
* retrying:
* - `queue`: The step is re-queued, and the ENTIRE queue is paused until the
* - `queue`: The step is re-queued, and the ENTIRE queue is delayed until the
* delay period is over. This is generally used in per-app queues
* (e.g. the app requires us to pause all API calls if they return a
* 429). This will not work if the step came from the default action
* queue.
* - `group`: The step is re-queued, and the group associated with the step is
* paused until the delay period is over. Generally used in per-app
* delayed until the delay period is over. Generally used in per-app
* queues (e.g. the app rate limits by connection ID, and the group
* ID is set to the connection ID).
* - `step`: The step is re-queued with delay set to the delay period. The other
Expand Down
12 changes: 6 additions & 6 deletions packages/backend/src/helpers/__tests__/actions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const mocks = vi.hoisted(() => ({
}))

const MOCK_CONTEXT = {
isQueuePausable: false,
isQueueDelayable: false,
span: {
addTags: mocks.addSpanTags,
} as unknown as Span,
Expand Down Expand Up @@ -240,7 +240,7 @@ describe('action helper functions', () => {
})

describe('RetriableError handling', () => {
it('pauses the queue if delayType is queue and the queue is pausable', () => {
it('delays the queue if delayType is queue and the queue is delayable', () => {
try {
handleFailedStepAndThrow({
errorDetails: {},
Expand All @@ -251,7 +251,7 @@ describe('action helper functions', () => {
}),
context: {
...MOCK_CONTEXT,
isQueuePausable: true,
isQueueDelayable: true,
},
})
} catch (e) {
Expand All @@ -263,7 +263,7 @@ describe('action helper functions', () => {
}
})

it('does not pause the queue if delayType is queue but queue is not pausable', () => {
it('does not delay the queue if delayType is queue but queue is not delayable', () => {
expect(() =>
handleFailedStepAndThrow({
errorDetails: {},
Expand All @@ -280,7 +280,7 @@ describe('action helper functions', () => {
expect(mocks.workerRateLimitGroup).not.toBeCalled()
})

it("pauses the job's group if delayType is group", () => {
it("delays the job's group if delayType is group", () => {
const job = {
opts: {
group: {
Expand Down Expand Up @@ -311,7 +311,7 @@ describe('action helper functions', () => {
}
})

it('does not pause the group if delayType is group but job does not have a group', () => {
it('does not delay the group if delayType is group but job does not have a group', () => {
expect(() =>
handleFailedStepAndThrow({
errorDetails: {},
Expand Down
12 changes: 6 additions & 6 deletions packages/backend/src/helpers/__tests__/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ describe('Queue helper functions', () => {
it('creates a worker for the specified queue name', () => {
makeActionWorker({
queueName: '{test-app-queue}',
queueConfig: { isQueuePausable: false },
queueConfig: { isQueueDelayable: false },
})
expect(mocks.workerConstructor).toHaveBeenCalledWith(
'{test-app-queue}',
Expand All @@ -104,7 +104,7 @@ describe('Queue helper functions', () => {
makeActionWorker({
queueName: 'some-queue',
redisConnectionPrefix: '{test}',
queueConfig: { isQueuePausable: false },
queueConfig: { isQueueDelayable: false },
})
expect(mocks.workerConstructor).toHaveBeenCalledWith(
'some-queue',
Expand All @@ -119,7 +119,7 @@ describe('Queue helper functions', () => {
{
appQueueConfig: {
getGroupConfigForJob: vi.fn(),
isQueuePausable: false,
isQueueDelayable: false,
groupLimits: {
type: 'concurrency' as const,
concurrency: 2,
Expand All @@ -134,7 +134,7 @@ describe('Queue helper functions', () => {
{
appQueueConfig: {
getGroupConfigForJob: vi.fn(),
isQueuePausable: true,
isQueueDelayable: true,
groupLimits: {
type: 'rate-limit' as const,
limit: {
Expand All @@ -155,7 +155,7 @@ describe('Queue helper functions', () => {
{
appQueueConfig: {
getGroupConfigForJob: vi.fn(),
isQueuePausable: true,
isQueueDelayable: true,
groupLimits: {
type: 'concurrency' as const,
concurrency: 2,
Expand All @@ -177,7 +177,7 @@ describe('Queue helper functions', () => {
},
{
appQueueConfig: {
isQueuePausable: false,
isQueueDelayable: false,
queueRateLimit: {
max: 1,
duration: 5000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ function handleRetriableError(
context: HandleFailedStepAndThrowParams['context'],
): never {
const { delayType, delayInMs } = executionError
const { worker, job, isQueuePausable } = context
const { worker, job, isQueueDelayable } = context

switch (delayType) {
case 'queue':
if (isQueuePausable) {
if (isQueueDelayable) {
worker.rateLimit(delayInMs)
throw WorkerPro.RateLimitError()
}
Expand Down Expand Up @@ -139,7 +139,7 @@ interface HandleFailedStepAndThrowParams {
executionError: unknown

context: {
isQueuePausable: boolean
isQueueDelayable: boolean
span: Span
worker: WorkerPro<IActionJobData>
job: JobPro<IActionJobData>
Expand Down
8 changes: 4 additions & 4 deletions packages/backend/src/helpers/queues/make-action-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function convertParamsToBullMqOptions(
params: MakeActionWorkerParams,
) /* inferred type */ {
const { queueName, redisConnectionPrefix, queueConfig } = params
const { isQueuePausable, queueRateLimit } = queueConfig
const { isQueueDelayable, queueRateLimit } = queueConfig

const workerOptions: WorkerProOptions = {
connection: createRedisClient(),
Expand Down Expand Up @@ -69,7 +69,7 @@ function convertParamsToBullMqOptions(
return {
queueName,
workerOptions,
isQueuePausable,
isQueueDelayable,
}
}

Expand All @@ -88,7 +88,7 @@ interface MakeActionWorkerParams {
export function makeActionWorker(
params: MakeActionWorkerParams,
): WorkerPro<IActionJobData> {
const { queueName, workerOptions, isQueuePausable } =
const { queueName, workerOptions, isQueueDelayable } =
convertParamsToBullMqOptions(params)
const worker: WorkerPro<IActionJobData> = new WorkerPro<IActionJobData>(
queueName,
Expand Down Expand Up @@ -138,7 +138,7 @@ export function makeActionWorker(
errorDetails: executionStep.errorDetails,
executionError,
context: {
isQueuePausable,
isQueueDelayable,
worker,
span,
job,
Expand Down
4 changes: 2 additions & 2 deletions packages/backend/src/workers/__tests__/action.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ describe('action workers', () => {
vi.restoreAllMocks()
})

it('creates the worker for the main action queue and makes it unpausable', () => {
it('creates the worker for the main action queue and makes it undelayable', () => {
expect(mocks.makeActionWorker).toHaveBeenCalledWith({
queueName: MAIN_ACTION_QUEUE_NAME,
redisConnectionPrefix: MAIN_ACTION_QUEUE_REDIS_CONNECTION_PREFIX,
queueConfig: {
isQueuePausable: false,
isQueueDelayable: false,
},
})
})
Expand Down
2 changes: 1 addition & 1 deletion packages/backend/src/workers/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export const mainActionWorker = makeActionWorker({
queueName: MAIN_ACTION_QUEUE_NAME,
redisConnectionPrefix: MAIN_ACTION_QUEUE_REDIS_CONNECTION_PREFIX,
queueConfig: {
isQueuePausable: false,
isQueueDelayable: false,
},
})

Expand Down
12 changes: 10 additions & 2 deletions packages/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -371,13 +371,21 @@ export interface IAppQueue {
queueRateLimit?: WorkerProOptions['limiter']

/**
* Configures if we are allowed to pause or rate limit the queue.
* Configures if we are allowed to delay or rate limit the entire queue.
*
* Concretely speaking, if this is true, RetriableErrors with delayType set
* to `queue` will pause the entire queue for the delay period via a call to
* `worker.rateLimit()`.
*
* This is a safety mechanism to ensure that we don't accidentally delay
* queues if reused code / helper functions throw RetriableErrors with
* `delayType` == `queue`.
*
* Note that BullMQ allows delaying the entire queue only if a queue rate
* limit is set. Thus, if you configure this to true, you must also configure
* the queueRateLimit.
*/
isQueuePausable: boolean
isQueueDelayable: boolean
}

export interface IApp {
Expand Down

0 comments on commit 929b5e3

Please sign in to comment.