From c976e222c5428815dd3b4a0ca81ac4369addfd16 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 23 Dec 2024 14:16:15 +0300 Subject: [PATCH] Add example for Retry/Timeout and some fixes --- .changeset/calm-eels-hope.md | 8 ++ e2e/retry-timeout/gateway.config.ts | 19 +++ e2e/retry-timeout/package.json | 11 ++ e2e/retry-timeout/retry-timeout.e2e.ts | 46 ++++++ e2e/retry-timeout/services/flakey.ts | 97 +++++++++++++ packages/executors/http/src/index.ts | 14 +- packages/executors/http/src/utils.ts | 11 ++ packages/fusion-runtime/src/utils.ts | 19 +-- .../runtime/src/plugins/useUpstreamRetry.ts | 21 +-- .../runtime/src/plugins/useUpstreamTimeout.ts | 134 ++++++++++-------- packages/runtime/tests/upstream-retry.test.ts | 3 +- packages/transports/common/src/types.ts | 2 +- yarn.lock | 11 ++ 13 files changed, 309 insertions(+), 87 deletions(-) create mode 100644 .changeset/calm-eels-hope.md create mode 100644 e2e/retry-timeout/gateway.config.ts create mode 100644 e2e/retry-timeout/package.json create mode 100644 e2e/retry-timeout/retry-timeout.e2e.ts create mode 100644 e2e/retry-timeout/services/flakey.ts diff --git a/.changeset/calm-eels-hope.md b/.changeset/calm-eels-hope.md new file mode 100644 index 00000000..aa0f68cb --- /dev/null +++ b/.changeset/calm-eels-hope.md @@ -0,0 +1,8 @@ +--- +'@graphql-mesh/transport-common': patch +'@graphql-tools/executor-http': patch +'@graphql-mesh/fusion-runtime': patch +'@graphql-hive/gateway-runtime': patch +--- + +Fix Retry / Timeout combination diff --git a/e2e/retry-timeout/gateway.config.ts b/e2e/retry-timeout/gateway.config.ts new file mode 100644 index 00000000..3d81c57f --- /dev/null +++ b/e2e/retry-timeout/gateway.config.ts @@ -0,0 +1,19 @@ +import { defineConfig } from '@graphql-hive/gateway'; + +let i = 0; +export const gatewayConfig = defineConfig({ + upstreamRetry: { + maxRetries: 4, + }, + upstreamTimeout: 300, + plugins(ctx) { + return [ + { + onFetch({ options }) { + i++; + ctx.logger.info(`Fetching with ${options.body} for the ${i} time`); + }, + }, + ]; + }, +}); diff --git a/e2e/retry-timeout/package.json b/e2e/retry-timeout/package.json new file mode 100644 index 00000000..d8ff14b3 --- /dev/null +++ b/e2e/retry-timeout/package.json @@ -0,0 +1,11 @@ +{ + "name": "@e2e/retry-timeout", + "private": true, + "dependencies": { + "@apollo/subgraph": "^2.9.3", + "@graphql-hive/gateway": "workspace:*", + "graphql": "16.10.0", + "graphql-yoga": "^5.10.6", + "tslib": "^2.8.1" + } +} diff --git a/e2e/retry-timeout/retry-timeout.e2e.ts b/e2e/retry-timeout/retry-timeout.e2e.ts new file mode 100644 index 00000000..a762258c --- /dev/null +++ b/e2e/retry-timeout/retry-timeout.e2e.ts @@ -0,0 +1,46 @@ +import { createTenv } from '@internal/e2e'; +import { expect, it } from 'vitest'; + +const { service, gateway } = createTenv(__dirname); + +it('Retry & Timeout', async () => { + const flakeyService = await service('flakey'); + const gw = await gateway({ + supergraph: { + with: 'apollo', + services: [flakeyService], + }, + }); + + const res = await gw.execute({ + query: /* GraphQL */ ` + query { + product(id: "1") { + id + name + } + } + `, + }); + + expect(res).toEqual({ + data: { + product: { + id: '1', + name: 'Product 1', + }, + }, + }); + + const logs = gw.getStd('both'); + // The first request will fail, and the gateway will retry 2 more times + expect(logs).toContain( + 'Fetching with {"query":"{__typename product(id:\\"1\\"){id name}}"} for the 1 time', + ); + expect(logs).toContain( + 'Fetching with {"query":"{__typename product(id:\\"1\\"){id name}}"} for the 2 time', + ); + expect(logs).toContain( + 'Fetching with {"query":"{__typename product(id:\\"1\\"){id name}}"} for the 3 time', + ); +}); diff --git a/e2e/retry-timeout/services/flakey.ts b/e2e/retry-timeout/services/flakey.ts new file mode 100644 index 00000000..126350c2 --- /dev/null +++ b/e2e/retry-timeout/services/flakey.ts @@ -0,0 +1,97 @@ +import { createServer } from 'http'; +import { buildSubgraphSchema } from '@apollo/subgraph'; +import { GraphQLResolverMap } from '@apollo/subgraph/dist/schema-helper'; +import { Opts } from '@internal/testing'; +import { parse } from 'graphql'; +import { + createGraphQLError, + createYoga, + YogaInitialContext, +} from 'graphql-yoga'; + +const opts = Opts(process.argv); + +let i = 0; +let lastAttempt: number | undefined; + +const servicePort = opts.getServicePort('flakey'); + +const resolvers = { + Query: { + product: ( + _: unknown, + { id }: { id: string }, + context: YogaInitialContext, + ) => { + i++; + console.log(`${i} attempt`); + if (lastAttempt && Date.now() - lastAttempt < 1000) { + const secondsToWait = Math.ceil( + (1000 - (Date.now() - lastAttempt)) / 1000, + ); + return createGraphQLError('You are too early, still wait...', { + extensions: { + http: { + status: 429, + headers: { + 'retry-after': secondsToWait.toString(), + }, + }, + }, + }); + } + lastAttempt = Date.now(); + // First attempt will fail with timeout + if (i === 1) { + let reject: (reason: any) => void; + const promise = new Promise((_resolve, _reject) => { + reject = _reject; + }); + setTimeout(() => { + reject('Timeout'); + }, 1000); + return promise; + } + // Second attempt will fail with 500 + if (i === 2) { + return createGraphQLError('Flakiness...', { + extensions: { + http: { + status: 503, + headers: { + 'retry-after': '1', + }, + }, + }, + }); + } + // Third attempt will return + return { + id, + name: 'Product ' + id, + }; + }, + }, +} as GraphQLResolverMap; + +createServer( + createYoga({ + schema: buildSubgraphSchema({ + typeDefs: parse(/* GraphQL */ ` + type Query { + product(id: ID!): Product + } + + type Product { + id: ID! + name: String! + } + `), + resolvers, + }), + }), +).listen(servicePort, () => { + console.log( + `🚀 Flakey service ready at http://localhost:${servicePort}/graphql`, + ); +}); diff --git a/packages/executors/http/src/index.ts b/packages/executors/http/src/index.ts index 24d70ef0..fb5c1ec4 100644 --- a/packages/executors/http/src/index.ts +++ b/packages/executors/http/src/index.ts @@ -248,7 +248,6 @@ export function buildHTTPExecutor( request: { method, }, - response: {}, }; const query = printFn(request.document); @@ -351,6 +350,7 @@ export function buildHTTPExecutor( } }) .then((fetchResult: Response): any => { + upstreamErrorExtensions.response ||= {}; upstreamErrorExtensions.response.status = fetchResult.status; upstreamErrorExtensions.response.statusText = fetchResult.statusText; Object.defineProperty(upstreamErrorExtensions.response, 'headers', { @@ -381,6 +381,7 @@ export function buildHTTPExecutor( }) .then((result) => { if (typeof result === 'string') { + upstreamErrorExtensions.response ||= {}; upstreamErrorExtensions.response.body = result; if (result) { try { @@ -566,10 +567,11 @@ function coerceFetchError( extensions: upstreamErrorExtensions, originalError: e, }); - } else if (e.name === 'AbortError' && signal?.reason) { - return createGraphQLErrorForAbort(signal?.reason, { - extensions: upstreamErrorExtensions, - }); + } else if (e.name === 'AbortError') { + return createGraphQLErrorForAbort( + signal?.reason || e, + upstreamErrorExtensions, + ); } else if (e.message) { return createGraphQLError(e.message, { extensions: upstreamErrorExtensions, @@ -591,7 +593,7 @@ interface UpstreamErrorExtensions { method: string; body?: unknown; }; - response: { + response?: { status?: number; statusText?: string; headers?: Record; diff --git a/packages/executors/http/src/utils.ts b/packages/executors/http/src/utils.ts index ea300303..0bd910a9 100644 --- a/packages/executors/http/src/utils.ts +++ b/packages/executors/http/src/utils.ts @@ -1,5 +1,6 @@ import { createGraphQLError, mapMaybePromise } from '@graphql-tools/utils'; import { crypto, TextEncoder } from '@whatwg-node/fetch'; +import { GraphQLError } from 'graphql'; export function createAbortErrorReason() { return new Error('Executor was disposed.'); @@ -9,8 +10,18 @@ export function createGraphQLErrorForAbort( reason: any, extensions?: Record, ) { + if (reason instanceof GraphQLError) { + return reason; + } + if (reason?.name === 'TimeoutError') { + return createGraphQLError(reason.message, { + extensions, + originalError: reason, + }); + } return createGraphQLError('The operation was aborted. reason: ' + reason, { extensions, + originalError: reason, }); } diff --git a/packages/fusion-runtime/src/utils.ts b/packages/fusion-runtime/src/utils.ts index fd5469f0..46051971 100644 --- a/packages/fusion-runtime/src/utils.ts +++ b/packages/fusion-runtime/src/utils.ts @@ -248,30 +248,33 @@ declare module 'graphql' { * with `onSubgraphExecuteHooks` to hook into the execution phase of subgraphs */ export function wrapExecutorWithHooks({ - executor, + executor: baseExecutor, onSubgraphExecuteHooks, subgraphName, transportEntryMap, getSubgraphSchema, transportContext, }: WrapExecuteWithHooksOptions): Executor { - return function executorWithHooks(executionRequest: ExecutionRequest) { - executionRequest.info = executionRequest.info || ({} as GraphQLResolveInfo); - executionRequest.info.executionRequest = executionRequest; + return function executorWithHooks(baseExecutionRequest: ExecutionRequest) { + baseExecutionRequest.info = + baseExecutionRequest.info || ({} as GraphQLResolveInfo); + baseExecutionRequest.info.executionRequest = baseExecutionRequest; const requestId = - executionRequest.context?.request && - requestIdByRequest.get(executionRequest.context.request); + baseExecutionRequest.context?.request && + requestIdByRequest.get(baseExecutionRequest.context.request); let execReqLogger = transportContext?.logger; if (execReqLogger) { if (requestId) { execReqLogger = execReqLogger.child(requestId); } - loggerForExecutionRequest.set(executionRequest, execReqLogger); + loggerForExecutionRequest.set(baseExecutionRequest, execReqLogger); } execReqLogger = execReqLogger?.child?.(subgraphName); if (onSubgraphExecuteHooks.length === 0) { - return executor(executionRequest); + return baseExecutor(baseExecutionRequest); } + let executor = baseExecutor; + let executionRequest = baseExecutionRequest; const onSubgraphExecuteDoneHooks: OnSubgraphExecuteDoneHook[] = []; return mapMaybePromise( iterateAsync( diff --git a/packages/runtime/src/plugins/useUpstreamRetry.ts b/packages/runtime/src/plugins/useUpstreamRetry.ts index 83773dcf..6ce85d81 100644 --- a/packages/runtime/src/plugins/useUpstreamRetry.ts +++ b/packages/runtime/src/plugins/useUpstreamRetry.ts @@ -82,9 +82,10 @@ export function useUpstreamRetry>( } // If there are errors that are not original GraphQL errors, retry if ( - !isAsyncIterable(executionResult) && - executionResult.errors?.length && - !executionResult.errors.some(isOriginalGraphQLError) + !executionResult || + (!isAsyncIterable(executionResult) && + executionResult.errors?.length && + executionResult.errors.some((e) => !isOriginalGraphQLError(e))) ) { return true; } @@ -93,18 +94,18 @@ export function useUpstreamRetry>( } = optsForReq; if (maxRetries > 0) { setExecutor(function (executionRequest: ExecutionRequest) { - let retries = maxRetries + 1; + let attemptsLeft = maxRetries + 1; let executionResult: MaybeAsyncIterable; let currRetryDelay = retryDelay; function retry(): MaybePromise< MaybeAsyncIterable > { - retries--; try { - if (retries < 0) { + if (attemptsLeft <= 0) { return executionResult; } const requestTime = Date.now(); + attemptsLeft--; return mapMaybePromise( executor(executionRequest), (currRes) => { @@ -112,6 +113,8 @@ export function useUpstreamRetry>( let retryAfterSecondsFromHeader: number | undefined; const response = executionRequestResponseMap.get(executionRequest); + // Remove the response from the map after used so we don't see it again + executionRequestResponseMap.delete(executionRequest); const retryAfterHeader = response?.headers.get('Retry-After'); if (retryAfterHeader) { @@ -142,17 +145,17 @@ export function useUpstreamRetry>( timeouts.add(timeout); }); } - return currRes; + return executionResult; }, (e) => { - if (retries < 0) { + if (attemptsLeft <= 0) { throw e; } return retry(); }, ); } catch (e) { - if (retries < 0) { + if (attemptsLeft <= 0) { throw e; } return retry(); diff --git a/packages/runtime/src/plugins/useUpstreamTimeout.ts b/packages/runtime/src/plugins/useUpstreamTimeout.ts index a6d85113..1636a759 100644 --- a/packages/runtime/src/plugins/useUpstreamTimeout.ts +++ b/packages/runtime/src/plugins/useUpstreamTimeout.ts @@ -1,7 +1,4 @@ -import { - abortSignalAny, - isAbortSignalFromAny, -} from '@graphql-hive/gateway-abort-signal-any'; +import { abortSignalAny } from '@graphql-hive/gateway-abort-signal-any'; import { subgraphNameByExecutionRequest } from '@graphql-mesh/fusion-runtime'; import { UpstreamErrorExtensions } from '@graphql-mesh/transport-common'; import { getHeadersObj } from '@graphql-mesh/utils'; @@ -45,39 +42,47 @@ export function useUpstreamTimeout>( }) { const timeout = timeoutFactory({ subgraphName, executionRequest }); if (timeout) { - let timeoutSignal = - timeoutSignalsByExecutionRequest.get(executionRequest); - if (!timeoutSignal) { - timeoutSignal = AbortSignal.timeout(timeout); - timeoutSignalsByExecutionRequest.set(executionRequest, timeoutSignal); - } - timeoutSignalsByExecutionRequest.set(executionRequest, timeoutSignal); - const timeout$ = new Promise((_, reject) => { - if (timeoutSignal.aborted) { - reject(timeoutSignal.reason); - } - timeoutSignal.addEventListener('abort', () => - reject(timeoutSignal.reason), - ); - }); - if (isAbortSignalFromAny(executionRequest.signal)) { - executionRequest.signal.addSignals([timeoutSignal]); - } else { - const signals = [timeoutSignal]; - if (executionRequest.signal) { - signals.push(executionRequest.signal); - } - executionRequest.signal = abortSignalAny(signals); - } setExecutor(function timeoutExecutor( executionRequest: ExecutionRequest, ) { - return Promise.race([timeout$, executor(executionRequest)]) + let timeoutSignal = + timeoutSignalsByExecutionRequest.get(executionRequest); + if (!timeoutSignal) { + timeoutSignal = AbortSignal.timeout(timeout); + } + timeoutSignalsByExecutionRequest.set(executionRequest, timeoutSignal); + const timeout$ = new Promise((_, reject) => { + if (timeoutSignal.aborted) { + reject(timeoutSignal.reason); + return; + } + timeoutSignal.addEventListener( + 'abort', + () => reject(timeoutSignal.reason), + { once: true }, + ); + }); + let finalSignal: AbortSignal | undefined = timeoutSignal; + const signals = new Set(); + signals.add(timeoutSignal); + if (executionRequest.signal) { + signals.add(executionRequest.signal); + finalSignal = abortSignalAny(signals); + } + return Promise.race([ + timeout$, + executor({ + ...executionRequest, + signal: finalSignal, + }), + ]) .then((result) => { if (isAsyncIterable(result)) { const iterator = result[Symbol.asyncIterator](); - timeoutSignal.addEventListener('abort', () => - iterator.return?.(timeoutSignal.reason), + timeoutSignal.addEventListener( + 'abort', + () => iterator.return?.(timeoutSignal.reason), + { once: true }, ); return { [Symbol.asyncIterator]() { @@ -91,49 +96,55 @@ export function useUpstreamTimeout>( if (e === timeoutSignal.reason) { const upstreamErrorExtensions = errorExtensionsByExecRequest.get(executionRequest); - return { - errors: [ - createGraphQLError(e.message, { - extensions: upstreamErrorExtensions, - }), - ], - }; + throw createGraphQLError(e.message, { + originalError: e, + extensions: upstreamErrorExtensions, + }); } throw e; + }) + .finally(() => { + // Remove from the map after used so we don't see it again + errorExtensionsByExecRequest.delete(executionRequest); + timeoutSignalsByExecutionRequest.delete(executionRequest); }) as MaybePromise>; }); } return undefined; }, - onFetch({ url, executionRequest, options }) { + onFetch({ url, executionRequest, options, setOptions }) { const subgraphName = executionRequest && subgraphNameByExecutionRequest.get(executionRequest); - const timeout = timeoutFactory({ subgraphName, executionRequest }); - if (timeout) { - let timeoutSignal: AbortSignal | undefined; - if (executionRequest) { - timeoutSignal = - timeoutSignalsByExecutionRequest.get(executionRequest); - if (!timeoutSignal) { + if ( + !executionRequest || + !timeoutSignalsByExecutionRequest.has(executionRequest) + ) { + const timeout = timeoutFactory({ subgraphName, executionRequest }); + if (timeout) { + let timeoutSignal: AbortSignal | undefined; + if (executionRequest) { + timeoutSignal = + timeoutSignalsByExecutionRequest.get(executionRequest); + if (!timeoutSignal) { + timeoutSignal = AbortSignal.timeout(timeout); + timeoutSignalsByExecutionRequest.set( + executionRequest, + timeoutSignal, + ); + } + } else { timeoutSignal = AbortSignal.timeout(timeout); - timeoutSignalsByExecutionRequest.set( - executionRequest, - timeoutSignal, - ); } - timeoutSignalsByExecutionRequest.set(executionRequest, timeoutSignal); - } else { - timeoutSignal = AbortSignal.timeout(timeout); - } - if (isAbortSignalFromAny(options.signal)) { - options.signal.addSignals([timeoutSignal]); - } else { - const signals = [timeoutSignal]; + const signals = new Set(); + signals.add(timeoutSignal); if (options.signal) { - signals.push(options.signal); + signals.add(options.signal); + setOptions({ + ...options, + signal: abortSignalAny(signals), + }); } - options.signal = abortSignalAny(signals); } } if (executionRequest) { @@ -144,18 +155,17 @@ export function useUpstreamTimeout>( method: options.method, body: options.body, }, - response: {}, }; errorExtensionsByExecRequest.set( executionRequest, upstreamErrorExtensions, ); return function onFetchDone({ response }) { + timeoutSignalsByExecutionRequest.delete(executionRequest); upstreamErrorExtensions.response = { status: response.status, statusText: response.statusText, headers: getHeadersObj(response.headers), - body: response.body, }; }; } diff --git a/packages/runtime/tests/upstream-retry.test.ts b/packages/runtime/tests/upstream-retry.test.ts index a11e5ef6..d3987ed3 100644 --- a/packages/runtime/tests/upstream-retry.test.ts +++ b/packages/runtime/tests/upstream-retry.test.ts @@ -12,6 +12,7 @@ describe('Upstream Retry', () => { it('respects \`maxRetries\`', async () => { let attempts = 0; let maxRetries = 2; + const retryDelay = 100; const failUntil = 3; const upstreamSchema = createSchema({ typeDefs: /* GraphQL */ ` @@ -63,7 +64,7 @@ describe('Upstream Retry', () => { }, upstreamRetry: () => ({ maxRetries, - retryDelay: 100, + retryDelay, }), }); const res = await gateway.fetch('http://localhost:4000/graphql', { diff --git a/packages/transports/common/src/types.ts b/packages/transports/common/src/types.ts index d8ea51ab..06c5a7a7 100644 --- a/packages/transports/common/src/types.ts +++ b/packages/transports/common/src/types.ts @@ -60,7 +60,7 @@ export interface UpstreamErrorExtensions { method?: string; body?: unknown; }; - response: { + response?: { status?: number; statusText?: string; headers?: Record; diff --git a/yarn.lock b/yarn.lock index a1187f72..9c2c95ab 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2547,6 +2547,17 @@ __metadata: languageName: unknown linkType: soft +"@e2e/retry-timeout@workspace:e2e/retry-timeout": + version: 0.0.0-use.local + resolution: "@e2e/retry-timeout@workspace:e2e/retry-timeout" + dependencies: + "@graphql-hive/gateway": "workspace:*" + graphql: "npm:16.10.0" + graphql-yoga: "npm:^5.10.6" + tslib: "npm:^2.8.1" + languageName: unknown + linkType: soft + "@e2e/subscriptions-cancellation@workspace:e2e/subscriptions-cancellation": version: 0.0.0-use.local resolution: "@e2e/subscriptions-cancellation@workspace:e2e/subscriptions-cancellation"