Skip to content

Commit

Permalink
Filter cdc events skipping nodes not present in the schema
Browse files Browse the repository at this point in the history
  • Loading branch information
angrykoala committed Nov 22, 2024
1 parent 376eced commit 67f915e
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 17 deletions.
5 changes: 5 additions & 0 deletions .changeset/tasty-onions-laugh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@neo4j/graphql": patch
---

CDC subscription optimization. Only node events with labels present in the GraphQL schema will be queried. This will reduce the number of subscription events queried by skipping events to nodes that cannot be subscribed through GraphQL
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import { EventEmitter } from "events";
import type { Driver, QueryConfig } from "neo4j-driver";
import { Memoize } from "typescript-memoize";
import type { Neo4jGraphQLSchemaModel } from "../../schema-model/Neo4jGraphQLSchemaModel";
import type { Neo4jGraphQLSubscriptionsEngine, SubscriptionEngineContext, SubscriptionsEvent } from "../../types";
import { CDCApi } from "./cdc/cdc-api";
import { CDCEventParser } from "./cdc/cdc-event-parser";
Expand All @@ -33,6 +34,8 @@ export class Neo4jGraphQLSubscriptionsCDCEngine implements Neo4jGraphQLSubscript
private timer: ReturnType<typeof setTimeout> | undefined;
private closed = false;

private subscribeToLabels: string[] | undefined;

constructor({
driver,
pollTime = 1000,
Expand Down Expand Up @@ -63,6 +66,9 @@ export class Neo4jGraphQLSubscriptionsCDCEngine implements Neo4jGraphQLSubscript
public async init({ schemaModel }: SubscriptionEngineContext): Promise<void> {
await this.cdcApi.updateCursor();
this._parser = new CDCEventParser(schemaModel);
this.subscribeToLabels = this.getLabelsToFilter(schemaModel);

schemaModel.concreteEntities.map((e) => Array.from(e.labels));
this.triggerPoll();
}

Expand Down Expand Up @@ -91,12 +97,18 @@ export class Neo4jGraphQLSubscriptionsCDCEngine implements Neo4jGraphQLSubscript
}

private async pollEvents(): Promise<void> {
const cdcEvents = await this.cdcApi.queryEvents();
const cdcEvents = await this.cdcApi.queryEvents(this.subscribeToLabels);
for (const cdcEvent of cdcEvents) {
const parsedEvent = this.parser.parseCDCEvent(cdcEvent);
if (parsedEvent) {
this.events.emit(parsedEvent.event, parsedEvent);
}
}
}

private getLabelsToFilter(schemaModel: Neo4jGraphQLSchemaModel): string[] {
const uniqueLabels = new Set(schemaModel.concreteEntities.flatMap((e) => Array.from(e.labels)));

return Array.from(uniqueLabels);
}
}
40 changes: 24 additions & 16 deletions packages/graphql/src/classes/subscription/cdc/cdc-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import Cypher from "@neo4j/cypher-builder";
import type { Driver, QueryConfig } from "neo4j-driver";
import { filterTruthy } from "../../../utils/utils";
import type { CDCQueryResponse } from "./cdc-types";

export class CDCApi {
Expand All @@ -33,13 +32,15 @@ export class CDCApi {
}

/** Queries events since last call to queryEvents */
public async queryEvents(): Promise<CDCQueryResponse[]> {
public async queryEvents(labels?: string[]): Promise<CDCQueryResponse[]> {
if (!this.cursor) {
this.cursor = await this.fetchCurrentChangeId();
}

const cursorLiteral = new Cypher.Literal(this.cursor);
const queryProcedure = CDCProcedures.query(cursorLiteral);

const selectors = this.createQuerySelectors(labels);
const queryProcedure = Cypher.db.cdc.query(cursorLiteral, selectors);

const events = await this.runProcedure<CDCQueryResponse>(queryProcedure);
this.updateChangeIdWithLastEvent(events);
Expand All @@ -51,7 +52,7 @@ export class CDCApi {
}

private async fetchCurrentChangeId(): Promise<string> {
const currentProcedure = CDCProcedures.current();
const currentProcedure = Cypher.db.cdc.current();

const result = await this.runProcedure<{ id: string }>(currentProcedure);

Expand All @@ -69,6 +70,25 @@ export class CDCApi {
}
}

private createQuerySelectors(labels: string[] | undefined): Cypher.Map[] {
if (labels) {
return labels.map(
(l) =>
new Cypher.Map({
select: new Cypher.Literal("n"),
labels: new Cypher.Literal([l]),
})
);
} else {
// Filters nodes
return [
new Cypher.Map({
select: new Cypher.Literal("n"),
}),
];
}
}

private async runProcedure<T>(procedure: Cypher.Clause): Promise<T[]> {
const { cypher, params } = procedure.build();

Expand All @@ -78,15 +98,3 @@ export class CDCApi {
}) as T[];
}
}

/** Wrapper of Cypher Builder for CDC */
class CDCProcedures {
static current(): Cypher.Procedure {
return new Cypher.Procedure<"id">("cdc.current");
}

static query(from: Cypher.Expr, selectors?: Cypher.Expr): Cypher.Procedure {
const procedureParams = filterTruthy([from, selectors]);
return new Cypher.Procedure<"id" | "txId" | "seq" | "metadata" | "event">("cdc.query", procedureParams);
}
}

0 comments on commit 67f915e

Please sign in to comment.