diff --git a/packages/job-dispatch/src/resource.ts b/packages/job-dispatch/src/resource.ts index 9de79ebe..9d9c4d2c 100644 --- a/packages/job-dispatch/src/resource.ts +++ b/packages/job-dispatch/src/resource.ts @@ -38,12 +38,11 @@ const getExistingResourcesForProvider = (db: Tx, providerId: string) => .from(resource) .where(and(eq(resource.providerId, providerId), isNotDeleted)); -const dispatchNewResources = async (db: Tx, newResources: Resource[]) => { - const [firstResource] = newResources; - if (firstResource == null) return; - - const workspaceId = firstResource.workspaceId; - +const dispatchChangedResources = async ( + db: Tx, + workspaceId: string, + resourceIds: string[], +) => { const workspaceEnvs = await db .select({ id: environment.id, resourceFilter: environment.resourceFilter }) .from(environment) @@ -57,7 +56,6 @@ const dispatchNewResources = async (db: Tx, newResources: Resource[]) => { log.info("workspaceEnvs", { workspaceEnvs }); - const resourceIds = newResources.map((r) => r.id); for (const env of workspaceEnvs) { db.select() .from(resource) @@ -96,7 +94,7 @@ type ResourceWithVariables = Resource & { const upsertResourceVariables = async ( tx: Tx, resources: Array, -): Promise => { +) => { const resourceIds = resources.map((r) => r.id); const existingResourceVariables = await tx .select() @@ -153,7 +151,7 @@ const upsertResourceVariables = async ( newVar.key === existingVar.key, ), ) - .map((existingVar) => existingVar.resourceId), + .map((v) => v.resourceId), ); const changedResources = new Set([ @@ -197,7 +195,7 @@ const upsertResourceVariables = async ( throw err; }); - return resources.filter((resource) => changedResources.has(resource.id)); + return changedResources; }; type ResourceWithMetadata = Resource & { metadata?: Record }; @@ -297,7 +295,7 @@ const upsertResourceMetadata = async ( ), ); - return resources.filter((resource) => changedResources.has(resource.id)); + return changedResources; }; export type ResourceToInsert = InsertResource & { @@ -309,6 +307,12 @@ export const upsertResources = async ( tx: Tx, resourcesToInsert: ResourceToInsert[], ) => { + const workspaceId = resourcesToInsert[0]?.workspaceId; + if (workspaceId == null) throw new Error("Workspace ID is required"); + if (!resourcesToInsert.every((r) => r.workspaceId === workspaceId)) { + throw new Error("All resources must belong to the same workspace"); + } + try { // Get existing resources from the database, grouped by providerId. // - For resources without a providerId, look them up by workspaceId and @@ -375,15 +379,22 @@ export const upsertResources = async ( })), ); - await Promise.all([ - upsertResourceMetadata(tx, resources), - upsertResourceVariables(tx, resources), + const [changedResourcesMetadata, changedResourcesVariables] = + await Promise.all([ + upsertResourceMetadata(tx, resources), + upsertResourceVariables(tx, resources), + ]); + + const changedResourceIds = new Set([ + ...Array.from(changedResourcesMetadata), + ...Array.from(changedResourcesVariables), ]); const newResources = resources.filter( (r) => !resourcesBeforeInsert.some((er) => er.identifier === r.identifier), ); + for (const resource of newResources) changedResourceIds.add(resource.id); log.info("new resources and providerId", { providerId: resourcesToInsert[0]?.providerId, @@ -391,10 +402,11 @@ export const upsertResources = async ( }); if (newResources.length > 0) - await dispatchNewResources(db, newResources).catch((err) => { - log.error("Error dispatching new resources", { error: err }); - throw err; - }); + await dispatchChangedResources( + db, + workspaceId, + Array.from(changedResourceIds), + ); const resourcesToDelete = resourcesBeforeInsert.filter( (r) =>