Skip to content

Commit

Permalink
trigger runs for new envs
Browse files Browse the repository at this point in the history
  • Loading branch information
jsbroks committed Sep 11, 2024
1 parent 5608a87 commit 5b96b5f
Show file tree
Hide file tree
Showing 12 changed files with 328 additions and 116 deletions.
1 change: 1 addition & 0 deletions apps/event-worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
},
"dependencies": {
"@ctrlplane/db": "workspace:*",
"@ctrlplane/job-dispatch": "workspace:*",
"@ctrlplane/logger": "workspace:*",
"@ctrlplane/validators": "workspace:*",
"@google-cloud/container": "^5.16.0",
Expand Down
4 changes: 2 additions & 2 deletions apps/event-worker/src/target-scan/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import {
targetProviderGoogle,
workspace,
} from "@ctrlplane/db/schema";
import { upsertTargets } from "@ctrlplane/job-dispatch";
import { logger } from "@ctrlplane/logger";
import { Channel } from "@ctrlplane/validators/events";

import { redis } from "../redis.js";
import { getGkeTargets } from "./gke.js";
import { upsertTargets } from "./upsert.js";

const targetScanQueue = new Queue(Channel.TargetScan, { connection: redis });
const removeTargetJob = (job: Job) =>
Expand Down Expand Up @@ -55,7 +55,7 @@ export const createTargetScanWorker = () =>
tp.target_provider_google,
);

await upsertTargets(db, tp.workspace.id, gkeTargets);
await upsertTargets(db, gkeTargets);
}
},
{
Expand Down
28 changes: 0 additions & 28 deletions apps/event-worker/src/target-scan/upsert.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import type { NextRequest } from "next/server";
import { NextResponse } from "next/server";
import _ from "lodash";
import { z } from "zod";

import { can } from "@ctrlplane/auth/utils";
import { buildConflictUpdateColumns, eq, takeFirstOrNull } from "@ctrlplane/db";
import { eq, takeFirstOrNull } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import {
createTarget,
target,
targetProvider,
workspace,
} from "@ctrlplane/db/schema";
import { createTarget, targetProvider, workspace } from "@ctrlplane/db/schema";
import { upsertTargets } from "@ctrlplane/job-dispatch";
import { Permission } from "@ctrlplane/validators/auth";

import { getUser } from "~/app/api/v1/auth";
Expand All @@ -21,10 +18,26 @@ const bodySchema = z.object({
),
});

const canAccessTargetProvider = async (userId: string, providerId: string) =>
can()
.user(userId)
.perform(Permission.TargetUpdate)
.on({ type: "targetProvider", id: providerId });

export const PATCH = async (
req: NextRequest,
{ params }: { params: { providerId: string } },
) => {
const user = await getUser(req);
if (!user) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}

const canAccess = await canAccessTargetProvider(user.id, params.providerId);
if (!canAccess) {
return NextResponse.json({ error: "Permission denied" }, { status: 403 });
}

const query = await db
.select()
.from(targetProvider)
Expand All @@ -33,39 +46,18 @@ export const PATCH = async (
.then(takeFirstOrNull);

const provider = query?.target_provider;
if (provider == null)
if (!provider) {
return NextResponse.json({ error: "Provider not found" }, { status: 404 });
}

const user = await getUser(req);
if (user == null)
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });

const canAccess = await can()
.user(user.id)
.perform(Permission.TargetUpdate)
.on({ type: "targetProvider", id: params.providerId });

if (!canAccess)
return NextResponse.json({ error: "Permission denied" }, { status: 403 });

const response = await req.json();
const body = await bodySchema.parseAsync(response);
const body = await bodySchema.parseAsync(await req.json());
const targetsToInsert = body.targets.map((t) => ({
...t,
providerId: provider.id,
workspaceId: provider.workspaceId,
}));

const results = await db
.insert(target)
.values(
body.targets.map((t) => ({
...t,
providerId: provider.id,
workspaceId: provider.workspaceId,
lockedAt: null,
})),
)
.onConflictDoUpdate({
target: [target.identifier, target.workspaceId],
set: buildConflictUpdateColumns(target, ["labels"]),
})
.returning();
const targets = await upsertTargets(db, targetsToInsert);

return NextResponse.json({ targets: results });
return NextResponse.json({ targets });
};
52 changes: 48 additions & 4 deletions packages/api/src/router/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
arrayContains,
eq,
isNull,
not,
takeFirst,
takeFirstOrNull,
} from "@ctrlplane/db";
Expand Down Expand Up @@ -35,6 +36,7 @@ import {
createJobConfigs,
createJobExecutionApprovals,
dispatchJobConfigs,
dispatchJobsForNewTargets,
isPassingAllPolicies,
isPassingReleaseSequencingCancelPolicy,
} from "@ctrlplane/job-dispatch";
Expand Down Expand Up @@ -563,14 +565,56 @@ export const environmentRouter = createTRPCRouter({
.on({ type: "environment", id: input.id }),
})
.input(z.object({ id: z.string().uuid(), data: updateEnvironment }))
.mutation(({ ctx, input }) =>
ctx.db
.mutation(async ({ ctx, input }) => {
const { targetFilter } = input.data;
const isUpdatingTargetFilter = targetFilter != null;
if (isUpdatingTargetFilter) {
const oldEnv = await ctx.db
.select()
.from(environment)
.innerJoin(system, eq(system.id, environment.systemId))
.where(eq(environment.id, input.id))
.then(takeFirst);

const hasTargetFiltersChanged = !_.isEqual(
oldEnv.environment.targetFilter,
targetFilter,
);

if (hasTargetFiltersChanged) {
const newTargets = await ctx.db
.select({ id: target.id })
.from(target)
.where(
and(
eq(target.workspaceId, oldEnv.system.workspaceId),
arrayContains(target.labels, targetFilter),
not(
arrayContains(target.labels, oldEnv.environment.targetFilter),
),
),
);

if (newTargets.length > 0) {
await dispatchJobsForNewTargets(
ctx.db,
newTargets.map((t) => t.id),
input.id,
);
console.log(
`Found ${newTargets.length} new targets for environment ${input.id}`,
);
}
}
}

return ctx.db
.update(environment)
.set(input.data)
.where(eq(environment.id, input.id))
.returning()
.then(takeFirst),
),
.then(takeFirst);
}),

