Skip to content

Commit

Permalink
PLU-224: [HOTFIX v2] Enable Excel 429 retries (#531)
Browse files Browse the repository at this point in the history
  • Loading branch information
ogp-weeloong authored Apr 18, 2024
1 parent 7ed4f44 commit d19a44e
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ describe('M365 interceptors', () => {
)
})

it('logs an error and throws a non-retriable error on 429', async () => {
it('logs an error and throws a non-retriable error on 429 from non-excel endpoint', async () => {
mockAxiosAdapterToThrowOnce(429, { 'retry-after': 123 })
await http
.get('/test-url')
Expand All @@ -167,6 +167,20 @@ describe('M365 interceptors', () => {
)
})

it('throws a retriable error on 429 from Excel endpoint', async () => {
mockAxiosAdapterToThrowOnce(429, { 'retry-after': 123 })
await http
.get("/test-url/workbook/cell(address='A1')")
.then(() => {
expect.unreachable()
})
.catch((error): void => {
expect(error).toBeInstanceOf(RetriableError)
expect(error.delayInMs).toEqual(123000)
expect(error.message).toEqual('Retrying HTTP 429 from Excel endpoint')
})
})

it('logs an error and throws a non-retriable error on 509', async () => {
mockAxiosAdapterToThrowOnce(509)
await http
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { IGlobalVariable, IJSONObject, IRawAction } from '@plumber/types'

import StepError from '@/errors/step'

import { throttleStepsForPublishedPipes } from '../../common/rate-limiter'
import { constructMsGraphValuesArrayForRowWrite } from '../../common/workbook-helpers/tables'
import WorkbookSession from '../../common/workbook-session'

Expand Down Expand Up @@ -134,6 +135,9 @@ const action: IRawAction = {
return
}

// FIXME (ogp-weeloong): remove when bullMQ Pro lands
await throttleStepsForPublishedPipes($, fileId as string)

// Sanity check user's config.
const seenColumnNames = new Set<string>()
for (const val of columnValues) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { IRawAction } from '@plumber/types'

import StepError from '@/errors/step'

import { throttleStepsForPublishedPipes } from '../../common/rate-limiter'
import WorkbookSession from '../../common/workbook-session'

import type { DataOut } from './data-out'
Expand Down Expand Up @@ -77,6 +78,10 @@ const action: IRawAction = {

async run($) {
const { fileId, worksheetId, cells: rawCells } = $.step.parameters

// FIXME (ogp-weeloong): remove when bullMQ Pro lands
await throttleStepsForPublishedPipes($, fileId as string)

const cells = (rawCells as Array<{ address: string }>).map((cell) => ({
address: cell.address.trim(),
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import z from 'zod'

import StepError from '@/errors/step'

import { throttleStepsForPublishedPipes } from '../../common/rate-limiter'
import { convertRowToHexEncodedRowRecord } from '../../common/workbook-helpers/tables'
import WorkbookSession from '../../common/workbook-session'

Expand Down Expand Up @@ -122,6 +123,9 @@ const action: IRawAction = {
const { fileId, tableId, lookupColumn, lookupValue } =
parametersParseResult.data

// FIXME (ogp-weeloong): remove when bullMQ Pro lands
await throttleStepsForPublishedPipes($, fileId as string)

const session = await WorkbookSession.acquire($, fileId)
const results = await getTableRowImpl({
$,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import z from 'zod'

import StepError from '@/errors/step'

import { throttleStepsForPublishedPipes } from '../../common/rate-limiter'
import {
constructMsGraphValuesArrayForRowWrite,
convertRowToHexEncodedRowRecord,
Expand Down Expand Up @@ -90,6 +91,9 @@ const action: IRawAction = {
const { fileId, tableId, lookupColumn, lookupValue, columnsToUpdate } =
parametersParseResult.data

// FIXME (ogp-weeloong): remove when bullMQ Pro lands
await throttleStepsForPublishedPipes($, fileId as string)

//
// Find index of row to update
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { IRawAction } from '@plumber/types'

import StepError from '@/errors/step'

import { throttleStepsForPublishedPipes } from '../../common/rate-limiter'
import WorkbookSession from '../../common/workbook-session'

import { parametersSchema } from './parameters-schema'
Expand Down Expand Up @@ -94,6 +95,10 @@ const action: IRawAction = {
}

const { fileId, worksheetId, cells } = parametersParseResult.data

// FIXME (ogp-weeloong): remove when bullMQ Pro lands
await throttleStepsForPublishedPipes($, fileId as string)

const session = await WorkbookSession.acquire($, fileId)

await Promise.all(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ import logger from '@/helpers/logger'

import { MS_GRAPH_OAUTH_BASE_URL } from '../constants'
import { getAccessToken } from '../oauth/token-cache'
import {
consumeOrThrowLimiterWithLongestDelay,
throttleSpikesForPublishedPipes,
} from '../rate-limiter'
import { consumeOrThrowLimiterWithLongestDelay } from '../rate-limiter'

// This explicitly overcounts - e.g we will log if the request times out, even
// we can't confirm that it reached Microsoft. The intent is to assume the worst
Expand Down Expand Up @@ -66,9 +63,6 @@ const rateLimitCheck: TBeforeRequest = async function ($, requestConfig) {
}

try {
// FIXME (ogp-weeloong): throttle spiky published pipes only.
await throttleSpikesForPublishedPipes($, tenantKey)

await consumeOrThrowLimiterWithLongestDelay($, tenantKey, 1)
} catch (error) {
if (!(error instanceof RateLimiterRes)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,24 @@ type ThrowingHandler = (
// Handle MS rate limiting us
//
const handle429: ThrowingHandler = ($, error) => {
// Edge case: Thus far, _only_ 429s from the Excel endpoint are retriable
// because Microsoft applies dynamic rate limits for Excel, and we've verified
// with GovTech that these 429s have no impact on other M365 users.
//
// https://learn.microsoft.com/en-us/graph/workbook-best-practice?tabs=http#reduce-throttling-errors
//
// Excel endpoints are uniquely identified by the `/workbook/` url segment.
//
// FIXME (ogp-weeloong): eval if we can remove this and just retry _all_ 429s
// once we get bullmq pro in.
if (error.response.config.url.includes('/workbook/')) {
const retryAfterMs = Number(error.response?.headers?.['retry-after']) * 1000
throw new RetriableError({
error: 'Retrying HTTP 429 from Excel endpoint',
delayInMs: isNaN(retryAfterMs) ? 'default' : retryAfterMs,
})
}

// A 429 response is considered a SEV-2+ incident for some tenants; log it
// explicitly so that we can easily trigger incident creation from DD.
logger.error('Received HTTP 429 from MS Graph', {
Expand Down
49 changes: 33 additions & 16 deletions packages/backend/src/apps/m365-excel/common/rate-limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {

import { M365TenantKey } from '@/config/app-env-vars/m365'
import { createRedisClient, REDIS_DB_INDEX } from '@/config/redis'
import RetriableError from '@/errors/retriable-error'
import logger from '@/helpers/logger'

// Based on agreement with Govtech team. For simplicity, we'll apply the same
Expand Down Expand Up @@ -64,30 +65,46 @@ const excelLimiter = new RateLimiterRedis({

// FIXME (ogp-weeloong): it turns out MS Graph cannot tolerate 10 QPS spikes
// and will reply with HTTP 429 if we do that (even though it's technically
// within rate limits). This is a workaround to stop spikes until we get
// BullMQ pro in.
const spikePreventer = new RateLimiterRedis({
// Numbers obtained via trial and error from user reports, plus some
// reasoning: we make ~2 queries per excel step, and we want to allow 3 steps
// to progress each time window,
points: 6,
duration: 3,
keyPrefix: 'm365-spike-preventer',
// within rate limits). This is a workaround to stop these spikes until we get
// BullMQ pro in, by choking ourselves to at most 1 step per 3 seconds per
// file (hypothesis is that Excel can't handle bursts to the same file)
//
// 3 seconds was chosen as that was the P90 of excel API calls over past 2
// weeks.
//
// Note that we don't throttle test runs to enable users to test pipes with more
// than 1 excel step. For published pipes, it's not an issue because of
// auto-retry.
const P90_EXCEL_API_RTT_SECONDS = 3
const perFileStepLimiter = new RateLimiterRedis({
points: 1,
duration: P90_EXCEL_API_RTT_SECONDS,
keyPrefix: 'm365-per-file-step-limiter',
storeClient: redisClient,
})

// FIXME (ogp-weeloong): we don't throttle test runs because this limit is too
// low; at 6 queries per 3 seconds, users can't test pipes with more than 1
// excel step. For publisehd pipes, it's not an issue because of auto-retry.
export async function throttleSpikesForPublishedPipes(
export async function throttleStepsForPublishedPipes(
$: IGlobalVariable,
tenantKey: M365TenantKey,
fileId: string,
): Promise<void> {
if ($.execution?.testRun) {
return
}

await spikePreventer.consume(tenantKey, 1)
try {
await perFileStepLimiter.consume(fileId, 1)
} catch (error) {
if (!(error instanceof RateLimiterRes)) {
throw error
}

throw new RetriableError({
error: 'Reached M365 step limit',
// If we're rate limited, we're probably facing a spike of steps for that
// file, so spread out retries over a wider time period (2x) to reduce the
// size of the retry thundering herd at any point in time.
delayInMs: P90_EXCEL_API_RTT_SECONDS * 1000 * 2,
})
}
}

const unifiedRateLimiter = new RateLimiterUnion(
Expand Down
16 changes: 9 additions & 7 deletions packages/backend/src/helpers/default-job-configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ export const DEFAULT_JOB_DELAY_DURATION = 0
// M465. Revert back to 6 once BullMQ Pro is in.
//
// Number chosen as follows:
// - Excel rate limit is 6 per 3 seconds
// - Excel pipes tend to spike at ~100 submissions per 3 second window
// - Each excel step takes 2 queries = ~3 excel steps progresses per window,
// others get retried. Under high concurrency, we may half this number as all
// steps share the same limiter.
// - So in the worst case, a step may need to be retried 100 / 1.5 ~= 60 times.
export const MAXIMUM_JOB_ATTEMPTS = 60
// - Excel step limit is 1 per 3 seconds per file, with exponential backoff of
// 6 seconds.
// - Excel pipes tend to spike at ~100 submissions (i.e. ~100 steps) in a short
// instance, but never occur again for that day.
// - Due to exponential backoff, between each retry, we can expect 2^attempts
// steps to make progress. So in the worst case, a step might be retried
// log2(100) times ~= 7.
// - We round that up to 10 just in case.
export const MAXIMUM_JOB_ATTEMPTS = 10

export const DEFAULT_JOB_OPTIONS: JobsOptions = {
removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS,
Expand Down

0 comments on commit d19a44e

Please sign in to comment.