Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Move events to job dispatch and use deleteResources utility function everywhere #222

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion apps/jobs/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ COPY packages/validators/package.json ./packages/validators/package.json
COPY packages/logger/package.json ./packages/logger/package.json
COPY packages/job-dispatch/package.json ./packages/job-dispatch/package.json
COPY packages/secrets/package.json ./packages/secrets/package.json
COPY packages/events/package.json ./packages/events/package.json

COPY apps/jobs/package.json ./apps/jobs/package.json

Expand Down
1 change: 0 additions & 1 deletion apps/jobs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
},
"dependencies": {
"@ctrlplane/db": "workspace:*",
"@ctrlplane/events": "workspace:*",
"@ctrlplane/job-dispatch": "workspace:*",
"@ctrlplane/logger": "workspace:*",
"@ctrlplane/validators": "workspace:*",
Expand Down
5 changes: 4 additions & 1 deletion apps/jobs/src/expired-env-checker/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { inArray, lte } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as SCHEMA from "@ctrlplane/db/schema";
import { getEventsForEnvironmentDeleted, handleEvent } from "@ctrlplane/events";
import {
getEventsForEnvironmentDeleted,
handleEvent,
} from "@ctrlplane/job-dispatch";

export const run = async () => {
const expiredEnvironments = await db
Expand Down
9 changes: 6 additions & 3 deletions apps/pty-proxy/src/controller/sockets.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import ms from "ms";

import { eq } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as SCHEMA from "@ctrlplane/db/schema";
import { deleteResources } from "@ctrlplane/job-dispatch";
import { logger } from "@ctrlplane/logger";

Expand All @@ -26,9 +28,10 @@ setInterval(() => {

agent.socket.close(1000, "Agent connection timed out");
agents.delete(agentId);
deleteResources(db, [agentId]).then(() => {
logger.info("Deleted stale agent resource", { agentId });
});
db.query.resource
.findFirst({ where: eq(SCHEMA.resource.id, agentId) })
.then((resource) => resource && deleteResources(db, [resource]))
.then(() => logger.info("Deleted stale agent resource", { agentId }));
Comment on lines +31 to +34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling and improve logging precision

While the new implementation correctly checks for resource existence before deletion, there are a few concerns:

  1. Missing error handling for the promise chain
  2. The success log message doesn't accurately reflect whether deletion succeeded
  3. Potential race condition between resource check and deletion

Consider this improved implementation:

       db.query.resource
         .findFirst({ where: eq(SCHEMA.resource.id, agentId) })
         .then((resource) => resource && deleteResources(db, [resource]))
-        .then(() => logger.info("Deleted stale agent resource", { agentId }));
+        .then((deleted) => {
+          if (deleted) {
+            logger.info("Deleted stale agent resource", { agentId });
+          } else {
+            logger.info("No resource found for stale agent", { agentId });
+          }
+        })
+        .catch((error) => {
+          logger.error("Failed to delete stale agent resource", {
+            agentId,
+            error: error.message
+          });
+        });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
db.query.resource
.findFirst({ where: eq(SCHEMA.resource.id, agentId) })
.then((resource) => resource && deleteResources(db, [resource]))
.then(() => logger.info("Deleted stale agent resource", { agentId }));
db.query.resource
.findFirst({ where: eq(SCHEMA.resource.id, agentId) })
.then((resource) => resource && deleteResources(db, [resource]))
.then((deleted) => {
if (deleted) {
logger.info("Deleted stale agent resource", { agentId });
} else {
logger.info("No resource found for stale agent", { agentId });
}
})
.catch((error) => {
logger.error("Failed to delete stale agent resource", {
agentId,
error: error.message
});
});

}
}
}, ms("1m"));
1 change: 0 additions & 1 deletion apps/webservice/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ COPY packages/job-dispatch/package.json ./packages/job-dispatch/package.json
COPY packages/ui/package.json ./packages/ui/package.json
COPY packages/logger/package.json ./packages/logger/package.json
COPY packages/secrets/package.json ./packages/secrets/package.json
COPY packages/events/package.json ./packages/events/package.json

