Skip to content

Commit

Permalink
feat: Retry and Timeout configuration for upstream requests (#322)
Browse files Browse the repository at this point in the history
Co-authored-by: Valentin Cocaud <[email protected]>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 13, 2024
1 parent 3ba3609 commit 23b8987
Show file tree
Hide file tree
Showing 29 changed files with 967 additions and 83 deletions.
7 changes: 7 additions & 0 deletions .changeset/@graphql-hive_gateway-runtime-322-dependencies.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@graphql-hive/gateway-runtime': patch
---

dependencies updates:

- Added dependency [`@graphql-hive/gateway-abort-signal-any@workspace:^` ↗︎](https://www.npmjs.com/package/@graphql-hive/gateway-abort-signal-any/v/workspace:^) (to `dependencies`)
9 changes: 9 additions & 0 deletions .changeset/@graphql-mesh_transport-common-322-dependencies.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@graphql-mesh/transport-common': patch
---

dependencies updates:

- Added dependency [`@graphql-hive/gateway-abort-signal-any@workspace:^` ↗︎](https://www.npmjs.com/package/@graphql-hive/gateway-abort-signal-any/v/workspace:^) (to `dependencies`)
- Added dependency [`@graphql-tools/executor@^1.3.8` ↗︎](https://www.npmjs.com/package/@graphql-tools/executor/v/1.3.8) (to `dependencies`)
- Removed dependency [`@graphql-tools/delegate@workspace:^` ↗︎](https://www.npmjs.com/package/@graphql-tools/delegate/v/workspace:^) (from `dependencies`)
7 changes: 7 additions & 0 deletions .changeset/@graphql-tools_delegate-322-dependencies.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@graphql-tools/delegate': patch
---

dependencies updates:

- Updated dependency [`@graphql-tools/executor@^1.3.8` ↗︎](https://www.npmjs.com/package/@graphql-tools/executor/v/1.3.8) (from `^1.3.6`, in `dependencies`)
7 changes: 7 additions & 0 deletions .changeset/@graphql-tools_executor-http-322-dependencies.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@graphql-tools/executor-http': patch
---

dependencies updates:

- Added dependency [`@graphql-hive/gateway-abort-signal-any@workspace:^` ↗︎](https://www.npmjs.com/package/@graphql-hive/gateway-abort-signal-any/v/workspace:^) (to `dependencies`)
5 changes: 5 additions & 0 deletions .changeset/late-socks-repeat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@graphql-hive/gateway-abort-signal-any': patch
---

New package
50 changes: 50 additions & 0 deletions .changeset/witty-candles-whisper.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
'@graphql-hive/gateway-runtime': minor
'@graphql-hive/gateway': minor
---

New Retry and Timeout plugins;

- Retry plugin: Retry a request if it fails

It respects the `Retry-After` HTTP header, [See more about this HTTP](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After)

```ts
export const gatewayConfig = defineConfig({
upstreamRetry: {
// The maximum number of retries to attempt.
maxRetries: 3, // required
// The delay between retries in milliseconds.
retryDelay: 1000, // default
/**
* A function that determines whether a response should be retried.
* If the upstream returns `Retry-After` header, the request will be retried.
*/
shouldRetry: ({ response }) => response?.status >= 500 || response?.status === 429
}
// or you can configure it by subgraph name
upstreamRetry({ subgraphName }) {
if (subgraphName === 'my-rate-limited-subgraph') {
return {
maxRetries: 3,
}
}
return { maxRetries: 10 }
}
})
```

- Timeout plugin: Timeout a request if it takes too long

```ts
export const gatewayConfig = defineConfig({
// The maximum time in milliseconds to wait for a response from the upstream.
upstreamTimeout: 1000, // required
// or you can configure it by subgraph name
upstreamTimeout({ subgraphName }) {
if (subgraphName === 'my-slow-subgraph') {
return 1000;
}
}
})
```
46 changes: 46 additions & 0 deletions packages/abort-signal-any/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"name": "@graphql-hive/gateway-abort-signal-any",
"version": "0.0.0",
"type": "module",
"repository": {
"type": "git",
"url": "git+https://github.com/graphql-hive/gateway.git",
"directory": "packages/abort-signal-any"
},
"license": "MIT",
"engines": {
"node": ">=18.0.0"
},
"main": "./dist/index.js",
"exports": {
".": {
"require": {
"types": "./dist/index.d.cts",
"default": "./dist/index.cjs"
},
"import": {
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
}
},
"./package.json": "./package.json"
},
"types": "./dist/index.d.ts",
"files": [
"dist"
],
"scripts": {
"build": "pkgroll --clean-dist",
"prepack": "yarn build"
},
"dependencies": {
"tslib": "^2.8.1"
},
"devDependencies": {
"pkgroll": "2.5.1"
},
"publishConfig": {
"access": "public"
},
"sideEffects": false
}
66 changes: 66 additions & 0 deletions packages/abort-signal-any/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
export type AbortSignalFromAny = AbortSignal & {
signals: Set<AbortSignal>;
addSignals(signals: Iterable<AbortSignal>): void;
};

export function isAbortSignalFromAny(
signal?: AbortSignal | null,
): signal is AbortSignalFromAny {
return signal != null && 'signals' in signal && 'addSignals' in signal;
}

export function abortSignalAny(givenSignals: Iterable<AbortSignal>) {
const signals = new Set<AbortSignal>();
let singleSignal: AbortSignal | undefined;
for (const signal of givenSignals) {
if (isAbortSignalFromAny(signal)) {
for (const childSignal of signal.signals) {
singleSignal = childSignal;
signals.add(childSignal);
}
} else {
singleSignal = signal;
signals.add(signal);
}
}
if (signals.size < 2) {
return singleSignal;
}
if (signals.size === 0) {
return undefined;
}
const ctrl = new AbortController();
function onAbort(this: AbortSignal, ev: Event) {
const signal = (ev.target as AbortSignal) || this;
ctrl.abort(signal.reason);
for (const signal of signals) {
signal.removeEventListener('abort', onAbort);
}
}
for (const signal of signals) {
signal.addEventListener('abort', onAbort, { once: true });
}
Object.defineProperties(ctrl.signal, {
signals: { value: signals },
addSignals: {
value(newSignals: Iterable<AbortSignal>) {
for (const signal of newSignals) {
if (isAbortSignalFromAny(signal)) {
for (const childSignal of signal.signals) {
if (!signals.has(childSignal)) {
signals.add(childSignal);
childSignal.addEventListener('abort', onAbort, { once: true });
}
}
} else {
if (!signals.has(signal)) {
signals.add(signal);
signal.addEventListener('abort', onAbort, { once: true });
}
}
}
},
},
});
return ctrl.signal as AbortSignalFromAny;
}
2 changes: 1 addition & 1 deletion packages/delegate/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
},
"dependencies": {
"@graphql-tools/batch-execute": "workspace:^",
"@graphql-tools/executor": "^1.3.6",
"@graphql-tools/executor": "^1.3.8",
"@graphql-tools/schema": "^10.0.11",
"@graphql-tools/utils": "^10.6.2",
"@repeaterjs/repeater": "^3.0.6",
Expand Down
21 changes: 3 additions & 18 deletions packages/delegate/src/delegateToSchema.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { getBatchingExecutor } from '@graphql-tools/batch-execute';
import { normalizedExecutor } from '@graphql-tools/executor';
import { executorFromSchema } from '@graphql-tools/executor';
import {
ExecutionRequest,
ExecutionResult,
Executor,
getDefinedRootType,
Expand All @@ -11,7 +10,6 @@ import {
mapMaybePromise,
Maybe,
MaybeAsyncIterable,
memoize1,
} from '@graphql-tools/utils';
import { Repeater } from '@repeaterjs/repeater';
import { dset } from 'dset/merge';
Expand Down Expand Up @@ -295,7 +293,7 @@ function getExecutor<TContext extends Record<string, any>>(
const { subschemaConfig, targetSchema, context } = delegationContext;

let executor: Executor =
subschemaConfig?.executor || createDefaultExecutor(targetSchema);
subschemaConfig?.executor || executorFromSchema(targetSchema);

if (subschemaConfig?.batch) {
const batchingOptions = subschemaConfig?.batchingOptions;
Expand All @@ -310,17 +308,4 @@ function getExecutor<TContext extends Record<string, any>>(
return executor;
}

export const createDefaultExecutor = memoize1(function createDefaultExecutor(
schema: GraphQLSchema,
): Executor {
return function defaultExecutor(request: ExecutionRequest) {
return normalizedExecutor({
schema,
document: request.document,
rootValue: request.rootValue,
contextValue: request.context,
variableValues: request.variables,
operationName: request.operationName,
});
};
});
export { executorFromSchema as createDefaultExecutor };
9 changes: 9 additions & 0 deletions packages/executors/graphql-ws/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ export function buildGraphQLWSExecutor(
operationName,
extensions,
operationType = getOperationASTFromRequest(executionRequest).operation,
info,
signal = info?.signal,
} = executionRequest;
// additional connection params can be supplied through the "connectionParams" field in extensions.
// TODO: connection params only from the FIRST operation in lazy mode will be used (detect connectionParams changes and reconnect, too implicit?)
Expand All @@ -98,6 +100,13 @@ export function buildGraphQLWSExecutor(
operationName,
extensions,
});
signal?.addEventListener(
'abort',
() => {
iterableIterator.return?.();
},
{ once: true },
);
if (operationType === 'subscription') {
return iterableIterator;
}
Expand Down
1 change: 1 addition & 0 deletions packages/executors/http/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"graphql": "^14.0.0 || ^15.0.0 || ^16.0.0 || ^17.0.0"
},
"dependencies": {
"@graphql-hive/gateway-abort-signal-any": "workspace:^",
"@graphql-tools/utils": "^10.6.2",
"@repeaterjs/repeater": "^3.0.4",
"@whatwg-node/disposablestack": "^0.0.5",
Expand Down
4 changes: 2 additions & 2 deletions packages/executors/http/src/handleEventStreamResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ export function isReadableStream(value: any): value is ReadableStream {
}

export function handleEventStreamResponse(
signal: AbortSignal,
response: Response,
signal?: AbortSignal,
) {
// node-fetch returns body as a promise so we need to resolve it
const body = response.body;
Expand All @@ -35,7 +35,7 @@ export function handleEventStreamResponse(

let currChunk = '';
async function pump() {
if (signal.aborted) {
if (signal?.aborted) {
await push(createResultForAbort(signal));
return stop();
}
Expand Down
25 changes: 16 additions & 9 deletions packages/executors/http/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { abortSignalAny } from '@graphql-hive/gateway-abort-signal-any';
import {
createGraphQLError,
DisposableAsyncExecutor,
Expand Down Expand Up @@ -229,14 +230,20 @@ export function buildHTTPExecutor(
request.extensions = restExtensions;
}

let signal = sharedSignal;
const signals = [sharedSignal];
const signalFromRequest = request.signal || request.info?.signal;
if (signalFromRequest) {
if (signalFromRequest.aborted) {
return createResultForAbort(signalFromRequest.reason);
}
signals.push(signalFromRequest);
}
if (options?.timeout) {
signal = AbortSignal.any([
sharedSignal,
AbortSignal.timeout(options.timeout),
]);
signals.push(AbortSignal.timeout(options.timeout));
}

const signal = abortSignalAny(signals);

const upstreamErrorExtensions: UpstreamErrorExtensions = {
request: {
method,
Expand Down Expand Up @@ -365,7 +372,7 @@ export function buildHTTPExecutor(

const contentType = fetchResult.headers.get('content-type');
if (contentType?.includes('text/event-stream')) {
return handleEventStreamResponse(signal, fetchResult);
return handleEventStreamResponse(fetchResult, signal);
} else if (contentType?.includes('multipart/mixed')) {
return handleMultipartMixedResponse(fetchResult);
}
Expand Down Expand Up @@ -543,7 +550,7 @@ function coerceFetchError(
endpoint,
upstreamErrorExtensions,
}: {
signal: AbortSignal;
signal?: AbortSignal;
endpoint: string;
upstreamErrorExtensions: UpstreamErrorExtensions;
},
Expand All @@ -559,8 +566,8 @@ function coerceFetchError(
extensions: upstreamErrorExtensions,
originalError: e,
});
} else if (e.name === 'AbortError' && signal.reason) {
return createGraphQLErrorForAbort(signal.reason, {
} else if (e.name === 'AbortError' && signal?.reason) {
return createGraphQLErrorForAbort(signal?.reason, {
extensions: upstreamErrorExtensions,
});
} else if (e.message) {
Expand Down
Loading

0 comments on commit 23b8987

Please sign in to comment.