Skip to content

Commit

Permalink
Add example for Retry/Timeout and some fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ardatan committed Dec 23, 2024
1 parent eb1ad4a commit c976e22
Show file tree
Hide file tree
Showing 13 changed files with 309 additions and 87 deletions.
8 changes: 8 additions & 0 deletions .changeset/calm-eels-hope.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@graphql-mesh/transport-common': patch
'@graphql-tools/executor-http': patch
'@graphql-mesh/fusion-runtime': patch
'@graphql-hive/gateway-runtime': patch
---

Fix Retry / Timeout combination
19 changes: 19 additions & 0 deletions e2e/retry-timeout/gateway.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { defineConfig } from '@graphql-hive/gateway';

let i = 0;
export const gatewayConfig = defineConfig({
upstreamRetry: {
maxRetries: 4,
},
upstreamTimeout: 300,
plugins(ctx) {
return [
{
onFetch({ options }) {
i++;
ctx.logger.info(`Fetching with ${options.body} for the ${i} time`);
},
},
];
},
});
11 changes: 11 additions & 0 deletions e2e/retry-timeout/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"name": "@e2e/retry-timeout",
"private": true,
"dependencies": {
"@apollo/subgraph": "^2.9.3",
"@graphql-hive/gateway": "workspace:*",
"graphql": "16.10.0",
"graphql-yoga": "^5.10.6",
"tslib": "^2.8.1"
}
}
46 changes: 46 additions & 0 deletions e2e/retry-timeout/retry-timeout.e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { createTenv } from '@internal/e2e';
import { expect, it } from 'vitest';

const { service, gateway } = createTenv(__dirname);

it('Retry & Timeout', async () => {
const flakeyService = await service('flakey');
const gw = await gateway({
supergraph: {
with: 'apollo',
services: [flakeyService],
},
});

const res = await gw.execute({
query: /* GraphQL */ `
query {
product(id: "1") {
id
name
}
}
`,
});

expect(res).toEqual({
data: {
product: {
id: '1',
name: 'Product 1',
},
},
});

const logs = gw.getStd('both');
// The first request will fail, and the gateway will retry 2 more times
expect(logs).toContain(
'Fetching with {"query":"{__typename product(id:\\"1\\"){id name}}"} for the 1 time',
);
expect(logs).toContain(
'Fetching with {"query":"{__typename product(id:\\"1\\"){id name}}"} for the 2 time',
);
expect(logs).toContain(
'Fetching with {"query":"{__typename product(id:\\"1\\"){id name}}"} for the 3 time',
);
});
97 changes: 97 additions & 0 deletions e2e/retry-timeout/services/flakey.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { createServer } from 'http';
import { buildSubgraphSchema } from '@apollo/subgraph';
import { GraphQLResolverMap } from '@apollo/subgraph/dist/schema-helper';
import { Opts } from '@internal/testing';
import { parse } from 'graphql';
import {
createGraphQLError,
createYoga,
YogaInitialContext,
} from 'graphql-yoga';

const opts = Opts(process.argv);

let i = 0;
let lastAttempt: number | undefined;

const servicePort = opts.getServicePort('flakey');

const resolvers = {
Query: {
product: (
_: unknown,
{ id }: { id: string },
context: YogaInitialContext,
) => {
i++;
console.log(`${i} attempt`);
if (lastAttempt && Date.now() - lastAttempt < 1000) {
const secondsToWait = Math.ceil(
(1000 - (Date.now() - lastAttempt)) / 1000,
);
return createGraphQLError('You are too early, still wait...', {
extensions: {
http: {
status: 429,
headers: {
'retry-after': secondsToWait.toString(),
},
},
},
});
}
lastAttempt = Date.now();
// First attempt will fail with timeout
if (i === 1) {
let reject: (reason: any) => void;
const promise = new Promise<void>((_resolve, _reject) => {
reject = _reject;
});
setTimeout(() => {
reject('Timeout');
}, 1000);
return promise;
}
// Second attempt will fail with 500
if (i === 2) {
return createGraphQLError('Flakiness...', {
extensions: {
http: {
status: 503,
headers: {
'retry-after': '1',
},
},
},
});
}
// Third attempt will return
return {
id,
name: 'Product ' + id,
};
},
},
} as GraphQLResolverMap<unknown>;

createServer(
createYoga({
schema: buildSubgraphSchema({
typeDefs: parse(/* GraphQL */ `
type Query {
product(id: ID!): Product
}
type Product {
id: ID!
name: String!
}
`),
resolvers,
}),
}),
).listen(servicePort, () => {
console.log(
`🚀 Flakey service ready at http://localhost:${servicePort}/graphql`,
);
});
14 changes: 8 additions & 6 deletions packages/executors/http/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ export function buildHTTPExecutor(
request: {
method,
},
response: {},
};

