Skip to content

Commit

Permalink
dispatch changed resources
Browse files Browse the repository at this point in the history
  • Loading branch information
jsbroks committed Nov 21, 2024
1 parent d15ea05 commit 9dce14d
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions packages/job-dispatch/src/resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,11 @@ const getExistingResourcesForProvider = (db: Tx, providerId: string) =>
.from(resource)
.where(and(eq(resource.providerId, providerId), isNotDeleted));

const dispatchNewResources = async (db: Tx, newResources: Resource[]) => {
const [firstResource] = newResources;
if (firstResource == null) return;

const workspaceId = firstResource.workspaceId;

const dispatchChangedResources = async (
db: Tx,
workspaceId: string,
resourceIds: string[],
) => {
const workspaceEnvs = await db
.select({ id: environment.id, resourceFilter: environment.resourceFilter })
.from(environment)
Expand All @@ -57,7 +56,6 @@ const dispatchNewResources = async (db: Tx, newResources: Resource[]) => {

log.info("workspaceEnvs", { workspaceEnvs });

const resourceIds = newResources.map((r) => r.id);
for (const env of workspaceEnvs) {
db.select()
.from(resource)
Expand Down Expand Up @@ -96,7 +94,7 @@ type ResourceWithVariables = Resource & {
const upsertResourceVariables = async (
tx: Tx,
resources: Array<ResourceWithVariables>,
): Promise<Resource[]> => {
) => {
const resourceIds = resources.map((r) => r.id);
const existingResourceVariables = await tx
.select()
Expand Down Expand Up @@ -153,7 +151,7 @@ const upsertResourceVariables = async (
newVar.key === existingVar.key,
),
)
.map((existingVar) => existingVar.resourceId),
.map((v) => v.resourceId),
);

const changedResources = new Set<string>([
Expand Down Expand Up @@ -197,7 +195,7 @@ const upsertResourceVariables = async (
throw err;
});

return resources.filter((resource) => changedResources.has(resource.id));
return changedResources;
};

type ResourceWithMetadata = Resource & { metadata?: Record<string, string> };
Expand Down Expand Up @@ -297,7 +295,7 @@ const upsertResourceMetadata = async (
),
);

return resources.filter((resource) => changedResources.has(resource.id));
return changedResources;
};

export type ResourceToInsert = InsertResource & {
Expand All @@ -309,6 +307,12 @@ export const upsertResources = async (
tx: Tx,
resourcesToInsert: ResourceToInsert[],
) => {
const workspaceId = resourcesToInsert[0]?.workspaceId;
if (workspaceId == null) throw new Error("Workspace ID is required");
if (!resourcesToInsert.every((r) => r.workspaceId === workspaceId)) {
throw new Error("All resources must belong to the same workspace");
}

try {
// Get existing resources from the database, grouped by providerId.
// - For resources without a providerId, look them up by workspaceId and
Expand Down Expand Up @@ -375,26 +379,34 @@ export const upsertResources = async (
})),
);

await Promise.all([
upsertResourceMetadata(tx, resources),
upsertResourceVariables(tx, resources),
const [changedResourcesMetadata, changedResourcesVariables] =
await Promise.all([
upsertResourceMetadata(tx, resources),
upsertResourceVariables(tx, resources),
]);

const changedResourceIds = new Set([
...Array.from(changedResourcesMetadata),
...Array.from(changedResourcesVariables),
]);

const newResources = resources.filter(
(r) =>
!resourcesBeforeInsert.some((er) => er.identifier === r.identifier),
);
for (const resource of newResources) changedResourceIds.add(resource.id);

log.info("new resources and providerId", {
providerId: resourcesToInsert[0]?.providerId,
newResources,
});

if (newResources.length > 0)
await dispatchNewResources(db, newResources).catch((err) => {
log.error("Error dispatching new resources", { error: err });
throw err;
});
await dispatchChangedResources(
db,
workspaceId,
Array.from(changedResourceIds),
);

const resourcesToDelete = resourcesBeforeInsert.filter(
(r) =>
Expand Down

0 comments on commit 9dce14d

Please sign in to comment.