From a99469973844f48de965ed017b6e3da7210fdc22 Mon Sep 17 00:00:00 2001 From: Kuan Wee Loong <135598754+ogp-weeloong@users.noreply.github.com> Date: Thu, 4 Jan 2024 18:07:01 +0800 Subject: [PATCH] PLU-125: [EXCEL-9] Pre-request checks (#373) --- .../common/interceptors/before-request.ts | 58 +++++++- .../apps/m365-excel/common/rate-limiter.ts | 140 ++++++++++++++++++ .../common/workbook-session/index.ts | 6 +- .../common/workbook-session/redis.ts | 33 ++++- .../backend/src/errors/retriable-error.ts | 8 +- packages/backend/src/helpers/actions.ts | 11 +- .../backend/src/helpers/http-client/index.ts | 15 ++ 7 files changed, 253 insertions(+), 18 deletions(-) create mode 100644 packages/backend/src/apps/m365-excel/common/rate-limiter.ts diff --git a/packages/backend/src/apps/m365-excel/common/interceptors/before-request.ts b/packages/backend/src/apps/m365-excel/common/interceptors/before-request.ts index 82c307f38..bcf6c34f9 100644 --- a/packages/backend/src/apps/m365-excel/common/interceptors/before-request.ts +++ b/packages/backend/src/apps/m365-excel/common/interceptors/before-request.ts @@ -1,9 +1,33 @@ import type { TBeforeRequest } from '@plumber/types' +import { RateLimiterRes } from 'rate-limiter-flexible' + +import { isM365TenantKey } from '@/config/app-env-vars/m365' +import RetriableError from '@/errors/retriable-error' +import logger from '@/helpers/logger' + import { MS_GRAPH_OAUTH_BASE_URL } from '../constants' import { getAccessToken } from '../oauth/token-cache' +import { consumeOrThrowLimiterWithLongestDelay } from '../rate-limiter' + +// This explicitly overcounts - e.g. we will log if the request times out, even +// if we can't confirm that it reached Microsoft. The intent is to assume the worst +// case scenario and not miss cases such as: +// 1. We sent a request and it reached Microsoft. +// 2. Microsoft responds; response is routed by various routers on the net. +// 3. One of the routers in the response trip crashes and we get a timeout.
+const usageTracker: TBeforeRequest = async function ($, requestConfig) { + logger.info('Making request to MS Graph', { + event: 'm365-ms-graph-request', + baseUrl: requestConfig.baseURL, // base URL is different for auth requests + urlPath: requestConfig.url, + flowId: $.flow?.id, + stepId: $.step?.id, + executionId: $.execution?.id, + }) -// TODO in later PR: usage tracker + return requestConfig +} const addAuthToken: TBeforeRequest = async function ($, requestConfig) { // Don't add token if we're trying to request a token. @@ -27,8 +51,32 @@ const addAuthToken: TBeforeRequest = async function ($, requestConfig) { return requestConfig } -// TODO in later PR: rate limiting request interceptor (this is temporary -// until we implement the plumber-wide rate limiting system; building this -// first to unblock M365 pilot). +// This rate limiting request interceptor is slightly different from the planned +// plumber-wide rate limiting system; we need to limit per request instead of +// per action (that's also why this is placed in beforeRequest instead of in the +// `run` function; an action may need multiple requests). +const rateLimitCheck: TBeforeRequest = async function ($, requestConfig) { + const tenantKey = $.auth.data?.tenantKey as string + if (!isM365TenantKey(tenantKey)) { + throw new Error(`'${tenantKey}' is not a valid M365 tenant.`) + } + + try { + await consumeOrThrowLimiterWithLongestDelay($, tenantKey, 1) + } catch (error) { + if (!(error instanceof RateLimiterRes)) { + throw error // not a rate limit result (e.g. redis failure); never swallow it + } + + throw new RetriableError({ + error: 'Reached M365 rate limit', + delayInMs: error.msBeforeNext, + }) + } + + return requestConfig +} -export default [addAuthToken] +// rateLimitCheck is explicitly the earliest interceptor so that the others +// are not called if it throws an error.
+export default [rateLimitCheck, usageTracker, addAuthToken] diff --git a/packages/backend/src/apps/m365-excel/common/rate-limiter.ts b/packages/backend/src/apps/m365-excel/common/rate-limiter.ts new file mode 100644 index 000000000..4e3943a23 --- /dev/null +++ b/packages/backend/src/apps/m365-excel/common/rate-limiter.ts @@ -0,0 +1,140 @@ +import type { IGlobalVariable } from '@plumber/types' + +import { + type IRateLimiterRedisOptions, + RateLimiterRedis, + RateLimiterRes, + RateLimiterUnion, +} from 'rate-limiter-flexible' + +import { M365TenantKey } from '@/config/app-env-vars/m365' +import { createRedisClient, REDIS_DB_INDEX } from '@/config/redis' +import logger from '@/helpers/logger' + +// Based on agreement with Govtech team. For simplicity, we'll apply the same +// rate limits to all other tenants. +const M365_RATE_LIMITS = Object.freeze({ + graphApi: { + points: 13000, + durationSeconds: 10, + }, + sharePointPerMinute: { + points: 300, + durationSeconds: 60, + }, + sharePointPerDay: { + points: 300000, + durationSeconds: 60 * 60 * 24, + }, + excel: { + points: 150, + durationSeconds: 10, + }, +}) + +const redisClient = createRedisClient(REDIS_DB_INDEX.RATE_LIMIT) + +const graphApiLimiter = new RateLimiterRedis({ + points: M365_RATE_LIMITS.graphApi.points, + duration: M365_RATE_LIMITS.graphApi.durationSeconds, + keyPrefix: 'm365-graph', + storeClient: redisClient, +}) + +const sharePointPerMinuteLimiter = new RateLimiterRedis({ + points: M365_RATE_LIMITS.sharePointPerMinute.points, + duration: M365_RATE_LIMITS.sharePointPerMinute.durationSeconds, + keyPrefix: 'm365-sharepoint-per-min', + storeClient: redisClient, +}) + +const sharePointPerDayLimiter = new RateLimiterRedis({ + points: M365_RATE_LIMITS.sharePointPerDay.points, + duration: M365_RATE_LIMITS.sharePointPerDay.durationSeconds, + keyPrefix: 'm365-sharepoint-per-day', // unique per limiter: shared prefix = shared counter + storeClient: redisClient, +}) + +const excelLimiter = new RateLimiterRedis({ + points: M365_RATE_LIMITS.excel.points, + duration:
M365_RATE_LIMITS.excel.durationSeconds, + keyPrefix: 'm365-excel', + storeClient: redisClient, +}) + +const unifiedRateLimiter = new RateLimiterUnion( + graphApiLimiter, + sharePointPerMinuteLimiter, + sharePointPerDayLimiter, + excelLimiter, +) + +type UnionRateLimiterRes = Record< + IRateLimiterRedisOptions['keyPrefix'], + RateLimiterRes +> + +function isUnionRateLimiterRes(err: unknown): err is UnionRateLimiterRes { + if (!err || typeof err !== 'object' || Object.keys(err).length === 0) { + return false + } + + for (const val of Object.values(err)) { + if (!(val instanceof RateLimiterRes)) { + return false + } + } + + return true +} + +/** + * We expose this instead of exposing the underlying RateLimiterUnion because + * union has a _very_ non-standard interface that's footgunny: on rate limit, + * it throws an {[keyPrefix: string]: RateLimiterRes} object instead of + * RateLimiterRes itself. + * + * While it's understandable why rate-limiter-flexible did this, this edge case + * makes standard code like `err instanceof RateLimiterRes` unexpectedly not + * work, which is bad(tm). + * + * To mitigate this, we expose a function which returns the RateLimiterRes with + * the longest delay, which is usually what callers want anyway. + */ +export async function consumeOrThrowLimiterWithLongestDelay( + $: IGlobalVariable, + tenantKey: M365TenantKey, + points: number, +): Promise { + try { + await unifiedRateLimiter.consume(tenantKey, points) + } catch (error) { + if (!isUnionRateLimiterRes(error)) { + throw error + } + + // Note: guaranteed errorKeys at least length 1 due to + // isUnionRateLimiterRes check. + const errorKeys = Object.keys(error) + + logger.warn('Reached internal M365 rate limit', { + event: 'm365-internally-rate-limited', + flowId: $.flow?.id, + stepId: $.step?.id, + executionId: $.execution?.id, + tenantKey, + rateLimitedKeyPrefixes: errorKeys.join(', '), + }) + + // Find and throw the RateLimiterRes with the longest delay.
+ let bestError = error[errorKeys[0]] + for (let i = 1; i < errorKeys.length; ++i) { + const currError = error[errorKeys[i]] + if (currError.msBeforeNext > bestError.msBeforeNext) { + bestError = currError + } + } + + throw bestError + } +} diff --git a/packages/backend/src/apps/m365-excel/common/workbook-session/index.ts b/packages/backend/src/apps/m365-excel/common/workbook-session/index.ts index 15b2197b3..79aaf23af 100644 --- a/packages/backend/src/apps/m365-excel/common/workbook-session/index.ts +++ b/packages/backend/src/apps/m365-excel/common/workbook-session/index.ts @@ -14,7 +14,7 @@ import { tryParseGraphApiError } from '../parse-graph-api-error' import { clearSessionIdFromRedis, getSessionIdFromRedis, - runWithLock, + runWithLockElseRetryStep, setSessionIdInRedis, } from './redis' @@ -23,7 +23,7 @@ async function invalidateSessionId( fileId: string, badSessionId: string, ): Promise { - await runWithLock(tenant, fileId, async (signal) => { + await runWithLockElseRetryStep(tenant, fileId, async (signal) => { // Nothing to do if another worker has already switched our fleet to a // different session. const sessionIdInRedis = await getSessionIdFromRedis(tenant, fileId) @@ -44,7 +44,7 @@ async function refreshSessionId( fileId: string, $: IGlobalVariable, ): Promise { - return await runWithLock(tenant, fileId, async (signal) => { + return await runWithLockElseRetryStep(tenant, fileId, async (signal) => { // It's possible for multiple workers - or even multiple calls by the same // worker - to await this. 
When this happens, the 1st caller will have // refreshed the session and stored its ID in redis, so subsequent callers diff --git a/packages/backend/src/apps/m365-excel/common/workbook-session/redis.ts b/packages/backend/src/apps/m365-excel/common/workbook-session/redis.ts index af1d7d729..cdacd2c18 100644 --- a/packages/backend/src/apps/m365-excel/common/workbook-session/redis.ts +++ b/packages/backend/src/apps/m365-excel/common/workbook-session/redis.ts @@ -1,6 +1,7 @@ -import Redlock from 'redlock' +import Redlock, { ResourceLockedError } from 'redlock' import { type M365TenantInfo } from '@/config/app-env-vars/m365' +import RetriableError from '@/errors/retriable-error' import { makeRedisAppDataKey, redisAppDataClient, @@ -49,18 +50,36 @@ const redlock = new Redlock([redisAppDataClient], { // before continuing. const DEAD_LOCK_HOLDER_PREVENTION_TIMEOUT_MS = 150 * 1000 -export async function runWithLock( +// This is the delay to retry if a worker is unable to acquire a lock (e.g. due +// to some other worker acquiring it and starting a new workbook session). +// +// I chose 5 seconds as that seems to be a reasonable-ish value where p90 (from +// personal experience...) of Graph API requests will complete. 
+const LOCK_FAILURE_RETRY_DELAY_MS = 5 * 1000 + +export async function runWithLockElseRetryStep( tenant: M365TenantInfo, fileId: string, callback: Parameters>[2], ): Promise { const lockKey = `${makeRedisKeyPrefix(tenant, fileId)}session:lock` - return await redlock.using( - [lockKey], - DEAD_LOCK_HOLDER_PREVENTION_TIMEOUT_MS, - callback, - ) + try { + return await redlock.using( + [lockKey], + DEAD_LOCK_HOLDER_PREVENTION_TIMEOUT_MS, + callback, + ) + } catch (error) { + if (!(error instanceof ResourceLockedError)) { + throw error + } + + throw new RetriableError({ + error: 'Unable to acquire excel session lock.', + delayInMs: LOCK_FAILURE_RETRY_DELAY_MS, + }) + } } // diff --git a/packages/backend/src/errors/retriable-error.ts b/packages/backend/src/errors/retriable-error.ts index 4bc37e9b7..3737d6d88 100644 --- a/packages/backend/src/errors/retriable-error.ts +++ b/packages/backend/src/errors/retriable-error.ts @@ -6,7 +6,13 @@ interface RetriableErrorParams { } /** - * Throw this in a worker action body to get BullMQ to retry. + * When thrown, this error indicates that we should requeue the action for + * retrying. This is passed directly to our BullMQ's backoff strategy callback. + * + * Typically used in these 2 places: + * 1. Explicitly thrown by action code in response to some app-specific event + * (e.g. rate limited by M365). + * 2. Default action error handler (`handleErrorAndThrow`). */ export default class RetriableError extends BaseError { delayInMs: RetriableErrorParams['delayInMs'] diff --git a/packages/backend/src/helpers/actions.ts b/packages/backend/src/helpers/actions.ts index 9a84879ab..3fa44f2ab 100644 --- a/packages/backend/src/helpers/actions.ts +++ b/packages/backend/src/helpers/actions.ts @@ -62,11 +62,18 @@ export function handleErrorAndThrow( // This is thrown from app.run, which _in theory_ can be anything. executionError: unknown, ): never { - // Only support retrying HTTP errors for now.
+ // Edge case as some actions throw StepError now, but others don't. if (executionError instanceof StepError) { executionError = executionError.cause } - if (!executionError || !(executionError instanceof HttpError)) { + + // We passthrough RetriableErrors thrown directly by the action. + if (executionError instanceof RetriableError) { + throw executionError + } + + // Otherwise... we only support automatically retrying HTTP errors for now. + if (!(executionError instanceof HttpError)) { throw new UnrecoverableError(JSON.stringify(errorDetails)) } diff --git a/packages/backend/src/helpers/http-client/index.ts b/packages/backend/src/helpers/http-client/index.ts index 3c123b151..bcad0cb97 100644 --- a/packages/backend/src/helpers/http-client/index.ts +++ b/packages/backend/src/helpers/http-client/index.ts @@ -6,6 +6,7 @@ import { IHttpClientParams } from '@plumber/types' import { URL } from 'url' import HttpError from '@/errors/http' +import RetriableError from '@/errors/retriable-error' const removeBaseUrlForAbsoluteUrls = ( requestConfig: InternalAxiosRequestConfig, @@ -48,6 +49,20 @@ export default function createHttpClient({ instance.interceptors.response.use( (response) => response, async (error) => { + // EDGE CASE: We allow actions / triggers to throw RetriableError, and + // some of them may choose to throw from beforeRequest. If this happens, + // Axios will still process the error through this response interceptor. + // + // Since RetriableError is supposed to be passed directly through to + // BullMQ, we special case it here instead of converting it to HttpError. + // + // Note that we still want to convert _other_ error types (even if + // they're not axios errors!) to HttpError; this helps prevent accidental + // sensitive data leakage via error objects in our logs. + if (error instanceof RetriableError) { + throw error + } + // This handles system errors like ECONNREFUSED, which don't have a // response body. if (!error.response) {