diff --git a/.changeset/@graphql-hive_gateway-runtime-322-dependencies.md b/.changeset/@graphql-hive_gateway-runtime-322-dependencies.md new file mode 100644 index 00000000..bb357f90 --- /dev/null +++ b/.changeset/@graphql-hive_gateway-runtime-322-dependencies.md @@ -0,0 +1,7 @@ +--- +'@graphql-hive/gateway-runtime': patch +--- + +dependencies updates: + +- Added dependency [`@graphql-hive/gateway-abort-signal-any@workspace:^` ↗︎](https://www.npmjs.com/package/@graphql-hive/gateway-abort-signal-any/v/workspace:^) (to `dependencies`) diff --git a/.changeset/@graphql-mesh_transport-common-322-dependencies.md b/.changeset/@graphql-mesh_transport-common-322-dependencies.md new file mode 100644 index 00000000..ab63d3e0 --- /dev/null +++ b/.changeset/@graphql-mesh_transport-common-322-dependencies.md @@ -0,0 +1,9 @@ +--- +'@graphql-mesh/transport-common': patch +--- + +dependencies updates: + +- Added dependency [`@graphql-hive/gateway-abort-signal-any@workspace:^` ↗︎](https://www.npmjs.com/package/@graphql-hive/gateway-abort-signal-any/v/workspace:^) (to `dependencies`) +- Added dependency [`@graphql-tools/executor@^1.3.8` ↗︎](https://www.npmjs.com/package/@graphql-tools/executor/v/1.3.8) (to `dependencies`) +- Removed dependency [`@graphql-tools/delegate@workspace:^` ↗︎](https://www.npmjs.com/package/@graphql-tools/delegate/v/workspace:^) (from `dependencies`) diff --git a/.changeset/@graphql-tools_delegate-322-dependencies.md b/.changeset/@graphql-tools_delegate-322-dependencies.md new file mode 100644 index 00000000..7789217a --- /dev/null +++ b/.changeset/@graphql-tools_delegate-322-dependencies.md @@ -0,0 +1,7 @@ +--- +'@graphql-tools/delegate': patch +--- + +dependencies updates: + +- Updated dependency [`@graphql-tools/executor@^1.3.8` ↗︎](https://www.npmjs.com/package/@graphql-tools/executor/v/1.3.8) (from `^1.3.6`, in `dependencies`) diff --git a/.changeset/@graphql-tools_executor-http-322-dependencies.md b/.changeset/@graphql-tools_executor-http-322-dependencies.md new file mode 100644 index 00000000..ab31d131 --- /dev/null +++ b/.changeset/@graphql-tools_executor-http-322-dependencies.md @@ -0,0 +1,7 @@ +--- +'@graphql-tools/executor-http': patch +--- + +dependencies updates: + +- Added dependency [`@graphql-hive/gateway-abort-signal-any@workspace:^` ↗︎](https://www.npmjs.com/package/@graphql-hive/gateway-abort-signal-any/v/workspace:^) (to `dependencies`) diff --git a/.changeset/late-socks-repeat.md b/.changeset/late-socks-repeat.md new file mode 100644 index 00000000..065bd647 --- /dev/null +++ b/.changeset/late-socks-repeat.md @@ -0,0 +1,5 @@ +--- +'@graphql-hive/gateway-abort-signal-any': patch +--- + +New package diff --git a/.changeset/witty-candles-whisper.md b/.changeset/witty-candles-whisper.md new file mode 100644 index 00000000..70de30d7 --- /dev/null +++ b/.changeset/witty-candles-whisper.md @@ -0,0 +1,50 @@ +--- +'@graphql-hive/gateway-runtime': minor +'@graphql-hive/gateway': minor +--- + +New Retry and Timeout plugins; + +- Retry plugin: Retry a request if it fails + +It respects the `Retry-After` HTTP header, [See more about this HTTP](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After) + +```ts +export const gatewayConfig = defineConfig({ + upstreamRetry: { + // The maximum number of retries to attempt. + maxRetries: 3, // required + // The delay between retries in milliseconds. + retryDelay: 1000, // default + /** + * A function that determines whether a response should be retried. + * If the upstream returns `Retry-After` header, the request will be retried. + */ + shouldRetry: ({ response }) => response?.status >= 500 || response?.status === 429 + } + // or you can configure it by subgraph name + upstreamRetry({ subgraphName }) { + if (subgraphName === 'my-rate-limited-subgraph') { + return { + maxRetries: 3, + } + } + return { maxRetries: 10 } + } +}) +``` + +- Timeout plugin: Timeout a request if it takes too long + +```ts +export const gatewayConfig = defineConfig({ + // The maximum time in milliseconds to wait for a response from the upstream. + upstreamTimeout: 1000, // required + // or you can configure it by subgraph name + upstreamTimeout({ subgraphName }) { + if (subgraphName === 'my-slow-subgraph') { + return 1000; + } + } +}) +``` diff --git a/packages/abort-signal-any/package.json b/packages/abort-signal-any/package.json new file mode 100644 index 00000000..a1a79041 --- /dev/null +++ b/packages/abort-signal-any/package.json @@ -0,0 +1,46 @@ +{ + "name": "@graphql-hive/gateway-abort-signal-any", + "version": "0.0.0", + "type": "module", + "repository": { + "type": "git", + "url": "git+https://github.com/graphql-hive/gateway.git", + "directory": "packages/abort-signal-any" + }, + "license": "MIT", + "engines": { + "node": ">=18.0.0" + }, + "main": "./dist/index.js", + "exports": { + ".": { + "require": { + "types": "./dist/index.d.cts", + "default": "./dist/index.cjs" + }, + "import": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + } + }, + "./package.json": "./package.json" + }, + "types": "./dist/index.d.ts", + "files": [ + "dist" + ], + "scripts": { + "build": "pkgroll --clean-dist", + "prepack": "yarn build" + }, + "dependencies": { + "tslib": "^2.8.1" + }, + "devDependencies": { + "pkgroll": "2.5.1" + }, + "publishConfig": { + "access": "public" + }, + "sideEffects": false +} diff --git a/packages/abort-signal-any/src/index.ts b/packages/abort-signal-any/src/index.ts new file mode 100644 index 00000000..ba13df91 --- /dev/null +++ b/packages/abort-signal-any/src/index.ts @@ -0,0 +1,66 @@ +export type AbortSignalFromAny = AbortSignal & { + signals: Set; + addSignals(signals: Iterable): void; +}; + +export function isAbortSignalFromAny( + signal?: AbortSignal | null, +): signal is AbortSignalFromAny { + return signal != null && 'signals' in signal && 'addSignals' in signal; +} + +export function abortSignalAny(givenSignals: Iterable) { + const signals = new Set(); + let singleSignal: AbortSignal | undefined; + for (const signal of givenSignals) { + if (isAbortSignalFromAny(signal)) { + for (const childSignal of signal.signals) { + singleSignal = childSignal; + signals.add(childSignal); + } + } else { + singleSignal = signal; + signals.add(signal); + } + } + if (signals.size < 2) { + return singleSignal; + } + if (signals.size === 0) { + return undefined; + } + const ctrl = new AbortController(); + function onAbort(this: AbortSignal, ev: Event) { + const signal = (ev.target as AbortSignal) || this; + ctrl.abort(signal.reason); + for (const signal of signals) { + signal.removeEventListener('abort', onAbort); + } + } + for (const signal of signals) { + signal.addEventListener('abort', onAbort, { once: true }); + } + Object.defineProperties(ctrl.signal, { + signals: { value: signals }, + addSignals: { + value(newSignals: Iterable) { + for (const signal of newSignals) { + if (isAbortSignalFromAny(signal)) { + for (const childSignal of signal.signals) { + if (!signals.has(childSignal)) { + signals.add(childSignal); + childSignal.addEventListener('abort', onAbort, { once: true }); + } + } + } else { + if (!signals.has(signal)) { + signals.add(signal); + signal.addEventListener('abort', onAbort, { once: true }); + } + } + } + }, + }, + }); + return ctrl.signal as AbortSignalFromAny; +} diff --git a/packages/delegate/package.json b/packages/delegate/package.json index 5bdf1cd0..ac2effd6 100644 --- a/packages/delegate/package.json +++ b/packages/delegate/package.json @@ -39,7 +39,7 @@ }, "dependencies": { "@graphql-tools/batch-execute": "workspace:^", - "@graphql-tools/executor": "^1.3.6", + "@graphql-tools/executor": "^1.3.8", "@graphql-tools/schema": "^10.0.11", "@graphql-tools/utils": "^10.6.2", "@repeaterjs/repeater": "^3.0.6", diff --git a/packages/delegate/src/delegateToSchema.ts b/packages/delegate/src/delegateToSchema.ts index 674a933d..b9bef40f 100644 --- a/packages/delegate/src/delegateToSchema.ts +++ b/packages/delegate/src/delegateToSchema.ts @@ -1,7 +1,6 @@ import { getBatchingExecutor } from '@graphql-tools/batch-execute'; -import { normalizedExecutor } from '@graphql-tools/executor'; +import { executorFromSchema } from '@graphql-tools/executor'; import { - ExecutionRequest, ExecutionResult, Executor, getDefinedRootType, @@ -11,7 +10,6 @@ import { mapMaybePromise, Maybe, MaybeAsyncIterable, - memoize1, } from '@graphql-tools/utils'; import { Repeater } from '@repeaterjs/repeater'; import { dset } from 'dset/merge'; @@ -295,7 +293,7 @@ function getExecutor>( const { subschemaConfig, targetSchema, context } = delegationContext; let executor: Executor = - subschemaConfig?.executor || createDefaultExecutor(targetSchema); + subschemaConfig?.executor || executorFromSchema(targetSchema); if (subschemaConfig?.batch) { const batchingOptions = subschemaConfig?.batchingOptions; @@ -310,17 +308,4 @@ function getExecutor>( return executor; } -export const createDefaultExecutor = memoize1(function createDefaultExecutor( - schema: GraphQLSchema, -): Executor { - return function defaultExecutor(request: ExecutionRequest) { - return normalizedExecutor({ - schema, - document: request.document, - rootValue: request.rootValue, - contextValue: request.context, - variableValues: request.variables, - operationName: request.operationName, - }); - }; -}); +export { executorFromSchema as createDefaultExecutor }; diff --git a/packages/executors/graphql-ws/src/index.ts b/packages/executors/graphql-ws/src/index.ts index a684987f..65d0b7ca 100644 --- a/packages/executors/graphql-ws/src/index.ts +++ b/packages/executors/graphql-ws/src/index.ts @@ -79,6 +79,8 @@ export function buildGraphQLWSExecutor( operationName, extensions, operationType = getOperationASTFromRequest(executionRequest).operation, + info, + signal = info?.signal, } = executionRequest; // additional connection params can be supplied through the "connectionParams" field in extensions. // TODO: connection params only from the FIRST operation in lazy mode will be used (detect connectionParams changes and reconnect, too implicit?) @@ -98,6 +100,13 @@ export function buildGraphQLWSExecutor( operationName, extensions, }); + signal?.addEventListener( + 'abort', + () => { + iterableIterator.return?.(); + }, + { once: true }, + ); if (operationType === 'subscription') { return iterableIterator; } diff --git a/packages/executors/http/package.json b/packages/executors/http/package.json index 97bc7a26..e1984381 100644 --- a/packages/executors/http/package.json +++ b/packages/executors/http/package.json @@ -39,6 +39,7 @@ "graphql": "^14.0.0 || ^15.0.0 || ^16.0.0 || ^17.0.0" }, "dependencies": { + "@graphql-hive/gateway-abort-signal-any": "workspace:^", "@graphql-tools/utils": "^10.6.2", "@repeaterjs/repeater": "^3.0.4", "@whatwg-node/disposablestack": "^0.0.5", diff --git a/packages/executors/http/src/handleEventStreamResponse.ts b/packages/executors/http/src/handleEventStreamResponse.ts index 162d16be..1498e9f6 100644 --- a/packages/executors/http/src/handleEventStreamResponse.ts +++ b/packages/executors/http/src/handleEventStreamResponse.ts @@ -10,8 +10,8 @@ export function isReadableStream(value: any): value is ReadableStream { } export function handleEventStreamResponse( - signal: AbortSignal, response: Response, + signal?: AbortSignal, ) { // node-fetch returns body as a promise so we need to resolve it const body = response.body; @@ -35,7 +35,7 @@ export function handleEventStreamResponse( let currChunk = ''; async function pump() { - if (signal.aborted) { + if (signal?.aborted) { await push(createResultForAbort(signal)); return stop(); } diff --git a/packages/executors/http/src/index.ts b/packages/executors/http/src/index.ts index a2ee5bec..24d70ef0 100644 --- a/packages/executors/http/src/index.ts +++ b/packages/executors/http/src/index.ts @@ -1,3 +1,4 @@ +import { abortSignalAny } from '@graphql-hive/gateway-abort-signal-any'; import { createGraphQLError, DisposableAsyncExecutor, @@ -229,14 +230,20 @@ export function buildHTTPExecutor( request.extensions = restExtensions; } - let signal = sharedSignal; + const signals = [sharedSignal]; + const signalFromRequest = request.signal || request.info?.signal; + if (signalFromRequest) { + if (signalFromRequest.aborted) { + return createResultForAbort(signalFromRequest.reason); + } + signals.push(signalFromRequest); + } if (options?.timeout) { - signal = AbortSignal.any([ - sharedSignal, - AbortSignal.timeout(options.timeout), - ]); + signals.push(AbortSignal.timeout(options.timeout)); } + const signal = abortSignalAny(signals); + const upstreamErrorExtensions: UpstreamErrorExtensions = { request: { method, @@ -365,7 +372,7 @@ export function buildHTTPExecutor( const contentType = fetchResult.headers.get('content-type'); if (contentType?.includes('text/event-stream')) { - return handleEventStreamResponse(signal, fetchResult); + return handleEventStreamResponse(fetchResult, signal); } else if (contentType?.includes('multipart/mixed')) { return handleMultipartMixedResponse(fetchResult); } @@ -543,7 +550,7 @@ function coerceFetchError( endpoint, upstreamErrorExtensions, }: { - signal: AbortSignal; + signal?: AbortSignal; endpoint: string; upstreamErrorExtensions: UpstreamErrorExtensions; }, @@ -559,8 +566,8 @@ function coerceFetchError( extensions: upstreamErrorExtensions, originalError: e, }); - } else if (e.name === 'AbortError' && signal.reason) { - return createGraphQLErrorForAbort(signal.reason, { + } else if (e.name === 'AbortError' && signal?.reason) { + return createGraphQLErrorForAbort(signal?.reason, { extensions: upstreamErrorExtensions, }); } else if (e.message) { diff --git a/packages/executors/http/tests/handleEventStreamResponse.test.ts b/packages/executors/http/tests/handleEventStreamResponse.test.ts index 4b7e11dd..57d361b3 100644 --- a/packages/executors/http/tests/handleEventStreamResponse.test.ts +++ b/packages/executors/http/tests/handleEventStreamResponse.test.ts @@ -4,20 +4,6 @@ import { describe, expect, it } from 'vitest'; import { handleEventStreamResponse } from '../src/handleEventStreamResponse.js'; describe('handleEventStreamResponse', () => { - const fakeSignal: AbortSignal = { - aborted: false, - addEventListener() {}, - removeEventListener() {}, - onabort: null, - dispatchEvent() { - return false; - }, - reason: null, - throwIfAborted() {}, - any() { - return fakeSignal; - }, - }; const encoder = new TextEncoder(); it('should handle an event with data', async () => { const readableStream = new ReadableStream({ @@ -29,7 +15,7 @@ describe('handleEventStreamResponse', () => { }); const response = new Response(readableStream); - const asyncIterable = handleEventStreamResponse(fakeSignal, response); + const asyncIterable = handleEventStreamResponse(response); const iterator = asyncIterable[Symbol.asyncIterator](); const { value } = await iterator.next(); @@ -47,7 +33,7 @@ describe('handleEventStreamResponse', () => { }, }); const response = new Response(readableStream); - const asyncIterable = handleEventStreamResponse(fakeSignal, response); + const asyncIterable = handleEventStreamResponse(response); const iterator = asyncIterable[Symbol.asyncIterator](); const iteratorResult = await iterator.next(); @@ -69,7 +55,7 @@ describe('handleEventStreamResponse', () => { }); const response = new Response(readableStream); - const asyncIterable = handleEventStreamResponse(fakeSignal, response); + const asyncIterable = handleEventStreamResponse(response); const iterator = asyncIterable[Symbol.asyncIterator](); expect(await iterator.next()).toEqual({ @@ -107,7 +93,7 @@ describe('handleEventStreamResponse', () => { }); const response = new Response(readableStream); - const asyncIterable = handleEventStreamResponse(fakeSignal, response); + const asyncIterable = handleEventStreamResponse(response); const iterator = asyncIterable[Symbol.asyncIterator](); expect(await iterator.next()).toEqual({ diff --git a/packages/runtime/package.json b/packages/runtime/package.json index 88aaa6e3..0e34b815 100644 --- a/packages/runtime/package.json +++ b/packages/runtime/package.json @@ -54,6 +54,7 @@ "@envelop/disable-introspection": "^6.0.0", "@envelop/generic-auth": "^8.0.0", "@graphql-hive/core": "^0.8.1", + "@graphql-hive/gateway-abort-signal-any": "workspace:^", "@graphql-mesh/cross-helpers": "^0.4.9", "@graphql-mesh/fusion-runtime": "workspace:^", "@graphql-mesh/hmac-upstream-signature": "workspace:^", diff --git a/packages/runtime/src/createGatewayRuntime.ts b/packages/runtime/src/createGatewayRuntime.ts index bfbdeda8..ed1577e5 100644 --- a/packages/runtime/src/createGatewayRuntime.ts +++ b/packages/runtime/src/createGatewayRuntime.ts @@ -93,6 +93,8 @@ import { usePropagateHeaders } from './plugins/usePropagateHeaders'; import { useRequestId } from './plugins/useRequestId'; import { useSubgraphExecuteDebug } from './plugins/useSubgraphExecuteDebug'; import { useUpstreamCancel } from './plugins/useUpstreamCancel'; +import { useUpstreamRetry } from './plugins/useUpstreamRetry'; +import { useUpstreamTimeout } from './plugins/useUpstreamTimeout'; import { useWebhooks } from './plugins/useWebhooks'; import { defaultProductLogo } from './productLogo'; import type { @@ -1002,6 +1004,14 @@ export function createGatewayRuntime< extraPlugins.push(usePropagateHeaders(config.propagateHeaders)); } + if (config.upstreamTimeout) { + extraPlugins.push(useUpstreamTimeout(config.upstreamTimeout)); + } + + if (config.upstreamRetry) { + extraPlugins.push(useUpstreamRetry(config.upstreamRetry)); + } + const yoga = createYoga({ fetchAPI: config.fetchAPI, logging: logger, diff --git a/packages/runtime/src/index.ts b/packages/runtime/src/index.ts index 5892afb8..b8c5ede9 100644 --- a/packages/runtime/src/index.ts +++ b/packages/runtime/src/index.ts @@ -12,3 +12,5 @@ export { getSdkRequesterForUnifiedGraph, getExecutorForUnifiedGraph, } from '@graphql-mesh/fusion-runtime'; +export { useUpstreamRetry } from './plugins/useUpstreamRetry'; +export { useUpstreamTimeout } from './plugins/useUpstreamTimeout'; diff --git a/packages/runtime/src/plugins/useUpstreamCancel.ts b/packages/runtime/src/plugins/useUpstreamCancel.ts index c520f26b..4518fb8f 100644 --- a/packages/runtime/src/plugins/useUpstreamCancel.ts +++ b/packages/runtime/src/plugins/useUpstreamCancel.ts @@ -1,21 +1,50 @@ +import { + abortSignalAny, + isAbortSignalFromAny, +} from '@graphql-hive/gateway-abort-signal-any'; +import { GraphQLResolveInfo } from '@graphql-tools/utils'; import type { GatewayPlugin } from '../types'; export function useUpstreamCancel(): GatewayPlugin { return { - onFetch({ context, options }) { - if (context?.request) { + onFetch({ context, options, executionRequest, info }) { + const signals: AbortSignal[] = []; + if (context?.request?.signal) { + signals.push(context.request.signal); + } + const execRequestSignal = + executionRequest?.signal || executionRequest?.info?.signal; + if (execRequestSignal) { + signals.push(execRequestSignal); + } + const signalInInfo = (info as GraphQLResolveInfo)?.signal; + if (signalInInfo) { + signals.push(signalInInfo); + } + if (isAbortSignalFromAny(options.signal)) { + options.signal.addSignals(signals); + } else { if (options.signal) { - const ctrl = new AbortController(); - context.request.signal.addEventListener('abort', () => { - ctrl.abort(); - }); - options.signal.addEventListener('abort', () => { - ctrl.abort(); - }); - options.signal = ctrl.signal; - } else { - options.signal = context.request.signal; + signals.push(options.signal); + } + options.signal = abortSignalAny(signals); + } + }, + onSubgraphExecute({ executionRequest }) { + const signals: AbortSignal[] = []; + if (executionRequest.info?.signal) { + signals.push(executionRequest.info.signal); + } + if (executionRequest.context?.request?.signal) { + signals.push(executionRequest.context.request.signal); + } + if (isAbortSignalFromAny(executionRequest.signal)) { + executionRequest.signal.addSignals(signals); + } else { + if (executionRequest.signal) { + signals.push(executionRequest.signal); } + executionRequest.signal = abortSignalAny(signals); } }, }; diff --git a/packages/runtime/src/plugins/useUpstreamRetry.ts b/packages/runtime/src/plugins/useUpstreamRetry.ts new file mode 100644 index 00000000..83773dcf --- /dev/null +++ b/packages/runtime/src/plugins/useUpstreamRetry.ts @@ -0,0 +1,181 @@ +import { isOriginalGraphQLError } from '@envelop/core'; +import { + ExecutionRequest, + ExecutionResult, + isAsyncIterable, + mapMaybePromise, + MaybeAsyncIterable, + MaybePromise, +} from '@graphql-tools/utils'; +import { DisposableSymbols } from '@whatwg-node/disposablestack'; +import { GatewayPlugin } from '../types'; + +export interface UpstreamRetryOptions { + /** + * The maximum number of retries to attempt. + */ + maxRetries: number; + /** + * The minimum delay between retries in milliseconds, but this will be increased on each attempt. + * If the upstream returns `Retry-After` header, the delay will be the value of the header. + * @default 1000 + */ + retryDelay?: number; + /** + * Factor to increase the delay between retries. + * + * @default 1.25 + */ + retryDelayFactor?: number; + /** + * A function that determines whether a response should be retried. + * If the upstream returns `Retry-After` header, the response will be retried. + * By default, it retries on network errors, rate limiting, and non-original GraphQL errors. + */ + shouldRetry?: (payload: ShouldRetryPayload) => boolean; +} + +interface ShouldRetryPayload { + executionRequest: ExecutionRequest; + executionResult: MaybeAsyncIterable; + response?: Response; +} + +export interface UpstreamRetryPayload { + subgraphName: string; + executionRequest: ExecutionRequest; +} + +export type UpstreamRetryPluginOptions = + | UpstreamRetryOptions + | ((payload: UpstreamRetryPayload) => UpstreamRetryOptions | undefined); + +export function useUpstreamRetry>( + opts: UpstreamRetryPluginOptions, +): GatewayPlugin { + const timeouts = new Set>(); + const retryOptions = typeof opts === 'function' ? opts : () => opts; + const executionRequestResponseMap = new WeakMap(); + return { + onSubgraphExecute({ + subgraphName, + executionRequest, + executor, + setExecutor, + }) { + const optsForReq = retryOptions({ subgraphName, executionRequest }); + if (optsForReq) { + const { + maxRetries, + retryDelay = 1000, + retryDelayFactor = 1.25, + shouldRetry = ({ response, executionResult }) => { + if (response) { + // If network error or rate limited, retry + if ( + response.status >= 500 || + response.status === 429 || + response.headers.get('Retry-After') + ) { + return true; + } + } + // If there are errors that are not original GraphQL errors, retry + if ( + !isAsyncIterable(executionResult) && + executionResult.errors?.length && + !executionResult.errors.some(isOriginalGraphQLError) + ) { + return true; + } + return false; + }, + } = optsForReq; + if (maxRetries > 0) { + setExecutor(function (executionRequest: ExecutionRequest) { + let retries = maxRetries + 1; + let executionResult: MaybeAsyncIterable; + let currRetryDelay = retryDelay; + function retry(): MaybePromise< + MaybeAsyncIterable + > { + retries--; + try { + if (retries < 0) { + return executionResult; + } + const requestTime = Date.now(); + return mapMaybePromise( + executor(executionRequest), + (currRes) => { + executionResult = currRes; + let retryAfterSecondsFromHeader: number | undefined; + const response = + executionRequestResponseMap.get(executionRequest); + const retryAfterHeader = + response?.headers.get('Retry-After'); + if (retryAfterHeader) { + retryAfterSecondsFromHeader = + parseInt(retryAfterHeader) * 1000; + if (isNaN(retryAfterSecondsFromHeader)) { + const dateTime = new Date(retryAfterHeader).getTime(); + if (!isNaN(dateTime)) { + retryAfterSecondsFromHeader = dateTime - requestTime; + } + } + } + currRetryDelay = + retryAfterSecondsFromHeader || + currRetryDelay * retryDelayFactor; + if ( + shouldRetry({ + executionRequest, + executionResult, + response, + }) + ) { + return new Promise((resolve) => { + const timeout = setTimeout(() => { + timeouts.delete(timeout); + resolve(retry()); + }, currRetryDelay); + timeouts.add(timeout); + }); + } + return currRes; + }, + (e) => { + if (retries < 0) { + throw e; + } + return retry(); + }, + ); + } catch (e) { + if (retries < 0) { + throw e; + } + return retry(); + } + } + return retry(); + }); + } + } + }, + onFetch({ executionRequest }) { + if (executionRequest) { + return function onFetchDone({ response }) { + executionRequestResponseMap.set(executionRequest, response); + }; + } + return undefined; + }, + [DisposableSymbols.dispose]() { + for (const timeout of timeouts) { + clearTimeout(timeout); + timeouts.delete(timeout); + } + }, + }; +} diff --git a/packages/runtime/src/plugins/useUpstreamTimeout.ts b/packages/runtime/src/plugins/useUpstreamTimeout.ts new file mode 100644 index 00000000..a6d85113 --- /dev/null +++ b/packages/runtime/src/plugins/useUpstreamTimeout.ts @@ -0,0 +1,165 @@ +import { + abortSignalAny, + isAbortSignalFromAny, +} from '@graphql-hive/gateway-abort-signal-any'; +import { subgraphNameByExecutionRequest } from '@graphql-mesh/fusion-runtime'; +import { UpstreamErrorExtensions } from '@graphql-mesh/transport-common'; +import { getHeadersObj } from '@graphql-mesh/utils'; +import { + createGraphQLError, + ExecutionRequest, + ExecutionResult, + isAsyncIterable, + MaybeAsyncIterable, + MaybePromise, +} from '@graphql-tools/utils'; +import { GatewayPlugin } from '../types'; + +export interface TimeoutFactoryPayload { + subgraphName?: string; + executionRequest?: ExecutionRequest; +} + +export type UpstreamTimeoutPluginOptions = + | number + | ((payload: TimeoutFactoryPayload) => number | undefined); + +export function useUpstreamTimeout>( + opts: UpstreamTimeoutPluginOptions, +): GatewayPlugin { + const timeoutFactory = typeof opts === 'function' ? opts : () => opts; + const timeoutSignalsByExecutionRequest = new WeakMap< + ExecutionRequest, + AbortSignal + >(); + const errorExtensionsByExecRequest = new WeakMap< + ExecutionRequest, + UpstreamErrorExtensions + >(); + return { + onSubgraphExecute({ + subgraphName, + executionRequest, + executor, + setExecutor, + }) { + const timeout = timeoutFactory({ subgraphName, executionRequest }); + if (timeout) { + let timeoutSignal = + timeoutSignalsByExecutionRequest.get(executionRequest); + if (!timeoutSignal) { + timeoutSignal = AbortSignal.timeout(timeout); + timeoutSignalsByExecutionRequest.set(executionRequest, timeoutSignal); + } + timeoutSignalsByExecutionRequest.set(executionRequest, timeoutSignal); + const timeout$ = new Promise((_, reject) => { + if (timeoutSignal.aborted) { + reject(timeoutSignal.reason); + } + timeoutSignal.addEventListener('abort', () => + reject(timeoutSignal.reason), + ); + }); + if (isAbortSignalFromAny(executionRequest.signal)) { + executionRequest.signal.addSignals([timeoutSignal]); + } else { + const signals = [timeoutSignal]; + if (executionRequest.signal) { + signals.push(executionRequest.signal); + } + executionRequest.signal = abortSignalAny(signals); + } + setExecutor(function timeoutExecutor( + executionRequest: ExecutionRequest, + ) { + return Promise.race([timeout$, executor(executionRequest)]) + .then((result) => { + if (isAsyncIterable(result)) { + const iterator = result[Symbol.asyncIterator](); + timeoutSignal.addEventListener('abort', () => + iterator.return?.(timeoutSignal.reason), + ); + return { + [Symbol.asyncIterator]() { + return iterator; + }, + }; + } + return result; + }) + .catch((e) => { + if (e === timeoutSignal.reason) { + const upstreamErrorExtensions = + errorExtensionsByExecRequest.get(executionRequest); + return { + errors: [ + createGraphQLError(e.message, { + extensions: upstreamErrorExtensions, + }), + ], + }; + } + throw e; + }) as MaybePromise>; + }); + } + return undefined; + }, + onFetch({ url, executionRequest, options }) { + const subgraphName = + executionRequest && + subgraphNameByExecutionRequest.get(executionRequest); + const timeout = timeoutFactory({ subgraphName, executionRequest }); + if (timeout) { + let timeoutSignal: AbortSignal | undefined; + if (executionRequest) { + timeoutSignal = + timeoutSignalsByExecutionRequest.get(executionRequest); + if (!timeoutSignal) { + timeoutSignal = AbortSignal.timeout(timeout); + timeoutSignalsByExecutionRequest.set( + executionRequest, + timeoutSignal, + ); + } + timeoutSignalsByExecutionRequest.set(executionRequest, timeoutSignal); + } else { + timeoutSignal = AbortSignal.timeout(timeout); + } + if (isAbortSignalFromAny(options.signal)) { + options.signal.addSignals([timeoutSignal]); + } else { + const signals = [timeoutSignal]; + if (options.signal) { + signals.push(options.signal); + } + options.signal = abortSignalAny(signals); + } + } + if (executionRequest) { + const upstreamErrorExtensions: UpstreamErrorExtensions = { + subgraph: subgraphName, + request: { + url, + method: options.method, + body: options.body, + }, + response: {}, + }; + errorExtensionsByExecRequest.set( + executionRequest, + upstreamErrorExtensions, + ); + return function onFetchDone({ response }) { + upstreamErrorExtensions.response = { + status: response.status, + statusText: response.statusText, + headers: getHeadersObj(response.headers), + body: response.body, + }; + }; + } + return undefined; + }, + }; +} diff --git a/packages/runtime/src/types.ts b/packages/runtime/src/types.ts index 927e47c6..719a4619 100644 --- a/packages/runtime/src/types.ts +++ b/packages/runtime/src/types.ts @@ -37,6 +37,8 @@ import type { UnifiedGraphConfig } from './handleUnifiedGraphConfig'; import type { UseContentEncodingOpts } from './plugins/useContentEncoding'; import type { AgentFactory } from './plugins/useCustomAgent'; import { PropagateHeadersOpts } from './plugins/usePropagateHeaders'; +import { UpstreamRetryPluginOptions } from './plugins/useUpstreamRetry'; +import { UpstreamTimeoutPluginOptions } from './plugins/useUpstreamTimeout'; export type { UnifiedGraphConfig, TransportEntryAdditions }; @@ -511,6 +513,20 @@ interface GatewayConfigBase> { * Header Propagation */ propagateHeaders?: PropagateHeadersOpts; + + /** + * Upstream Timeout + * + * Configure the timeout for upstream requests. + */ + upstreamTimeout?: UpstreamTimeoutPluginOptions; + + /** + * Upstream Request Retry + * + * Configure the retry for upstream requests. + */ + upstreamRetry?: UpstreamRetryPluginOptions; } interface DisableIntrospectionOptions { diff --git a/packages/runtime/tests/upstream-retry.test.ts b/packages/runtime/tests/upstream-retry.test.ts new file mode 100644 index 00000000..a11e5ef6 --- /dev/null +++ b/packages/runtime/tests/upstream-retry.test.ts @@ -0,0 +1,200 @@ +import { + createGatewayRuntime, + useCustomFetch, +} from '@graphql-hive/gateway-runtime'; +import { getUnifiedGraphGracefully } from '@graphql-mesh/fusion-composition'; +import { MeshFetch } from '@graphql-mesh/types'; +import { Response } from '@whatwg-node/fetch'; +import { createGraphQLError, createSchema, createYoga } from 'graphql-yoga'; +import { describe, expect, it } from 'vitest'; + +describe('Upstream Retry', () => { + it('respects \`maxRetries\`', async () => { + let attempts = 0; + let maxRetries = 2; + const failUntil = 3; + const upstreamSchema = createSchema({ + typeDefs: /* GraphQL */ ` + type Query { + hello: String + } + `, + resolvers: { + Query: { + hello: () => 'world', + }, + }, + }); + await using upstreamServer = createYoga({ + schema: upstreamSchema, + plugins: [ + { + onRequest({ endResponse }) { + if (attempts <= failUntil) { + attempts++; + endResponse( + Response.json( + { + errors: [ + createGraphQLError(`Error in attempt ${attempts}`), + ], + }, + { + status: 500, + }, + ), + ); + return; + } + }, + }, + ], + }); + await using gateway = createGatewayRuntime({ + supergraph: getUnifiedGraphGracefully([ + { + name: 'upstream', + schema: upstreamSchema, + url: 'http://localhost:4001/graphql', + }, + ]), + plugins() { + return [useCustomFetch(upstreamServer.fetch as MeshFetch)]; + }, + upstreamRetry: () => ({ + maxRetries, + retryDelay: 100, + }), + }); + const res = await gateway.fetch('http://localhost:4000/graphql', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + query: /* GraphQL */ ` + query { + hello + } + `, + }), + }); + const resJson = await res.json(); + expect(resJson).toEqual({ + data: { + hello: null, + }, + errors: [ + { + message: 'Error in attempt 3', + extensions: { + code: 'DOWNSTREAM_SERVICE_ERROR', + }, + path: ['hello'], + }, + ], + }); + attempts = 0; + maxRetries = 10; + const res2 = await gateway.fetch('http://localhost:4000/graphql', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + query: /* GraphQL */ ` + query { + hello + } + `, + }), + }); + const resJson2 = await res2.json(); + expect(resJson2).toEqual({ + data: { + hello: 'world', + }, + }); + }); + it('respects \`Retry-After\` header', async () => { + let diffBetweenRetries: number | undefined; + let lastAttempt: number | undefined; + const upstreamSchema = createSchema({ + typeDefs: /* GraphQL */ ` + type Query { + hello: String + } + `, + resolvers: { + Query: { + hello: () => 'world', + }, + }, + }); + await using gateway = createGatewayRuntime({ + supergraph: getUnifiedGraphGracefully([ + { + name: 'upstream', + schema: upstreamSchema, + url: 'http://localhost:4001/graphql', + }, + ]), + plugins() { + return [ + useCustomFetch(() => { + if (lastAttempt) { + diffBetweenRetries = Date.now() - lastAttempt; + } + lastAttempt = Date.now(); + return Response.json( + { + errors: [createGraphQLError(`Rate limited`)], + }, + { + status: 429, + headers: { + 'Retry-After': '1', + }, + }, + ); + }), + ]; + }, + upstreamRetry: () => ({ + maxRetries: 1, + // To make sure it is more than retry-after + retryDelay: 10_000, + }), + }); + const res = await gateway.fetch('http://localhost:4000/graphql', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + query: /* GraphQL */ ` + query { + hello + } + `, + }), + }); + const resJson = await res.json(); + expect(resJson).toEqual({ + data: { + hello: null, + }, + errors: [ + { + message: 'Rate limited', + extensions: { + code: 'DOWNSTREAM_SERVICE_ERROR', + }, + path: ['hello'], + }, + ], + }); + expect(diffBetweenRetries).toBeDefined(); + expect(Math.floor(diffBetweenRetries! / 1000)).toBeGreaterThanOrEqual(1); + }); +}); diff --git a/packages/runtime/tests/upstream-timeout.test.ts b/packages/runtime/tests/upstream-timeout.test.ts new file mode 100644 index 00000000..f6a7768f --- /dev/null +++ b/packages/runtime/tests/upstream-timeout.test.ts @@ -0,0 +1,78 @@ +import { + createGatewayRuntime, + useCustomFetch, +} from '@graphql-hive/gateway-runtime'; +import { getUnifiedGraphGracefully } from '@graphql-mesh/fusion-composition'; +import { MeshFetch } from '@graphql-mesh/types'; +import { createDeferred } from '@graphql-tools/utils'; +import { createSchema, createYoga } from 'graphql-yoga'; +import { describe, expect, it } from 'vitest'; + +describe('Upstream Timeout', () => { + it('times out based on factory function', async () => { + const greetingsDeferred = createDeferred(); + const upstreamSchema = createSchema({ + typeDefs: /* GraphQL */ ` + type Query { + hello: String + } + `, + resolvers: { + Query: { + hello: () => greetingsDeferred.promise, + }, + }, + }); + await using upstreamServer = createYoga({ + schema: upstreamSchema, + }); + await using gateway = createGatewayRuntime({ + supergraph: getUnifiedGraphGracefully([ + { + name: 'upstream', + schema: upstreamSchema, + url: 'http://localhost:4001/graphql', + }, + ]), + plugins() { + return [useCustomFetch(upstreamServer.fetch as MeshFetch)]; + }, + upstreamTimeout({ subgraphName }) { + if (subgraphName === 'upstream') { + return 1000; + } + throw new Error('Unexpected subgraph'); + }, + }); + setTimeout(() => { + greetingsDeferred.resolve('Hello, World!'); + }, 1500); + const res = await gateway.fetch('http://localhost:4000/graphql', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + query: /* GraphQL */ ` + query { + hello + } + `, + }), + }); + const resJson = await res.json(); + expect(resJson).toEqual({ + data: { + hello: null, + }, + errors: [ + expect.objectContaining({ + message: expect.stringMatching( + /(The operation was aborted due to timeout|The operation timed out.)/, + ), + path: ['hello'], + }), + ], + }); + }); +}); diff --git a/packages/transports/common/package.json b/packages/transports/common/package.json index 8dfcec17..965e869b 100644 --- a/packages/transports/common/package.json +++ b/packages/transports/common/package.json @@ -43,8 +43,9 @@ }, "dependencies": { "@envelop/core": "^5.0.1", + "@graphql-hive/gateway-abort-signal-any": "workspace:^", "@graphql-mesh/types": "^0.103.6", - "@graphql-tools/delegate": "workspace:^", + "@graphql-tools/executor": "^1.3.8", "@graphql-tools/utils": "^10.6.2", "tslib": "^2.8.1" }, diff --git a/packages/transports/common/src/index.ts b/packages/transports/common/src/index.ts index ac8840c9..0195f76a 100644 --- a/packages/transports/common/src/index.ts +++ b/packages/transports/common/src/index.ts @@ -4,10 +4,11 @@ import { print, stripIgnoredCharacters, type DocumentNode } from 'graphql'; export type * from './types'; export * from './ObjMap'; -export { createDefaultExecutor } from '@graphql-tools/delegate'; +export { executorFromSchema as createDefaultExecutor } from '@graphql-tools/executor'; export { getDocumentString } from '@envelop/core'; export const defaultPrintFn = memoize1(function defaultPrintFn( document: DocumentNode, ) { return stripIgnoredCharacters(getDocumentString(document, print)); }); +export { abortSignalAny } from '@graphql-hive/gateway-abort-signal-any'; diff --git a/packages/transports/http-callback/src/index.ts b/packages/transports/http-callback/src/index.ts index 9aec7ae6..dc01feb4 100644 --- a/packages/transports/http-callback/src/index.ts +++ b/packages/transports/http-callback/src/index.ts @@ -1,6 +1,7 @@ import { process } from '@graphql-mesh/cross-helpers'; import { getInterpolatedHeadersFactory } from '@graphql-mesh/string-interpolation'; import { + abortSignalAny, defaultPrintFn, type DisposableExecutor, type Transport, @@ -151,6 +152,10 @@ export default { `HTTP Callback Transport: \`location\` is missing in the transport entry!`, ); } + let signal = execReq.signal || execReq.info?.signal; + if (signal) { + signal = abortSignalAny([reqAbortCtrl.signal, signal]); + } const subFetchCall$ = mapMaybePromise( fetch( transportEntry.location, @@ -167,7 +172,7 @@ export default { Accept: 'application/json;callbackSpec=1.0; charset=utf-8', }, body: fetchBody, - signal: reqAbortCtrl.signal, + signal, }, execReq.context, execReq.info, @@ -216,6 +221,13 @@ export default { ); execReq.context?.waitUntil?.(subFetchCall$); return new Repeater((push, stop) => { + signal?.addEventListener( + 'abort', + () => { + stop(signal.reason); + }, + { once: true }, + ); pushFn = push; stopSubscription = stop; stopFnSet.add(stop); diff --git a/tsconfig.json b/tsconfig.json index 1069045e..8de4cba2 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -55,7 +55,10 @@ "./packages/stitching-directives/src/index.ts" ], "@graphql-tools/wrap": ["./packages/wrap/src/index.ts"], - "@graphql-tools/executor-*": ["./packages/executors/*/src/index.ts"] + "@graphql-tools/executor-*": ["./packages/executors/*/src/index.ts"], + "@graphql-hive/gateway-abort-signal-any": [ + "./packages/abort-signal-any/src/index.ts" + ] } }, "include": [ diff --git a/yarn.lock b/yarn.lock index 0a53e9f4..a3958898 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3039,6 +3039,15 @@ __metadata: languageName: node linkType: hard +"@graphql-hive/gateway-abort-signal-any@workspace:^, @graphql-hive/gateway-abort-signal-any@workspace:packages/abort-signal-any": + version: 0.0.0-use.local + resolution: "@graphql-hive/gateway-abort-signal-any@workspace:packages/abort-signal-any" + dependencies: + pkgroll: "npm:2.5.1" + tslib: "npm:^2.8.1" + languageName: unknown + linkType: soft + "@graphql-hive/gateway-runtime@workspace:*, @graphql-hive/gateway-runtime@workspace:^, @graphql-hive/gateway-runtime@workspace:packages/runtime": version: 0.0.0-use.local resolution: "@graphql-hive/gateway-runtime@workspace:packages/runtime" @@ -3047,6 +3056,7 @@ __metadata: "@envelop/disable-introspection": "npm:^6.0.0" "@envelop/generic-auth": "npm:^8.0.0" "@graphql-hive/core": "npm:^0.8.1" + "@graphql-hive/gateway-abort-signal-any": "workspace:^" "@graphql-mesh/cross-helpers": "npm:^0.4.9" "@graphql-mesh/fusion-composition": "npm:^0.7.0" "@graphql-mesh/fusion-runtime": "workspace:^" @@ -3626,10 +3636,11 @@ __metadata: resolution: "@graphql-mesh/transport-common@workspace:packages/transports/common" dependencies: "@envelop/core": "npm:^5.0.1" + "@graphql-hive/gateway-abort-signal-any": "workspace:^" "@graphql-mesh/cross-helpers": "npm:^0.4.9" "@graphql-mesh/types": "npm:^0.103.6" "@graphql-mesh/utils": "npm:^0.103.6" - "@graphql-tools/delegate": "workspace:^" + "@graphql-tools/executor": "npm:^1.3.8" "@graphql-tools/utils": "npm:^10.6.2" graphql: "npm:^16.9.0" pkgroll: "npm:2.5.1" @@ -3834,7 +3845,7 @@ __metadata: resolution: "@graphql-tools/delegate@workspace:packages/delegate" dependencies: "@graphql-tools/batch-execute": "workspace:^" - "@graphql-tools/executor": "npm:^1.3.6" + "@graphql-tools/executor": "npm:^1.3.8" "@graphql-tools/schema": "npm:^10.0.11" "@graphql-tools/utils": "npm:^10.6.2" "@repeaterjs/repeater": "npm:^3.0.6" @@ -3871,6 +3882,7 @@ __metadata: resolution: "@graphql-tools/executor-http@workspace:packages/executors/http" dependencies: "@apollo/server": "npm:^4.11.2" + "@graphql-hive/gateway-abort-signal-any": "workspace:^" "@graphql-tools/utils": "npm:^10.6.2" "@repeaterjs/repeater": "npm:^3.0.4" "@types/extract-files": "npm:8.1.3" @@ -3888,7 +3900,7 @@ __metadata: languageName: unknown linkType: soft -"@graphql-tools/executor@npm:^1.3.2, @graphql-tools/executor@npm:^1.3.5, @graphql-tools/executor@npm:^1.3.6, @graphql-tools/executor@npm:^1.3.7": +"@graphql-tools/executor@npm:^1.3.2, @graphql-tools/executor@npm:^1.3.5, @graphql-tools/executor@npm:^1.3.6, @graphql-tools/executor@npm:^1.3.7, @graphql-tools/executor@npm:^1.3.8": version: 1.3.8 resolution: "@graphql-tools/executor@npm:1.3.8" dependencies: @@ -4312,19 +4324,7 @@ __metadata: languageName: node linkType: hard -"@graphql-yoga/subscription@npm:^5.0.1": - version: 5.0.1 - resolution: "@graphql-yoga/subscription@npm:5.0.1" - dependencies: - "@graphql-yoga/typed-event-target": "npm:^3.0.0" - "@repeaterjs/repeater": "npm:^3.0.4" - "@whatwg-node/events": "npm:^0.1.0" - tslib: "npm:^2.5.2" - checksum: 10c0/fccbdb1497edc085437d83127dbe6fde399b30e0277d5d6cdfc4aad16d733b00fe8b65e3d0eda3ba50250123545e59a7e684b9de5f9050721933ff1bf9950a58 - languageName: node - linkType: hard - -"@graphql-yoga/subscription@npm:^5.0.2": +"@graphql-yoga/subscription@npm:^5.0.1, @graphql-yoga/subscription@npm:^5.0.2": version: 5.0.2 resolution: "@graphql-yoga/subscription@npm:5.0.2" dependencies: @@ -6781,15 +6781,15 @@ __metadata: linkType: hard "@whatwg-node/node-fetch@npm:^0.7.1": - version: 0.7.4 - resolution: "@whatwg-node/node-fetch@npm:0.7.4" + version: 0.7.5 + resolution: "@whatwg-node/node-fetch@npm:0.7.5" dependencies: "@kamilkisiela/fast-url-parser": "npm:^1.1.4" "@whatwg-node/disposablestack": "npm:^0.0.5" busboy: "npm:^1.6.0" fast-querystring: "npm:^1.1.1" tslib: "npm:^2.6.3" - checksum: 10c0/2ac3fa2bf2534eab6bf33fa511f67e760eb3d190e04970fe7a92bf2f1dfee0ad4ee337ec87d73b51e7b7bdaed8a29b7a19e445e964493693c5bfa06c91ed6479 + checksum: 10c0/d0693ff047f0e51e94a36e77d3d39f323f74a4205c2116add44e2fc0991e4d6044bde55a867d1cf78d7c4a406d73d75df5975876a32831c0cc5829811172335e languageName: node linkType: hard