const query = printFn(request.document);
Expand Down Expand Up @@ -351,6 +350,7 @@ export function buildHTTPExecutor(
}
})
.then((fetchResult: Response): any => {
upstreamErrorExtensions.response ||= {};
upstreamErrorExtensions.response.status = fetchResult.status;
upstreamErrorExtensions.response.statusText = fetchResult.statusText;
Object.defineProperty(upstreamErrorExtensions.response, 'headers', {
Expand Down Expand Up @@ -381,6 +381,7 @@ export function buildHTTPExecutor(
})
.then((result) => {
if (typeof result === 'string') {
upstreamErrorExtensions.response ||= {};
upstreamErrorExtensions.response.body = result;
if (result) {
try {
Expand Down Expand Up @@ -566,10 +567,11 @@ function coerceFetchError(
extensions: upstreamErrorExtensions,
originalError: e,
});
} else if (e.name === 'AbortError' && signal?.reason) {
return createGraphQLErrorForAbort(signal?.reason, {
extensions: upstreamErrorExtensions,
});
} else if (e.name === 'AbortError') {
return createGraphQLErrorForAbort(
signal?.reason || e,
upstreamErrorExtensions,
);
} else if (e.message) {
return createGraphQLError(e.message, {
extensions: upstreamErrorExtensions,
Expand All @@ -591,7 +593,7 @@ interface UpstreamErrorExtensions {
method: string;
body?: unknown;
};
response: {
response?: {
status?: number;
statusText?: string;
headers?: Record<string, string>;
Expand Down
11 changes: 11 additions & 0 deletions packages/executors/http/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { createGraphQLError, mapMaybePromise } from '@graphql-tools/utils';
import { crypto, TextEncoder } from '@whatwg-node/fetch';
import { GraphQLError } from 'graphql';

export function createAbortErrorReason() {
return new Error('Executor was disposed.');
Expand All @@ -9,8 +10,18 @@ export function createGraphQLErrorForAbort(
reason: any,
extensions?: Record<string, any>,
) {
if (reason instanceof GraphQLError) {
return reason;
}
if (reason?.name === 'TimeoutError') {
return createGraphQLError(reason.message, {
extensions,
originalError: reason,
});
}
return createGraphQLError('The operation was aborted. reason: ' + reason, {
extensions,
originalError: reason,
});
}

Expand Down
19 changes: 11 additions & 8 deletions packages/fusion-runtime/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,30 +248,33 @@ declare module 'graphql' {
* with `onSubgraphExecuteHooks` to hook into the execution phase of subgraphs
*/
export function wrapExecutorWithHooks({
executor,
executor: baseExecutor,
onSubgraphExecuteHooks,
subgraphName,
transportEntryMap,
getSubgraphSchema,
transportContext,
}: WrapExecuteWithHooksOptions): Executor {
return function executorWithHooks(executionRequest: ExecutionRequest) {
executionRequest.info = executionRequest.info || ({} as GraphQLResolveInfo);
executionRequest.info.executionRequest = executionRequest;
return function executorWithHooks(baseExecutionRequest: ExecutionRequest) {
baseExecutionRequest.info =
baseExecutionRequest.info || ({} as GraphQLResolveInfo);
baseExecutionRequest.info.executionRequest = baseExecutionRequest;
const requestId =
executionRequest.context?.request &&
requestIdByRequest.get(executionRequest.context.request);
baseExecutionRequest.context?.request &&
requestIdByRequest.get(baseExecutionRequest.context.request);
let execReqLogger = transportContext?.logger;
if (execReqLogger) {
if (requestId) {
execReqLogger = execReqLogger.child(requestId);
}
loggerForExecutionRequest.set(executionRequest, execReqLogger);
loggerForExecutionRequest.set(baseExecutionRequest, execReqLogger);
}
execReqLogger = execReqLogger?.child?.(subgraphName);
if (onSubgraphExecuteHooks.length === 0) {
return executor(executionRequest);
return baseExecutor(baseExecutionRequest);
}
let executor = baseExecutor;
let executionRequest = baseExecutionRequest;
const onSubgraphExecuteDoneHooks: OnSubgraphExecuteDoneHook[] = [];
return mapMaybePromise(
iterateAsync(
Expand Down
21 changes: 12 additions & 9 deletions packages/runtime/src/plugins/useUpstreamRetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ export function useUpstreamRetry<TContext extends Record<string, any>>(
}
// If there are errors that are not original GraphQL errors, retry
if (
!isAsyncIterable(executionResult) &&
executionResult.errors?.length &&
!executionResult.errors.some(isOriginalGraphQLError)
!executionResult ||
(!isAsyncIterable(executionResult) &&
executionResult.errors?.length &&
executionResult.errors.some((e) => !isOriginalGraphQLError(e)))
) {
return true;
}
Expand All @@ -93,25 +94,27 @@ export function useUpstreamRetry<TContext extends Record<string, any>>(
} = optsForReq;
if (maxRetries > 0) {
setExecutor(function (executionRequest: ExecutionRequest) {
let retries = maxRetries + 1;
let attemptsLeft = maxRetries + 1;
let executionResult: MaybeAsyncIterable<ExecutionResult>;
let currRetryDelay = retryDelay;
function retry(): MaybePromise<
MaybeAsyncIterable<ExecutionResult>
> {
retries--;
try {
if (retries < 0) {
if (attemptsLeft <= 0) {
return executionResult;
}
const requestTime = Date.now();
attemptsLeft--;
return mapMaybePromise(
executor(executionRequest),
(currRes) => {
executionResult = currRes;
let retryAfterSecondsFromHeader: number | undefined;
const response =
executionRequestResponseMap.get(executionRequest);
// Remove the response from the map after used so we don't see it again
executionRequestResponseMap.delete(executionRequest);
const retryAfterHeader =
response?.headers.get('Retry-After');
if (retryAfterHeader) {
Expand Down Expand Up @@ -142,17 +145,17 @@ export function useUpstreamRetry<TContext extends Record<string, any>>(
timeouts.add(timeout);
});
}
return currRes;
return executionResult;
},
(e) => {
if (retries < 0) {
if (attemptsLeft <= 0) {
throw e;
}
return retry();
},
);
} catch (e) {
if (retries < 0) {
if (attemptsLeft <= 0) {
throw e;
}
return retry();
Expand Down
Loading

0 comments on commit c976e22

Please sign in to comment.