Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORL-1464] Improve Subscription Memory Usage #3241

Merged
merged 2 commits into from
Oct 28, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 58 additions & 27 deletions src/core/server/graph/resolvers/Subscription/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import { GraphQLResolveInfo } from "graphql";
import { withFilter } from "graphql-subscriptions";

import GraphContext from "../../context";
import { SUBSCRIPTION_CHANNELS, SubscriptionPayload } from "./types";

type ResolverFn<TParent, TArgs, TContext> = (
parent: TParent,
args: TArgs,
context: TContext,
info: GraphQLResolveInfo
) => AsyncIterable<any>;

type FilterFn<TParent, TArgs, TContext> = (
parent: TParent,
args: TArgs,
Expand All @@ -23,13 +29,51 @@ interface SubscriptionResolver<TParent, TArgs, TResult> {
resolve: Resolver<TParent, TArgs, TParent>;
}

export function createTenantAsyncIterator<TParent, TArgs, TResult>(
/**
* withFilter applies a filter to a async iterator.
*
* This duplicates the functionality of the withFilter function provided by the
* `graphql-subscriptions` package without the memory leak as it uses native
* async iterators instead.
*
* Solution provided by @brettjashford.
*
* https://github.com/apollographql/graphql-subscriptions/pull/209#issuecomment-713906710
*
* @param asyncIteratorFn the async iterator to use that's provided by the transport
* @param filterFn the filter to apply for each iteration to check to see if we should sent it
*/
function withFilter<TParent, TArgs>(
asyncIteratorFn: ResolverFn<TParent, TArgs, GraphContext>,
filterFn: FilterFn<TParent, TArgs, GraphContext>
) {
return async function* (
source: TParent,
args: TArgs,
ctx: GraphContext,
info: GraphQLResolveInfo
) {
const asyncIterator = asyncIteratorFn(source, args, ctx, info);
for await (const payload of asyncIterator) {
if (await filterFn(payload, args, ctx, info)) {
yield payload;
}
}
};
}

function createTenantAsyncIterator<TParent, TArgs, TResult>(
channel: SUBSCRIPTION_CHANNELS
): Resolver<TParent, TArgs, AsyncIterator<TResult>> {
): Resolver<TParent, TArgs, AsyncIterable<TResult>> {
return (source, args, ctx) =>
ctx.pubsub.asyncIterator<TResult>(
// This is already technically returning an AsyncIterable, the Typescript
// types are in fact wrong:
//
// https://github.com/davidyaha/graphql-redis-subscriptions/pull/255
//
(ctx.pubsub.asyncIterator<TResult>(
createSubscriptionChannelName(ctx.tenant.id, channel)
);
) as unknown) as AsyncIterable<TResult>;
}

export function createSubscriptionChannelName(
Expand All @@ -40,7 +84,7 @@ export function createSubscriptionChannelName(
}

/**
* defaultFilterFn will perform filtering operations on the subscription
* clientIDFilterFn will perform filtering operations on the subscription
* responses to ensure that mutations issued by one user is not sent back as a
* subscription to the same requesting User, as they already implement the
* update via the mutation response.
Expand All @@ -53,7 +97,7 @@ export function createSubscriptionChannelName(
* need to determine eligibility to send the subscription back or
* not.
*/
export function defaultFilterFn<TParent extends SubscriptionPayload, TArgs>(
export function clientIDFilterFn<TParent extends SubscriptionPayload, TArgs>(
source: TParent,
args: TArgs,
ctx: GraphContext
Expand All @@ -65,28 +109,15 @@ export function defaultFilterFn<TParent extends SubscriptionPayload, TArgs>(
return true;
}

/**
* Ensure that even when we're provided with a domain specific filtering
* function we respect the subscription id that is sent back with the request to
* prevent double responses.
*/
export function createFilterFn<TParent, TArgs>(
filter?: FilterFn<TParent, TArgs, GraphContext>
function composeFilters<TParent, TArgs>(
...filters: Array<FilterFn<TParent, TArgs, GraphContext>>
): FilterFn<TParent, TArgs, GraphContext> {
return filter
? // Combine the filters, preferring the defaultFilterFn first.
(source, args, ctx, info) => {
if (!defaultFilterFn(source, args, ctx)) {
return false;
}

return filter(source, args, ctx, info);
}
: defaultFilterFn;
return (source, args, ctx, info) =>
filters.every((filter) => filter(source, args, ctx, info));
}

export interface CreateIteratorInput<TParent, TArgs, TResult> {
filter?: FilterFn<TParent, TArgs, GraphContext>;
filter: FilterFn<TParent, TArgs, GraphContext>;
}

export function createIterator<
Expand All @@ -95,12 +126,12 @@ export function createIterator<
TResult
>(
channel: SUBSCRIPTION_CHANNELS,
{ filter }: CreateIteratorInput<TParent, TArgs, TResult> = {}
{ filter }: CreateIteratorInput<TParent, TArgs, TResult>
): SubscriptionResolver<TParent, TArgs, TResult> {
return {
subscribe: withFilter(
createTenantAsyncIterator(channel),
createFilterFn(filter)
composeFilters(clientIDFilterFn, filter)
),
resolve: (payload) => payload,
};
Expand Down