Skip to content

Commit

Permalink
Store pipeline definitions in DB (#1452)
Browse files Browse the repository at this point in the history
This backs the `PipelineDefinitionDB` on to the Postgres DB. It's quite
crude and is also missing user-management features, but it works for
being able to start pipelines outside of tests.

I've avoided doing user-management code since that overlaps with
#1451
  • Loading branch information
robknight authored Jan 31, 2024
1 parent 5f14c59 commit 1183024
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 15 deletions.
4 changes: 1 addition & 3 deletions apps/passport-server/src/application.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import process from "node:process";
import * as path from "path";
import urljoin from "url-join";
import { MockPipelineAtomDB } from "../test/generic-issuance/MockPipelineAtomDB";
import { MockPipelineDefinitionDB } from "../test/generic-issuance/MockPipelineDefinitionDB";
import { getDevconnectPretixAPI } from "./apis/devconnect/devconnectPretixAPI";
import { IEmailAPI, sendgridSendEmail } from "./apis/emailAPI";
import { getHoneycombAPI } from "./apis/honeycombAPI";
Expand Down Expand Up @@ -52,8 +51,7 @@ export async function startApplication(
publicResourcesDir: path.join(process.cwd(), "public"),
gitCommitHash: await getCommitHash(),
// TODO: remove these once we have settled on a db schema for these
pipelineAtomDB: new MockPipelineAtomDB(),
pipelineDefinitionDB: new MockPipelineDefinitionDB()
pipelineAtomDB: new MockPipelineAtomDB()
};

const apis = await getOverridenApis(context, apiOverrides);
Expand Down
21 changes: 21 additions & 0 deletions apps/passport-server/src/database/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,24 @@ export interface RateLimitBucket {
// last_take is a bigint in Postgres, which node-postgres turns into a string
last_take: string;
}

export interface GenericIssuancePipelineRow {
id: string;
owner_user_id: string;
editor_user_ids: string[];
pipeline_type: string;
// Config corresponds to the `options` property of PretixPipelineDefinition/
// LemonadePipelineDefinition. There is no generic or base type for this, but
// it's represented as JSON in the DB.
// Using "any" here is not great, but using "unknown" means that we would
// need some way to parse the config from the JSON, e.g. a Zod schema. This
// might be worth coming back to once the configuration format is stable.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
config: any;
}

export interface GenericIssuanceUserRow {
id: string;
email: string;
is_admin: boolean;
}
154 changes: 153 additions & 1 deletion apps/passport-server/src/database/queries/pipelineDefinitionDB.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { PipelineDefinition } from "../../services/generic-issuance/pipelines/types";
import _ from "lodash";
import { Pool, PoolClient } from "postgres-pool";
import {
PipelineDefinition,
PipelineType
} from "../../services/generic-issuance/pipelines/types";
import { GenericIssuancePipelineRow } from "../models";
import { sqlQuery, sqlTransaction } from "../sqlQuery";

/**
* This doesn't follow the general convention we've had so far of queries
Expand All @@ -14,3 +21,148 @@ export interface IPipelineDefinitionDB {
setDefinition(definition: PipelineDefinition): Promise<void>;
setDefinitions(definitions: PipelineDefinition[]): Promise<void>;
}

/**
* Implements the above interface with the Postgres DB as back-end.
* In reality we are probably going to want more fine-grained APIs - rather
* than updating the entire definition, we are going to want to do things like
* "change owner", "add editor" or "remove editor". The approach below is
* simply for the MVP.
*/
export class PipelineDefinitionDB implements IPipelineDefinitionDB {
private db: Pool;

public constructor(db: Pool) {
this.db = db;
}

public async loadPipelineDefinitions(): Promise<PipelineDefinition[]> {
const result = await sqlQuery(
this.db,
`
SELECT p.*, ARRAY_AGG(e.editor_id) AS editor_user_ids
FROM generic_issuance_pipelines p
LEFT JOIN generic_issuance_pipeline_editors e
ON p.id = e.pipeline_id
GROUP BY p.id`
);

return result.rows.map((row: GenericIssuancePipelineRow) => ({
id: row.id,
ownerUserId: row.owner_user_id,
editorUserIds: row.editor_user_ids.filter(
(editorId: unknown) => typeof editorId === "string"
),
type: row.pipeline_type as PipelineType,
options: row.config
}));
}

public async clearAllDefinitions(): Promise<void> {
await sqlQuery(this.db, "DELETE FROM generic_issuance_pipeline_editors");
await sqlQuery(this.db, "DELETE FROM generic_issuance_pipelines");
}

public async getDefinition(
definitionID: string
): Promise<PipelineDefinition | undefined> {
const result = await sqlQuery(
this.db,
`
SELECT p.*, ARRAY_AGG(e.editor_id) AS editor_user_ids
FROM generic_issuance_pipelines p
LEFT JOIN generic_issuance_pipeline_editors e
ON p.id = e.pipeline_id
WHERE p.id = $1
GROUP BY p.id`,
[definitionID]
);

if (result.rowCount === 0) {
return undefined;
} else {
const row: GenericIssuancePipelineRow = result.rows[0];
return {
id: row.id,
ownerUserId: row.owner_user_id,
editorUserIds: row.editor_user_ids.filter(
(editorId: unknown) => typeof editorId === "string"
),
type: row.pipeline_type as PipelineType,
options: row.config
};
}
}

/**
* Sets a pipeline definition. This is used to either insert or update a
* definition. If inserting, the caller is responsible for generating a UUID
* as the pipeline ID.
*/
public async setDefinition(definition: PipelineDefinition): Promise<void> {
await sqlTransaction(
this.db,
"Insert or update pipeline definition",
async (client: PoolClient) => {
const pipeline: GenericIssuancePipelineRow = (
await client.query(
`
INSERT INTO generic_issuance_pipelines (id, owner_user_id, pipeline_type, config) VALUES($1, $2, $3, $4)
ON CONFLICT(id) DO UPDATE
SET (owner_user_id, pipeline_type, config) = ($2, $3, $4)
RETURNING *
`,
[
definition.id,
definition.ownerUserId,
definition.type,
JSON.stringify(definition.options)
]
)
).rows[0];

pipeline.editor_user_ids = (
await client.query(
`SELECT editor_id FROM generic_issuance_pipeline_editors WHERE pipeline_id = $1`,
[definition.id]
)
).rows.map((row) => row.editor_id);

if (!_.isEqual(pipeline.editor_user_ids, definition.editorUserIds)) {
const editorsToRemove = _.difference(
pipeline.editor_user_ids,
definition.editorUserIds
);
const editorsToAdd = _.difference(
definition.editorUserIds,
pipeline.editor_user_ids
);

if (editorsToRemove.length > 0) {
await client.query(
`DELETE FROM generic_issuance_pipeline_editors WHERE editor_id = ANY($1)`,
[[editorsToRemove]]
);
}

if (editorsToAdd.length > 0) {
for (const editorId of editorsToAdd) {
await client.query(
"INSERT INTO generic_issuance_pipeline_editors (pipeline_id, editor_id) VALUES($1, $2)",
[pipeline.id, editorId]
);
}
}
}
}
);
}

public async setDefinitions(
definitions: PipelineDefinition[]
): Promise<void> {
for (const definition of definitions) {
await this.setDefinition(definition);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import stytch, { Client, Session } from "stytch";
import { ILemonadeAPI } from "../../apis/lemonade/lemonadeAPI";
import { IGenericPretixAPI } from "../../apis/pretix/genericPretixAPI";
import { IPipelineAtomDB } from "../../database/queries/pipelineAtomDB";
import { IPipelineDefinitionDB } from "../../database/queries/pipelineDefinitionDB";
import {
IPipelineDefinitionDB,
PipelineDefinitionDB
} from "../../database/queries/pipelineDefinitionDB";
import { PCDHTTPError } from "../../routing/pcdHttpError";
import { ApplicationContext } from "../../types";
import { logger } from "../../util/logger";
Expand Down Expand Up @@ -132,7 +135,6 @@ export class GenericIssuanceService {

public constructor(
context: ApplicationContext,
definitionDB: IPipelineDefinitionDB,
atomDB: IPipelineAtomDB,
lemonadeAPI: ILemonadeAPI,
stytchClient: Client,
Expand All @@ -141,7 +143,7 @@ export class GenericIssuanceService {
eddsaPrivateKey: string,
zupassPublicKey: EdDSAPublicKey
) {
this.definitionDB = definitionDB;
this.definitionDB = new PipelineDefinitionDB(context.dbPool);
this.atomDB = atomDB;
this.context = context;
this.lemonadeAPI = lemonadeAPI;
Expand Down Expand Up @@ -369,7 +371,6 @@ export async function startGenericIssuanceService(

const issuanceService = new GenericIssuanceService(
context,
context.pipelineDefinitionDB,
context.pipelineAtomDB,
lemonadeAPI,
stytchClient,
Expand Down
2 changes: 0 additions & 2 deletions apps/passport-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { IGenericPretixAPI } from "./apis/pretix/genericPretixAPI";
import { IZuconnectTripshaAPI } from "./apis/zuconnect/zuconnectTripshaAPI";
import { IZuzaluPretixAPI } from "./apis/zuzaluPretixAPI";
import { IPipelineAtomDB } from "./database/queries/pipelineAtomDB";
import { IPipelineDefinitionDB } from "./database/queries/pipelineDefinitionDB";
import {
DevconnectPretixAPIFactory,
DevconnectPretixSyncService
Expand Down Expand Up @@ -41,7 +40,6 @@ export interface ApplicationContext {
gitCommitHash: string;

/// WIP. remove once we have real database APIs for these
pipelineDefinitionDB: IPipelineDefinitionDB;
pipelineAtomDB: IPipelineAtomDB;
}

Expand Down
72 changes: 67 additions & 5 deletions apps/passport-server/test/genericIssuance.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import * as path from "path";
import { ILemonadeAPI } from "../src/apis/lemonade/lemonadeAPI";
import { getI18nString } from "../src/apis/pretix/genericPretixAPI";
import { stopApplication } from "../src/application";
import { PipelineDefinitionDB } from "../src/database/queries/pipelineDefinitionDB";
import { sqlQuery } from "../src/database/sqlQuery";
import { GenericIssuanceService } from "../src/services/generic-issuance/genericIssuanceService";
import {
LemonadePipeline,
Expand Down Expand Up @@ -217,8 +219,10 @@ describe("generic issuance service tests", function () {

const lemonadeAPI: ILemonadeAPI = new MockLemonadeAPI(mockLemonadeData);

const pipelineOwnerUUID = randomUUID();

const lemonadeDefinition: LemonadePipelineDefinition = {
ownerUserId: randomUUID(),
ownerUserId: pipelineOwnerUUID,
id: randomUUID(),
editorUserIds: [],
options: {
Expand Down Expand Up @@ -257,7 +261,7 @@ describe("generic issuance service tests", function () {
const checkerPretixEmail = pretixOrganizer.EMAIL_1;

const pretixDefinition: PretixPipelineDefinition = {
ownerUserId: randomUUID(),
ownerUserId: pipelineOwnerUUID,
id: randomUUID(),
editorUserIds: [],
options: {
Expand Down Expand Up @@ -297,6 +301,13 @@ describe("generic issuance service tests", function () {
lemonadeAPI
});

// TODO: remove this once we have user management
await sqlQuery(
application.context.dbPool,
"INSERT INTO generic_issuance_users VALUES($1, $2, $3)",
[pipelineOwnerUUID, "[email protected]", true]
);

const orgUrls = mockPretixData.get().organizersByOrgUrl.keys();
mockPretixServer = getGenericMockPretixAPIServer(orgUrls, mockPretixData);
mockPretixServer.listen({ onUnhandledRequest: "bypass" });
Expand All @@ -305,10 +316,11 @@ describe("generic issuance service tests", function () {
URL_ROOT = application.expressContext.localEndpoint;
giService = application.services.genericIssuanceService;
await giService?.stop();
await application.context.pipelineDefinitionDB.clearAllDefinitions();
await application.context.pipelineDefinitionDB.setDefinitions(
pipelineDefinitions
const pipelineDefinitionDB = new PipelineDefinitionDB(
application.context.dbPool
);
await pipelineDefinitionDB.clearAllDefinitions();
await pipelineDefinitionDB.setDefinitions(pipelineDefinitions);
await giService?.start();
});

Expand Down Expand Up @@ -462,6 +474,56 @@ describe("generic issuance service tests", function () {
expect(thirdCheckinResult.success).to.eq(false);
});

it("test definition DB", async function () {
const definitionDB = new PipelineDefinitionDB(application.context.dbPool);
await definitionDB.clearAllDefinitions();

{
const definitions = await definitionDB.loadPipelineDefinitions();
expect(definitions).to.be.empty;
}

{
await definitionDB.setDefinitions(pipelineDefinitions);
const definitions = await definitionDB.loadPipelineDefinitions();
expect(definitions.length).to.eq(2);

const pretixDefinition = definitions.find(
(d) => d.type === PipelineType.Pretix
) as PretixPipelineDefinition;
const newKey = "TEST_KEY";
pretixDefinition.options = {
...pretixDefinition?.options,
pretixAPIKey: newKey
};
await definitionDB.setDefinition(pretixDefinition);
const updatedPretixDefinition = (await definitionDB.getDefinition(
pretixDefinition.id
)) as PretixPipelineDefinition;
expect(updatedPretixDefinition).to.exist;
expect(
(updatedPretixDefinition as PretixPipelineDefinition).options
.pretixAPIKey
).to.eq(newKey);

updatedPretixDefinition.editorUserIds.push(pipelineOwnerUUID);
await definitionDB.setDefinition(updatedPretixDefinition);
const newEditorDefinition = (await definitionDB.getDefinition(
updatedPretixDefinition.id
)) as PretixPipelineDefinition;
expect(newEditorDefinition).to.exist;
expect(newEditorDefinition.editorUserIds).to.contain(pipelineOwnerUUID);

newEditorDefinition.editorUserIds = [];
await definitionDB.setDefinition(newEditorDefinition);
const emptyEditorsDefinition = (await definitionDB.getDefinition(
updatedPretixDefinition.id
)) as PretixPipelineDefinition;
expect(emptyEditorsDefinition).to.exist;
expect(emptyEditorsDefinition.editorUserIds).to.be.empty;
}
});

this.afterAll(async () => {
await stopApplication(application);
mockPretixServer.close();
Expand Down

0 comments on commit 1183024

Please sign in to comment.