COPY apps/webservice/package.json ./apps/webservice/package.json

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export const EditHookDialog: React.FC<EditHookDialogProps> = ({

const defaultValues = {
...hook,
action: hook.action as HookAction,
jobAgentId: hook.runhook?.runbook.jobAgentId ?? null,
jobAgentConfig: hook.runhook?.runbook.jobAgentConfig ?? null,
};
Expand Down Expand Up @@ -144,7 +145,7 @@ export const EditHookDialog: React.FC<EditHookDialogProps> = ({
className="items-center justify-start gap-2 px-2"
>
<IconSelector className="h-4 w-4" />
{value === "" ? "Select action..." : value}
{value}
</Button>
</PopoverTrigger>
<PopoverContent align="start" className="w-[462px] p-0">
Expand Down
1 change: 0 additions & 1 deletion packages/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
"dependencies": {
"@ctrlplane/auth": "workspace:*",
"@ctrlplane/db": "workspace:*",
"@ctrlplane/events": "workspace:*",
"@ctrlplane/job-dispatch": "workspace:*",
"@ctrlplane/logger": "workspace:*",
"@ctrlplane/secrets": "workspace:*",
Expand Down
5 changes: 4 additions & 1 deletion packages/api/src/router/deployment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ import {
updateReleaseChannel,
workspace,
} from "@ctrlplane/db/schema";
import { getEventsForDeploymentDeleted, handleEvent } from "@ctrlplane/events";
import {
getEventsForDeploymentDeleted,
handleEvent,
} from "@ctrlplane/job-dispatch";
import { Permission } from "@ctrlplane/validators/auth";
import { JobStatus } from "@ctrlplane/validators/jobs";

Expand Down
7 changes: 5 additions & 2 deletions packages/api/src/router/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ import {
system,
updateEnvironment,
} from "@ctrlplane/db/schema";
import { getEventsForEnvironmentDeleted, handleEvent } from "@ctrlplane/events";
import { dispatchJobsForNewResources } from "@ctrlplane/job-dispatch";
import {
dispatchJobsForNewResources,
getEventsForEnvironmentDeleted,
handleEvent,
} from "@ctrlplane/job-dispatch";
import { Permission } from "@ctrlplane/validators/auth";
import {
ComparisonOperator,
Expand Down
21 changes: 6 additions & 15 deletions packages/api/src/router/resources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import {
takeFirstOrNull,
} from "@ctrlplane/db";
import * as schema from "@ctrlplane/db/schema";
import { getEventsForResourceDeleted, handleEvent } from "@ctrlplane/events";
import {
cancelOldReleaseJobTriggersOnJobDispatch,
createJobApprovals,
createReleaseJobTriggers,
deleteResources,
dispatchReleaseJobTriggers,
isPassingAllPoliciesExceptNewerThanLastActive,
isPassingNoPendingJobsPolicy,
Expand Down Expand Up @@ -577,20 +577,11 @@ export const resourceRouter = createTRPCRouter({
),
})
.input(z.array(z.string().uuid()))
.mutation(async ({ ctx, input }) => {
const resources = await ctx.db.query.resource.findMany({
where: inArray(schema.resource.id, input),
});
const events = (
await Promise.allSettled(resources.map(getEventsForResourceDeleted))
).flatMap((r) => (r.status === "fulfilled" ? r.value : []));
await Promise.allSettled(events.map(handleEvent));

return ctx.db
.delete(schema.resource)
.where(inArray(schema.resource.id, input))
.returning();
}),
.mutation(async ({ ctx, input }) =>
ctx.db.query.resource
.findMany({ where: inArray(schema.resource.id, input) })
.then((resources) => deleteResources(ctx.db, resources)),
),
Comment on lines +580 to +584
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider wrapping operations in a transaction and adding error handling

While the implementation correctly uses the new deleteResources utility, consider these improvements:

  1. Wrap the query and delete operations in a transaction to ensure atomicity
  2. Add error handling for cases where resources don't exist
  3. Consider performance optimization by selecting only necessary fields instead of fetching entire resources

Here's a suggested implementation:

  .mutation(async ({ ctx, input }) =>
-   ctx.db.query.resource
-     .findMany({ where: inArray(schema.resource.id, input) })
-     .then((resources) => deleteResources(ctx.db, resources)),
+   ctx.db.transaction(async (tx) => {
+     const resources = await tx.query.resource.findMany({
+       where: inArray(schema.resource.id, input),
+       columns: {
+         id: true,
+         kind: true,
+         name: true,
+         workspaceId: true
+       }
+     });
+     
+     if (resources.length !== input.length) {
+       throw new Error('Some resources not found');
+     }
+     
+     return deleteResources(tx, resources);
+   }),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
.mutation(async ({ ctx, input }) =>
ctx.db.query.resource
.findMany({ where: inArray(schema.resource.id, input) })
.then((resources) => deleteResources(ctx.db, resources)),
),
.mutation(async ({ ctx, input }) =>
ctx.db.transaction(async (tx) => {
const resources = await tx.query.resource.findMany({
where: inArray(schema.resource.id, input),
columns: {
id: true,
kind: true,
name: true,
workspaceId: true
}
});
if (resources.length !== input.length) {
throw new Error('Some resources not found');
}
return deleteResources(tx, resources);
}),
),


metadataKeys: protectedProcedure
.meta({
Expand Down
35 changes: 4 additions & 31 deletions packages/api/src/router/runbook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,43 +35,16 @@ export const runbookRouter = createTRPCRouter({
.on({ type: "system", id: input }),
})
.input(z.string().uuid())
.query(({ ctx, input }) => {
return ctx.db.query.runbook.findMany({
.query(({ ctx, input }) =>
ctx.db.query.runbook.findMany({
where: eq(SCHEMA.runbook.systemId, input),
with: {
runhooks: { with: { hook: true } },
jobAgent: true,
variables: true,
},
});
// return ctx.db
// .select()
// .from(SCHEMA.runbook)
// .leftJoin(
// SCHEMA.runbookVariable,
// eq(SCHEMA.runbookVariable.runbookId, SCHEMA.runbook.id),
// )
// .leftJoin(
// SCHEMA.jobAgent,
// eq(SCHEMA.runbook.jobAgentId, SCHEMA.jobAgent.id),
// )
// .leftJoin(
// SCHEMA.runbookJobTrigger,
// eq(SCHEMA.runbook.id, SCHEMA.runbookJobTrigger.runbookId),
// )
// .leftJoin(SCHEMA.job, eq(SCHEMA.runbookJobTrigger.jobId, SCHEMA.job.id))
// .where(eq(SCHEMA.runbook.systemId, input))
// .then((rbs) =>
// _.chain(rbs)
// .groupBy((rb) => rb.runbook.id)
// .map((rb) => ({
// ...rb[0]!.runbook,
// variables: rb.map((v) => v.runbook_variable).filter(isPresent),
// jobAgent: rb[0]!.job_agent,
// }))
// .value(),
// );
}),
}),
),

create: protectedProcedure
.meta({
Expand Down
4 changes: 1 addition & 3 deletions packages/db/src/schema/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ export const runhook = pgTable(
.notNull()
.references(() => runbook.id, { onDelete: "cascade" }),
},
(t) => ({
uniq: uniqueIndex().on(t.hookId, t.runbookId),
}),
(t) => ({ uniq: uniqueIndex().on(t.hookId, t.runbookId) }),
);

export const runhookRelations = relations(runhook, ({ one }) => ({
Expand Down
10 changes: 0 additions & 10 deletions packages/events/eslint.config.js

This file was deleted.

38 changes: 0 additions & 38 deletions packages/events/package.json

This file was deleted.

1 change: 0 additions & 1 deletion packages/events/src/handlers/index.ts

This file was deleted.

9 changes: 0 additions & 9 deletions packages/events/tsconfig.json

This file was deleted.

1 change: 1 addition & 0 deletions packages/job-dispatch/src/events/handlers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./resource-removed.js";
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export * from "./environment-deleted.js";
export * from "./deployment-deleted.js";
export * from "./target-deleted.js";
export * from "./resource-deleted.js";
1 change: 1 addition & 0 deletions packages/job-dispatch/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ export * from "./policies/concurrency-policy.js";
export * from "./policies/release-window.js";
export * from "./environment-creation.js";
export * from "./pending-job-checker.js";
export * from "./events/index.js";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Event module integration is properly structured but has a circular dependency

The events module is well-structured with proper exports and imports, but there's a circular dependency that needs attention:

  • packages/job-dispatch/src/events/handlers/resource-removed.ts imports from @ctrlplane/job-dispatch while being part of the same package
  • This creates a circular dependency as the main package exports the events module which contains this handler

Suggested fix:

  • Move the dispatchRunbook function to a separate utility module
  • Import it directly in resource-removed.ts instead of importing from the main package
🔗 Analysis chain

Verify the event module integration.

Let's ensure this export is properly integrated:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the events module integration and usage

# Check if the events/index.js file exists
echo "Checking for events/index.js file..."
fd --type f "index.js" packages/job-dispatch/src/events

# Check for potential circular dependencies
echo "Checking for potential circular dependencies..."
rg -l "from \"@ctrlplane/job-dispatch\"" packages/job-dispatch/src/events

# Check usage of the newly exported events
echo "Checking usage of exported events..."
rg --type ts "from \"@ctrlplane/job-dispatch\".*events" -A 3

Length of output: 523


Script:

#!/bin/bash
# Let's verify the events module structure and integration more thoroughly

# Check the content of events/index.ts (not .js)
echo "Checking for events/index.ts file..."
fd --type f "index.ts" packages/job-dispatch/src/events

echo "Checking content of events/index.ts if exists..."
fd --type f "index.ts" packages/job-dispatch/src/events --exec cat {}

echo "Checking for all files in events directory..."
fd . packages/job-dispatch/src/events

# Check the specific circular dependency found
echo "Checking content of resource-removed.ts..."
cat packages/job-dispatch/src/events/handlers/resource-removed.ts

# Check if events are imported elsewhere in the project
echo "Checking broader usage of events..."
rg "from.*events" packages/job-dispatch/src

Length of output: 4003

14 changes: 9 additions & 5 deletions packages/job-dispatch/src/resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
import { logger } from "@ctrlplane/logger";
import { variablesAES256 } from "@ctrlplane/secrets";

import { getEventsForResourceDeleted, handleEvent } from "./events/index.js";
import { dispatchJobsForNewResources } from "./new-resource.js";

const log = logger.child({ label: "upsert-resources" });
Expand Down Expand Up @@ -320,10 +321,7 @@ export const upsertResources = async (
);

if (resourcesToDelete.length > 0) {
await deleteResources(
tx,
resourcesToDelete.map((r) => r.id),
).catch((err) => {
await deleteResources(tx, resourcesToDelete).catch((err) => {
log.error("Error deleting resources", { error: err });
throw err;
});
Expand All @@ -346,6 +344,12 @@ export const upsertResources = async (
* @param tx - The transaction to use.
* @param resourceIds - The ids of the resources to delete.
*/
export const deleteResources = async (tx: Tx, resourceIds: string[]) => {
export const deleteResources = async (tx: Tx, resources: Resource[]) => {
const eventsPromises = Promise.all(
resources.map(getEventsForResourceDeleted),
);
const events = await eventsPromises.then((res) => res.flat());
await Promise.all(events.map(handleEvent));
Comment on lines +347 to +352
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling in deleteResources function

The deleteResources function lacks error handling around the asynchronous operations for fetching and handling events. If any of these promises are rejected, it could lead to unhandled promise rejections and potentially leave the system in an inconsistent state. To maintain robustness and consistency with the rest of the codebase, consider adding error handling.

Apply this diff to add error handling:

 export const deleteResources = async (tx: Tx, resources: Resource[]) => {
+  try {
     const eventsPromises = Promise.all(
       resources.map(getEventsForResourceDeleted),
     );
     const events = await eventsPromises.then((res) => res.flat());
     await Promise.all(events.map(handleEvent));
     const resourceIds = resources.map((r) => r.id);
     await tx.delete(resource).where(inArray(resource.id, resourceIds));
+  } catch (err) {
+    log.error("Error in deleteResources", { error: err });
+    throw err;
+  }
 };

Committable suggestion skipped: line range outside the PR's diff.

const resourceIds = resources.map((r) => r.id);
await tx.delete(resource).where(inArray(resource.id, resourceIds));
};
3 changes: 2 additions & 1 deletion packages/job-dispatch/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"extends": "@ctrlplane/tsconfig/internal-package.json",
"compilerOptions": {
"outDir": "dist",
"baseUrl": "."
"baseUrl": ".",
"rootDir": "."
},
"include": ["src", "../api/src/utils"],
"exclude": ["node_modules"]
Expand Down
Loading
Loading