PLU-125: [EXCEL-9] Pre-request checks (#373)
ogp-weeloong authored Jan 4, 2024
1 parent 48b227e commit a994699
Showing 7 changed files with 253 additions and 18 deletions.
@@ -1,9 +1,33 @@
import type { TBeforeRequest } from '@plumber/types'

import { RateLimiterRes } from 'rate-limiter-flexible'

import { isM365TenantKey } from '@/config/app-env-vars/m365'
import RetriableError from '@/errors/retriable-error'
import logger from '@/helpers/logger'

import { MS_GRAPH_OAUTH_BASE_URL } from '../constants'
import { getAccessToken } from '../oauth/token-cache'
import { consumeOrThrowLimiterWithLongestDelay } from '../rate-limiter'

// This explicitly overcounts - e.g. we will log even if the request times out
// and we can't confirm that it reached Microsoft. The intent is to assume the worst
// case scenario and not miss cases such as:
// 1. We sent a request and it reached Microsoft.
// 2. Microsoft responds; response is routed by various routers on the net.
// 3. One of the routers in the response trip crashes and we get a timeout.
const usageTracker: TBeforeRequest = async function ($, requestConfig) {
logger.info('Making request to MS Graph', {
event: 'm365-ms-graph-request',
baseUrl: requestConfig.baseURL, // base URL is different for auth requests
urlPath: requestConfig.url,
flowId: $.flow?.id,
stepId: $.step?.id,
executionId: $.execution?.id,
})

// TODO in later PR: usage tracker
return requestConfig
}

const addAuthToken: TBeforeRequest = async function ($, requestConfig) {
// Don't add token if we're trying to request a token.
@@ -27,8 +51,32 @@ const addAuthToken: TBeforeRequest = async function ($, requestConfig) {
return requestConfig
}

// TODO in later PR: rate limiting request interceptor (this is temporary
// until we implement the plumber-wide rate limiting system; building this
// first to unblock M365 pilot).
// This rate limiting request interceptor is slightly different from the planned
// plumber-wide rate limiting system; we need to limit per request instead of
// per action (that's also why this is placed in beforeRequest instead of in the
// `run` function; an action may need multiple requests).
const rateLimitCheck: TBeforeRequest = async function ($, requestConfig) {
const tenantKey = $.auth.data?.tenantKey as string
if (!isM365TenantKey(tenantKey)) {
throw new Error(`'${tenantKey}' is not a valid M365 tenant.`)
}

try {
await consumeOrThrowLimiterWithLongestDelay($, tenantKey, 1)
} catch (error) {
if (!(error instanceof RateLimiterRes)) {
// Not a rate-limit result (e.g. the limiter itself errored), so let the
// request proceed rather than blocking it.
return requestConfig
}

throw new RetriableError({
error: 'Reached M365 rate limit',
delayInMs: error.msBeforeNext,
})
}

return requestConfig
}

export default [addAuthToken]
// rateLimitCheck is explicitly the earliest interceptor so that the others
// are not called if it throws an error.
export default [rateLimitCheck, usageTracker, addAuthToken]
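
Reviewer note: the ordering in the exported array matters - rateLimitCheck runs first so that a rate-limited request is neither logged by usageTracker nor given a token by addAuthToken. How plumber wires `TBeforeRequest` hooks into its HTTP client is not shown in this diff; the sketch below assumes a simplified sequential runner, purely to illustrate why a throw from the first hook short-circuits the rest.

```ts
import type { TBeforeRequest } from '@plumber/types'

type RequestConfig = Parameters<TBeforeRequest>[1]

// Hypothetical helper, not part of this commit: applies each beforeRequest
// hook in array order. If an earlier hook throws (e.g. rateLimitCheck throwing
// RetriableError), the later hooks never run and no request is sent.
async function applyBeforeRequestHooks(
  $: Parameters<TBeforeRequest>[0],
  hooks: TBeforeRequest[],
  initialConfig: RequestConfig,
): Promise<RequestConfig> {
  let config = initialConfig
  for (const hook of hooks) {
    config = await hook($, config)
  }
  return config
}
```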
140 changes: 140 additions & 0 deletions packages/backend/src/apps/m365-excel/common/rate-limiter.ts
@@ -0,0 +1,140 @@
import type { IGlobalVariable } from '@plumber/types'

import {
type IRateLimiterRedisOptions,
RateLimiterRedis,
RateLimiterRes,
RateLimiterUnion,
} from 'rate-limiter-flexible'

import { M365TenantKey } from '@/config/app-env-vars/m365'
import { createRedisClient, REDIS_DB_INDEX } from '@/config/redis'
import logger from '@/helpers/logger'

// These limits are based on our agreement with the GovTech team. For
// simplicity, we apply the same rate limits to all other tenants.
const M365_RATE_LIMITS = Object.freeze({
graphApi: {
points: 13000,
durationSeconds: 10,
},
sharePointPerMinute: {
points: 300,
durationSeconds: 60,
},
sharePointPerDay: {
points: 300000,
durationSeconds: 60 * 60 * 24,
},
excel: {
points: 150,
durationSeconds: 10,
},
})

const redisClient = createRedisClient(REDIS_DB_INDEX.RATE_LIMIT)

const graphApiLimiter = new RateLimiterRedis({
points: M365_RATE_LIMITS.graphApi.points,
duration: M365_RATE_LIMITS.graphApi.durationSeconds,
keyPrefix: 'm365-graph',
storeClient: redisClient,
})

const sharePointPerMinuteLimiter = new RateLimiterRedis({
points: M365_RATE_LIMITS.sharePointPerMinute.points,
duration: M365_RATE_LIMITS.sharePointPerMinute.durationSeconds,
keyPrefix: 'm365-sharepoint-per-min',
storeClient: redisClient,
})

const sharePointPerDayLimiter = new RateLimiterRedis({
points: M365_RATE_LIMITS.sharePointPerDay.points,
duration: M365_RATE_LIMITS.sharePointPerDay.durationSeconds,
keyPrefix: 'm365-sharepoint-per-day',
storeClient: redisClient,
})

const excelLimiter = new RateLimiterRedis({
points: M365_RATE_LIMITS.excel.points,
duration: M365_RATE_LIMITS.excel.durationSeconds,
keyPrefix: 'm365-excel',
storeClient: redisClient,
})

const unifiedRateLimiter = new RateLimiterUnion(
graphApiLimiter,
sharePointPerMinuteLimiter,
sharePointPerDayLimiter,
excelLimiter,
)

type UnionRateLimiterRes = Record<
IRateLimiterRedisOptions['keyPrefix'],
RateLimiterRes
>

function isUnionRateLimiterRes(err: unknown): err is UnionRateLimiterRes {
if (!err || typeof err !== 'object' || Object.keys(err).length === 0) {
return false
}

for (const val of Object.values(err)) {
if (!(val instanceof RateLimiterRes)) {
return false
}
}

return true
}

/**
* We expose this instead of exposing the underlying RateLimiterUnion because
* union has a _very_ non-standard interface that's footgunny: on rate limit,
* it throws an {[keyPrefix: string]: RateLimiterRes} object instead of
* RateLimiterRes itself.
*
* While it's understandable why rate-limiter-flexible did this, this edge case
* makes standard code like `err instanceof RateLimiterRes` unexpectedly not
* work, which is bad(tm).
*
* To mitigate this, we expose a function which returns the RateLimiterRes with
* the longest delay, which is usually what callers want anyway.
*/
export async function consumeOrThrowLimiterWithLongestDelay(
$: IGlobalVariable,
tenantKey: M365TenantKey,
points: number,
): Promise<void> {
try {
await unifiedRateLimiter.consume(tenantKey, points)
} catch (error) {
if (!isUnionRateLimiterRes(error)) {
throw error
}

// Note: errorKeys is guaranteed to have at least length 1 due to the
// isUnionRateLimiterRes check.
const errorKeys = Object.keys(error)

logger.warn('Reached internal M365 rate limit', {
event: 'm365-internally-rate-limited',
flowId: $.flow?.id,
stepId: $.step?.id,
executionId: $.execution?.id,
tenantKey,
rateLimitedKeyPrefixes: errorKeys.join(', '),
})

// Find and throw the RateLimiterRes with the longest delay.
let bestError = error[errorKeys[0]]
for (let i = 1; i < errorKeys.length; ++i) {
const currError = error[errorKeys[i]]
if (currError.msBeforeNext > bestError.msBeforeNext) {
bestError = currError
}
}

throw bestError
}
}
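
Reviewer note: for readers unfamiliar with rate-limiter-flexible, the "footgunny" rejection shape described in the doc comment can be reproduced in isolation. A minimal sketch using in-memory limiters (purely illustrative; the real limiters above are Redis-backed):

```ts
import {
  RateLimiterMemory,
  RateLimiterRes,
  RateLimiterUnion,
} from 'rate-limiter-flexible'

const first = new RateLimiterMemory({ keyPrefix: 'first', points: 1, duration: 10 })
const second = new RateLimiterMemory({ keyPrefix: 'second', points: 1, duration: 10 })
const union = new RateLimiterUnion(first, second)

async function demo(): Promise<void> {
  await union.consume('some-tenant', 1)
  try {
    // The second consume exceeds both limiters' single point.
    await union.consume('some-tenant', 1)
  } catch (err) {
    // The union rejects with a keyPrefix-keyed map of RateLimiterRes objects,
    // so the usual instanceof check fails.
    console.log(err instanceof RateLimiterRes) // false
    console.log(Object.keys(err as object)) // keyPrefixes of the limiters that rejected
  }
}

void demo()
```

consumeOrThrowLimiterWithLongestDelay hides this quirk by rethrowing only the RateLimiterRes with the largest msBeforeNext, which is what rateLimitCheck in the interceptor above relies on.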
@@ -14,7 +14,7 @@ import { tryParseGraphApiError } from '../parse-graph-api-error'
import {
clearSessionIdFromRedis,
getSessionIdFromRedis,
runWithLock,
runWithLockElseRetryStep,
setSessionIdInRedis,
} from './redis'

@@ -23,7 +23,7 @@ async function invalidateSessionId(
fileId: string,
badSessionId: string,
): Promise<void> {
await runWithLock(tenant, fileId, async (signal) => {
await runWithLockElseRetryStep(tenant, fileId, async (signal) => {
// Nothing to do if another worker has already switched our fleet to a
// different session.
const sessionIdInRedis = await getSessionIdFromRedis(tenant, fileId)
@@ -44,7 +44,7 @@ async function refreshSessionId(
fileId: string,
$: IGlobalVariable,
): Promise<string> {
return await runWithLock(tenant, fileId, async (signal) => {
return await runWithLockElseRetryStep(tenant, fileId, async (signal) => {
// It's possible for multiple workers - or even multiple calls by the same
// worker - to await this. When this happens, the 1st caller will have
// refreshed the session and stored its ID in redis, so subsequent callers
@@ -1,6 +1,7 @@
import Redlock from 'redlock'
import Redlock, { ResourceLockedError } from 'redlock'

import { type M365TenantInfo } from '@/config/app-env-vars/m365'
import RetriableError from '@/errors/retriable-error'
import {
makeRedisAppDataKey,
redisAppDataClient,
@@ -49,18 +50,36 @@ const redlock = new Redlock([redisAppDataClient], {
// before continuing.
const DEAD_LOCK_HOLDER_PREVENTION_TIMEOUT_MS = 150 * 1000

export async function runWithLock<T>(
// This is the delay to retry if a worker is unable to acquire a lock (e.g. due
// to some other worker acquiring it and starting a new workbook session).
//
// I chose 5 seconds as that seems to be a reasonable-ish value within which
// p90 of Graph API requests (from personal experience...) will complete.
const LOCK_FAILURE_RETRY_DELAY_MS = 5 * 1000

export async function runWithLockElseRetryStep<T>(
tenant: M365TenantInfo,
fileId: string,
callback: Parameters<typeof redlock.using<T>>[2],
): Promise<T> {
const lockKey = `${makeRedisKeyPrefix(tenant, fileId)}session:lock`

return await redlock.using(
[lockKey],
DEAD_LOCK_HOLDER_PREVENTION_TIMEOUT_MS,
callback,
)
try {
return await redlock.using(
[lockKey],
DEAD_LOCK_HOLDER_PREVENTION_TIMEOUT_MS,
callback,
)
} catch (error) {
if (!(error instanceof ResourceLockedError)) {
throw error
}

throw new RetriableError({
error: 'Unable to acquire excel session lock.',
delayInMs: LOCK_FAILURE_RETRY_DELAY_MS,
})
}
}

//
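
Reviewer note: a hypothetical caller of runWithLockElseRetryStep, mirroring how refreshSessionId uses it above. `createWorkbookSession` is a placeholder for the Graph API call that starts a new Excel session, and runWithLockElseRetryStep is assumed to be imported from the redis helper module shown above; neither import is part of this commit.

```ts
import type { IGlobalVariable } from '@plumber/types'

import { type M365TenantInfo } from '@/config/app-env-vars/m365'

// Placeholder for the real session-creation request; assumed to exist elsewhere.
declare function createWorkbookSession(
  $: IGlobalVariable,
  fileId: string,
): Promise<string>

async function getFreshSessionId(
  $: IGlobalVariable,
  tenant: M365TenantInfo,
  fileId: string,
): Promise<string> {
  // If another worker already holds the per-file lock, runWithLockElseRetryStep
  // converts redlock's ResourceLockedError into a RetriableError, so the whole
  // step is retried ~5 seconds later instead of busy-waiting on the lock.
  return await runWithLockElseRetryStep(tenant, fileId, async (signal) => {
    if (signal.aborted) {
      // redlock aborts the signal if it could not keep the lock extended.
      throw signal.error
    }
    return await createWorkbookSession($, fileId)
  })
}
```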
8 changes: 7 additions & 1 deletion packages/backend/src/errors/retriable-error.ts
@@ -6,7 +6,13 @@ interface RetriableErrorParams {
}

/**
* Throw this in a worker action body to get BullMQ to retry.
* When thrown, this error indicates that we should requeue the action for
* retrying. It is passed directly to our BullMQ backoff strategy callback.
*
* Typically used in these 2 places:
* 1. Explicitly thrown by action code in response to some app-specific event
* (e.g. rate limited by M365).
* 2. Default action error handler (`handleErrorAndThrow`).
*/
export default class RetriableError extends BaseError {
delayInMs: RetriableErrorParams['delayInMs']
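
Reviewer note: the doc comment says the error is passed directly to the BullMQ backoff strategy callback. Plumber's actual queue wiring is not part of this diff; under that assumption, a custom BullMQ backoff strategy that consumes delayInMs could look roughly like the sketch below (`processAction` and `redisConnection` are placeholders).

```ts
import { type ConnectionOptions, type Processor, Worker } from 'bullmq'

import RetriableError from '@/errors/retriable-error'

// Placeholders for this sketch - the real processor and redis config live
// elsewhere in plumber and are not shown in this diff.
declare const processAction: Processor
declare const redisConnection: ConnectionOptions

// Jobs enqueued with `backoff: { type: 'custom' }` get their retry delay from
// this callback; a RetriableError carries the delay the action asked for.
const worker = new Worker('actions', processAction, {
  connection: redisConnection,
  settings: {
    backoffStrategy: (attemptsMade: number, _type?: string, err?: Error) => {
      if (err instanceof RetriableError) {
        return err.delayInMs
      }
      // Fallback: simple exponential backoff, capped at 1 minute.
      return Math.min(2 ** attemptsMade * 1000, 60_000)
    },
  },
})
```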
11 changes: 9 additions & 2 deletions packages/backend/src/helpers/actions.ts
@@ -62,11 +62,18 @@ export function handleErrorAndThrow(
// This is thrown from app.run, which _in theory_ can be anything.
executionError: unknown,
): never {
// Only support retrying HTTP errors for now.
// Edge case: some actions throw StepError now, but others don't.
if (executionError instanceof StepError) {
executionError = executionError.cause
}
if (!executionError || !(executionError instanceof HttpError)) {

// We pass through RetriableErrors thrown directly by the action.
if (executionError instanceof RetriableError) {
throw executionError
}

// Otherwise... we only support automatically retrying HTTP errors for now.
if (!(executionError instanceof HttpError)) {
throw new UnrecoverableError(JSON.stringify(errorDetails))
}

15 changes: 15 additions & 0 deletions packages/backend/src/helpers/http-client/index.ts
@@ -6,6 +6,7 @@ import { IHttpClientParams } from '@plumber/types'
import { URL } from 'url'

import HttpError from '@/errors/http'
import RetriableError from '@/errors/retriable-error'

const removeBaseUrlForAbsoluteUrls = (
requestConfig: InternalAxiosRequestConfig,
@@ -48,6 +49,20 @@ export default function createHttpClient({
instance.interceptors.response.use(
(response) => response,
async (error) => {
// EDGE CASE: We allow actions / triggers to throw RetriableError, and
// some of them may choose to throw from beforeRequest. If this happens,
// Axios will still process the error through this response interceptor.
//
// Since RetriableError is supposed to be passed directly through to
// BullMQ, we special case it here instead of converting it to HttpError.
//
// Note that we still want to convert _other_ error types (even if
// they're not axios errors!) to HttpError; this helps prevent accidental
// sensitive data leakage via error objects in our logs.
if (error instanceof RetriableError) {
throw error
}

// This handles system errors like ECONNREFUSED, which don't have a
// response body.
if (!error.response) {
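
Reviewer note: the edge case described in the new comment - an error thrown from a request interceptor still flowing through the response error interceptor - can be observed with plain axios. A standalone illustration, independent of plumber's createHttpClient:

```ts
import axios from 'axios'

const client = axios.create({ baseURL: 'https://example.com' })

client.interceptors.request.use(() => {
  // Simulates a beforeRequest hook (e.g. rateLimitCheck) throwing before any
  // network call is made.
  throw new Error('thrown from a request interceptor')
})

client.interceptors.response.use(
  (response) => response,
  (error) => {
    // Even though no request was sent, axios still routes the rejection
    // through this handler - hence the RetriableError passthrough above.
    console.log('response error interceptor saw:', (error as Error).message)
    throw error
  },
)

void client.get('/anything').catch(() => {
  // The original error surfaces to the caller after the interceptor runs.
})
```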