delete: protectedProcedure
.meta({
Expand Down
4 changes: 3 additions & 1 deletion packages/db/src/schema/target.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { InferSelectModel } from "drizzle-orm";
import type { InferInsertModel, InferSelectModel } from "drizzle-orm";
import {
json,
jsonb,
Expand Down Expand Up @@ -55,6 +55,8 @@ export const createTarget = createInsertSchema(target, {
labels: z.record(z.string()),
}).omit({ id: true });

export type InsertTarget = InferInsertModel<typeof target>;

export const updateTarget = createTarget.partial();

export const targetSchema = pgTable(
Expand Down
2 changes: 2 additions & 0 deletions packages/job-dispatch/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ export * from "./policy-create.js";
export * from "./release-checker.js";
export * from "./release-sequencing.js";
export * from "./gradual-rollout.js";
export * from "./new-target.js";
export * from "./target.js";
15 changes: 5 additions & 10 deletions packages/job-dispatch/src/job-dispatch.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Tx } from "@ctrlplane/db";
import type { JobConfig } from "@ctrlplane/db/schema";
import type { JobConfig, JobExecution } from "@ctrlplane/db/schema";
import _ from "lodash";

import type { JobExecutionReason } from "./job-execution.js";
Expand All @@ -25,8 +25,8 @@ class DispatchBuilder {
this._then = [];
}

filter(func: DispatchFilterFunc) {
this._filters.push(func);
filter(...func: DispatchFilterFunc[]) {
this._filters.push(...func);
return this;
}

Expand All @@ -45,7 +45,7 @@ class DispatchBuilder {
return this;
}

async dispatch() {
async dispatch(): Promise<JobExecution[]> {
let t = this._jobConfigs;
for (const func of this._filters) t = await func(this.db, t);

Expand All @@ -55,12 +55,7 @@ class DispatchBuilder {
for (const func of this._then) await func(this.db, t);

await dispatchJobExecutionsQueue.addBulk(
wfs.map((wf) => ({
name: wf.id,
data: {
jobExecutionId: wf.id,
},
})),
wfs.map((wf) => ({ name: wf.id, data: { jobExecutionId: wf.id } })),
);

return wfs;
Expand Down
67 changes: 67 additions & 0 deletions packages/job-dispatch/src/new-target.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import type { Tx } from "@ctrlplane/db";

import { and, arrayContains, eq, inArray } from "@ctrlplane/db";
import { environment, target } from "@ctrlplane/db/schema";

import { createJobConfigs } from "./job-config.js";
import { dispatchJobConfigs } from "./job-dispatch.js";
import { isPassingLockingPolicy } from "./lock-checker.js";
import { isPassingApprovalPolicy } from "./policy-checker.js";
import { isPassingReleaseDependencyPolicy } from "./release-checker.js";

/**
* Asserts that all given targets are part of the specified environment.
*
* This is a runtime assertion to make it easier to debug if this ever happens.
* It ensures that the targets being processed are actually associated with the
* given environment.
*/
async function assertTargetsInEnvironment(
db: Tx,
targetIds: string[],
envId: string,
): Promise<void> {
const targetsInEnv = await db
.select({ id: target.id })
.from(environment)
.innerJoin(target, arrayContains(target.labels, environment.targetFilter))
.where(and(eq(environment.id, envId), inArray(target.id, targetIds)));
const targetsInEnvIds = new Set(targetsInEnv.map((t) => t.id));
const allTargetsInEnv = targetIds.every((id) => targetsInEnvIds.has(id));
if (allTargetsInEnv) return;
throw new Error("Some targets are not part of the specified environment");
}

/**
* Dispatches jobs for new targets added to an environment.
*/
export async function dispatchJobsForNewTargets(
db: Tx,
newTargetIds: string[],
envId: string,
): Promise<void> {
await assertTargetsInEnvironment(db, newTargetIds, envId);

const jobConfigs = await createJobConfigs(db, "new_target")
.targets(newTargetIds)
.environments([envId])
.insert();

const dispatched = await dispatchJobConfigs(db)
.reason("env_policy_override")
.filter(
isPassingLockingPolicy,
isPassingApprovalPolicy,
isPassingReleaseDependencyPolicy,
)
.jobConfigs(jobConfigs)
.dispatch();

const notDispatchedConfigs = jobConfigs.filter(
(config) =>
!dispatched.some((dispatch) => dispatch.jobConfigId === config.id),
);
console.log(
`${notDispatchedConfigs.length} out of ${jobConfigs.length} job configs were not dispatched.`,
);
}
Loading

0 comments on commit 5b96b5f

Please sign in to comment.