Skip to content

Commit

Permalink
refactor(api-headless-cms-bulk-action): wait for subtasks to finish b…
Browse files Browse the repository at this point in the history
…efore cleanup (#4331)

Co-authored-by: Bruno Zorić <[email protected]>
  • Loading branch information
leopuleo and brunozoric authored Oct 16, 2024
1 parent 5e0b6cb commit 391a782
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { createTaskDefinition } from "@webiny/tasks";
import { HcmsBulkActionsContext, IBulkActionOperationByModelInput } from "~/types";
import { createPrivateTaskDefinition, TaskDataStatus } from "@webiny/tasks";
import {
HcmsBulkActionsContext,
IBulkActionOperationByModelInput,
TrashBinCleanUpParams
} from "~/types";
import { ChildTasksCleanup } from "~/useCases/internals";

const calculateDateTimeString = () => {
Expand All @@ -19,22 +23,86 @@ const calculateDateTimeString = () => {
return currentDate.toISOString();
};

const cleanup = async ({ context, task }: TrashBinCleanUpParams) => {
// We want to clean all child tasks and logs, which have no errors.
const childTasksCleanup = new ChildTasksCleanup();
try {
await childTasksCleanup.execute({
context,
task
});
} catch (ex) {
console.error(`Error while cleaning "EmptyTrashBins" child tasks.`, ex);
}
};

export const createEmptyTrashBinsTask = () => {
return createTaskDefinition<HcmsBulkActionsContext>({
isPrivate: true,
return createPrivateTaskDefinition<HcmsBulkActionsContext>({
id: "hcmsEntriesEmptyTrashBins",
title: "Headless CMS - Empty all trash bins",
description:
"Delete all entries found in the trash bin, for each model found in the system.",
maxIterations: 1,
maxIterations: 24,
disableDatabaseLogs: true,
run: async params => {
const { response, isAborted, context } = params;
const { response, isAborted, isCloseToTimeout, context, trigger, input, store } =
params;
if (isAborted()) {
return response.aborted();
} else if (isCloseToTimeout()) {
return response.continue(
{
...input
},
{
seconds: 30
}
);
}

try {
if (isAborted()) {
return response.aborted();
if (input.triggered) {
const { items } = await context.tasks.listTasks({
where: {
parentId: store.getTask().id,
taskStatus_in: [TaskDataStatus.RUNNING, TaskDataStatus.PENDING]
},
limit: 100000
});

if (items.length === 0) {
return response.done(
"Task done: emptying the trash bin for all registered models."
);
}

for (const item of items) {
const status = await context.tasks.fetchServiceInfo(item.id);

if (status?.status === "FAILED" || status?.status === "TIMED_OUT") {
await context.tasks.updateTask(item.id, {
taskStatus: TaskDataStatus.FAILED
});
continue;
}

if (status?.status === "ABORTED") {
await context.tasks.updateTask(item.id, {
taskStatus: TaskDataStatus.ABORTED
});
}
}

return response.continue(
{
...input
},
{
seconds: 3600
}
);
}

try {
const locales = context.i18n.getLocales();

await context.i18n.withEachLocale(locales, async () => {
Expand All @@ -43,10 +111,9 @@ export const createEmptyTrashBinsTask = () => {
});

for (const model of models) {
await context.tasks.trigger<IBulkActionOperationByModelInput>({
await trigger<IBulkActionOperationByModelInput>({
name: `Headless CMS - Empty trash bin for "${model.name}" model.`,
definition: "hcmsBulkListDeleteEntries",
parent: params.store.getTask(),
input: {
modelId: model.modelId,
where: {
Expand All @@ -55,29 +122,23 @@ export const createEmptyTrashBinsTask = () => {
}
});
}
return;
});

return response.done(
`Task done: emptying the trash bin for all registered models.`
return response.continue(
{
triggered: true
},
{
seconds: 120
}
);
} catch (ex) {
return response.error(ex.message ?? "Error while executing EmptyTrashBins task");
}
},
onDone: async ({ context, task }) => {
/**
* We want to clean all child tasks and logs, which have no errors.
*/
const childTasksCleanup = new ChildTasksCleanup();
try {
await childTasksCleanup.execute({
context,
task
});
} catch (ex) {
console.error("Error while cleaning `EmptyTrashBins` child tasks.", ex);
}
}
onMaxIterations: cleanup,
onDone: cleanup,
onError: cleanup,
onAbort: cleanup
});
};
14 changes: 14 additions & 0 deletions packages/api-headless-cms-bulk-actions/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { CmsContext } from "@webiny/api-headless-cms/types";
import { Context as BaseContext } from "@webiny/handler/types";
import {
Context as TasksContext,
ITaskOnAbortParams,
ITaskOnErrorParams,
ITaskOnMaxIterationsParams,
ITaskOnSuccessParams,
ITaskResponseDoneResultOutput,
ITaskRunParams
} from "@webiny/tasks/types";
Expand Down Expand Up @@ -64,3 +68,13 @@ export type IBulkActionOperationByModelTaskParams = ITaskRunParams<
IBulkActionOperationByModelInput,
IBulkActionOperationByModelOutput
>;

/**
* Trash Bin
*/

export type TrashBinCleanUpParams =
| ITaskOnSuccessParams<HcmsBulkActionsContext>
| ITaskOnErrorParams<HcmsBulkActionsContext>
| ITaskOnAbortParams<HcmsBulkActionsContext>
| ITaskOnMaxIterationsParams<HcmsBulkActionsContext>;
10 changes: 9 additions & 1 deletion packages/tasks/src/crud/service.tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,13 @@ export const createServiceCrud = (context: Context): ITasksContextServiceObject
delay
});

const task = await context.tasks.createTask<T>(input);
let task: ITask<T>;
try {
task = await context.tasks.createTask<T>(input);
} catch (ex) {
console.log("Could not create the task.", ex);
throw ex;
}

let result: Awaited<ReturnType<typeof service.send>> | null = null;
try {
Expand All @@ -91,6 +97,8 @@ export const createServiceCrud = (context: Context): ITasksContextServiceObject
);
}
} catch (ex) {
console.log("Could not trigger the step function.");
console.error(ex);
/**
* In case of failure to create the Event Bridge Event, we need to delete the task that was meant to be created.
* TODO maybe we can leave the task and update it as failed - with event bridge error?
Expand Down

0 comments on commit 391a782

Please sign in to comment.