From 67f915e05c029ce9a595c05cf2f42de232ff2d37 Mon Sep 17 00:00:00 2001 From: angrykoala Date: Fri, 22 Nov 2024 11:03:45 +0000 Subject: [PATCH] Filter cdc events skipping nodes not present in the schema --- .changeset/tasty-onions-laugh.md | 5 +++ .../Neo4jGraphQLSubscriptionsCDCEngine.ts | 14 ++++++- .../src/classes/subscription/cdc/cdc-api.ts | 40 +++++++++++-------- 3 files changed, 42 insertions(+), 17 deletions(-) create mode 100644 .changeset/tasty-onions-laugh.md diff --git a/.changeset/tasty-onions-laugh.md b/.changeset/tasty-onions-laugh.md new file mode 100644 index 0000000000..d2e8681eff --- /dev/null +++ b/.changeset/tasty-onions-laugh.md @@ -0,0 +1,5 @@ +--- +"@neo4j/graphql": patch +--- + +CDC subscription optimization. Only node events with labels present in the GraphQL schema will be queried. This will reduce the number of subscription events queried by skipping events to nodes that cannot be subscribed through GraphQL diff --git a/packages/graphql/src/classes/subscription/Neo4jGraphQLSubscriptionsCDCEngine.ts b/packages/graphql/src/classes/subscription/Neo4jGraphQLSubscriptionsCDCEngine.ts index 2bc90b85c5..19804cd3a3 100644 --- a/packages/graphql/src/classes/subscription/Neo4jGraphQLSubscriptionsCDCEngine.ts +++ b/packages/graphql/src/classes/subscription/Neo4jGraphQLSubscriptionsCDCEngine.ts @@ -20,6 +20,7 @@ import { EventEmitter } from "events"; import type { Driver, QueryConfig } from "neo4j-driver"; import { Memoize } from "typescript-memoize"; +import type { Neo4jGraphQLSchemaModel } from "../../schema-model/Neo4jGraphQLSchemaModel"; import type { Neo4jGraphQLSubscriptionsEngine, SubscriptionEngineContext, SubscriptionsEvent } from "../../types"; import { CDCApi } from "./cdc/cdc-api"; import { CDCEventParser } from "./cdc/cdc-event-parser"; @@ -33,6 +34,8 @@ export class Neo4jGraphQLSubscriptionsCDCEngine implements Neo4jGraphQLSubscript private timer: ReturnType | undefined; private closed = false; + private subscribeToLabels: string[] | undefined; + constructor({ driver, pollTime = 1000, @@ -63,6 +66,9 @@ export class Neo4jGraphQLSubscriptionsCDCEngine implements Neo4jGraphQLSubscript public async init({ schemaModel }: SubscriptionEngineContext): Promise { await this.cdcApi.updateCursor(); this._parser = new CDCEventParser(schemaModel); + this.subscribeToLabels = this.getLabelsToFilter(schemaModel); + + schemaModel.concreteEntities.map((e) => Array.from(e.labels)); this.triggerPoll(); } @@ -91,7 +97,7 @@ export class Neo4jGraphQLSubscriptionsCDCEngine implements Neo4jGraphQLSubscript } private async pollEvents(): Promise { - const cdcEvents = await this.cdcApi.queryEvents(); + const cdcEvents = await this.cdcApi.queryEvents(this.subscribeToLabels); for (const cdcEvent of cdcEvents) { const parsedEvent = this.parser.parseCDCEvent(cdcEvent); if (parsedEvent) { @@ -99,4 +105,10 @@ export class Neo4jGraphQLSubscriptionsCDCEngine implements Neo4jGraphQLSubscript } } } + + private getLabelsToFilter(schemaModel: Neo4jGraphQLSchemaModel): string[] { + const uniqueLabels = new Set(schemaModel.concreteEntities.flatMap((e) => Array.from(e.labels))); + + return Array.from(uniqueLabels); + } } diff --git a/packages/graphql/src/classes/subscription/cdc/cdc-api.ts b/packages/graphql/src/classes/subscription/cdc/cdc-api.ts index 47622dde90..c9c8ad1601 100644 --- a/packages/graphql/src/classes/subscription/cdc/cdc-api.ts +++ b/packages/graphql/src/classes/subscription/cdc/cdc-api.ts @@ -19,7 +19,6 @@ import Cypher from "@neo4j/cypher-builder"; import type { Driver, QueryConfig } from "neo4j-driver"; -import { filterTruthy } from "../../../utils/utils"; import type { CDCQueryResponse } from "./cdc-types"; export class CDCApi { @@ -33,13 +32,15 @@ export class CDCApi { } /** Queries events since last call to queryEvents */ - public async queryEvents(): Promise { + public async queryEvents(labels?: string[]): Promise { if (!this.cursor) { this.cursor = await this.fetchCurrentChangeId(); } const cursorLiteral = new Cypher.Literal(this.cursor); - const queryProcedure = CDCProcedures.query(cursorLiteral); + + const selectors = this.createQuerySelectors(labels); + const queryProcedure = Cypher.db.cdc.query(cursorLiteral, selectors); const events = await this.runProcedure(queryProcedure); this.updateChangeIdWithLastEvent(events); @@ -51,7 +52,7 @@ export class CDCApi { } private async fetchCurrentChangeId(): Promise { - const currentProcedure = CDCProcedures.current(); + const currentProcedure = Cypher.db.cdc.current(); const result = await this.runProcedure<{ id: string }>(currentProcedure); @@ -69,6 +70,25 @@ export class CDCApi { } } + private createQuerySelectors(labels: string[] | undefined): Cypher.Map[] { + if (labels) { + return labels.map( + (l) => + new Cypher.Map({ + select: new Cypher.Literal("n"), + labels: new Cypher.Literal([l]), + }) + ); + } else { + // Filters nodes + return [ + new Cypher.Map({ + select: new Cypher.Literal("n"), + }), + ]; + } + } + private async runProcedure(procedure: Cypher.Clause): Promise { const { cypher, params } = procedure.build(); @@ -78,15 +98,3 @@ export class CDCApi { }) as T[]; } } - -/** Wrapper of Cypher Builder for CDC */ -class CDCProcedures { - static current(): Cypher.Procedure { - return new Cypher.Procedure<"id">("cdc.current"); - } - - static query(from: Cypher.Expr, selectors?: Cypher.Expr): Cypher.Procedure { - const procedureParams = filterTruthy([from, selectors]); - return new Cypher.Procedure<"id" | "txId" | "seq" | "metadata" | "event">("cdc.query", procedureParams); - } -}