From 9e7714e627e87ead4ac42534e0deaf9c658f11dc Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Wed, 14 Aug 2024 18:27:32 +0200 Subject: [PATCH] Add workflow run entity (#6622) - create a workflow run every time a workflow is triggered in not_started status. This status will be helpful later for once workflows will be scheduled - update run status once workflow starts running - complete status once the workflow finished running - add a failed status if an error occurs --- .../composite-types/actor.composite-type.ts | 1 + .../constants/standard-field-ids.ts | 9 ++ .../constants/standard-object-ids.ts | 1 + .../standard-objects/index.ts | 2 + .../workflow-run.workspace-entity.ts | 124 ++++++++++++++++++ .../workflow-version.workspace-entity.ts | 23 +++- .../workflow-runner.exception.ts | 11 ++ .../workflow-runner/workflow-runner.job.ts | 56 ++++---- .../workflow-runner/workflow-runner.module.ts | 11 +- .../workflow-runner.service.ts | 25 +++- .../workflow-status.exception.ts | 13 ++ .../workflow-status/workflow-status.module.ts | 9 ++ .../workflow-status.workspace-service.ts | 92 +++++++++++++ .../jobs/workflow-event-trigger.job.ts | 37 +++++- .../jobs/workflow-trigger-job.module.ts | 3 +- .../workflow-trigger.exception.ts | 1 + .../workflow-trigger.service.ts | 17 ++- 17 files changed, 390 insertions(+), 45 deletions(-) create mode 100644 packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.exception.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.workspace-service.ts diff --git a/packages/twenty-server/src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type.ts b/packages/twenty-server/src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type.ts index 69c6d256ab45..1efa0eeffff5 100644 --- a/packages/twenty-server/src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type.ts +++ b/packages/twenty-server/src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type.ts @@ -8,6 +8,7 @@ import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/fi export enum FieldActorSource { EMAIL = 'EMAIL', CALENDAR = 'CALENDAR', + WORKFLOW = 'WORKFLOW', API = 'API', IMPORT = 'IMPORT', MANUAL = 'MANUAL', diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts index 12d7b809ed4c..5ce6734b97f8 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts @@ -411,10 +411,19 @@ export const WORKFLOW_STANDARD_FIELD_IDS = { noteTargets: '20202020-40aa-4839-965e-972a2f72e08d', }; +export const WORKFLOW_RUN_STANDARD_FIELD_IDS = { + workflowVersion: '20202020-2f52-4ba8-8dc4-d0d6adb9578d', + startedAt: '20202020-a234-4e2d-bd15-85bcea6bb183', + endedAt: '20202020-e1c1-4b6b-bbbd-b2beaf2e159e', + status: '20202020-6b3e-4f9c-8c2b-2e5b8e6d6f3b', + createdBy: '20202020-6007-401a-8aa5-e6f38581a6f3', +}; + export const WORKFLOW_VERSION_STANDARD_FIELD_IDS = { name: '20202020-a12f-4cca-9937-a2e40cc65509', workflow: '20202020-afa3-46c3-91b0-0631ca6aa1c8', trigger: '20202020-4eae-43e7-86e0-212b41a30b48', + runs: '20202020-1d08-46df-901a-85045f18099a', }; export const WORKSPACE_MEMBER_STANDARD_FIELD_IDS = { diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts index 7be15073db4b..a13e78836b57 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts @@ -41,6 +41,7 @@ export const STANDARD_OBJECT_IDS = { webhook: '20202020-be4d-4e08-811d-0fffcd13ffd4', workflow: '20202020-62be-406c-b9ca-8caa50d51392', workflowEventListener: '20202020-92aa-462f-965c-a785b00e9989', + workflowRun: '20202020-4e28-4e95-a9d7-6c00874f843c', workflowVersion: '20202020-d65d-4ab9-9344-d77bfb376a3d', workspaceMember: '20202020-3319-4234-a34c-82d5c0e881a6', }; diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/standard-objects/index.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/standard-objects/index.ts index 95c148669293..d510e4f2d2e7 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/standard-objects/index.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/standard-objects/index.ts @@ -32,6 +32,7 @@ import { ViewSortWorkspaceEntity } from 'src/modules/view/standard-objects/view- import { ViewWorkspaceEntity } from 'src/modules/view/standard-objects/view.workspace-entity'; import { WebhookWorkspaceEntity } from 'src/modules/webhook/standard-objects/webhook.workspace-entity'; import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity'; +import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; @@ -64,6 +65,7 @@ export const standardObjectMetadataDefinitions = [ WorkflowWorkspaceEntity, WorkflowEventListenerWorkspaceEntity, WorkflowVersionWorkspaceEntity, + WorkflowRunWorkspaceEntity, WorkspaceMemberWorkspaceEntity, MessageThreadWorkspaceEntity, MessageThreadSubscriberWorkspaceEntity, diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts new file mode 100644 index 000000000000..5f78df89164c --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -0,0 +1,124 @@ +import { Relation } from 'src/engine/workspace-manager/workspace-sync-metadata/interfaces/relation.interface'; + +import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum'; +import { + ActorMetadata, + FieldActorSource, +} from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; +import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity'; +import { RelationMetadataType } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity'; +import { BaseWorkspaceEntity } from 'src/engine/twenty-orm/base.workspace-entity'; +import { WorkspaceEntity } from 'src/engine/twenty-orm/decorators/workspace-entity.decorator'; +import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator'; +import { WorkspaceGate } from 'src/engine/twenty-orm/decorators/workspace-gate.decorator'; +import { WorkspaceIsNullable } from 'src/engine/twenty-orm/decorators/workspace-is-nullable.decorator'; +import { WorkspaceIsSystem } from 'src/engine/twenty-orm/decorators/workspace-is-system.decorator'; +import { WorkspaceJoinColumn } from 'src/engine/twenty-orm/decorators/workspace-join-column.decorator'; +import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator'; +import { WORKFLOW_RUN_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; +import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids'; +import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; + +export enum WorkflowRunStatus { + NOT_STARTED = 'NOT_STARTED', + RUNNING = 'RUNNING', + COMPLETED = 'COMPLETED', + FAILED = 'FAILED', +} + +@WorkspaceEntity({ + standardId: STANDARD_OBJECT_IDS.workflowRun, + namePlural: 'workflowRuns', + labelSingular: 'workflowRun', + labelPlural: 'WorkflowRuns', + description: 'A workflow run', +}) +@WorkspaceGate({ + featureFlag: FeatureFlagKey.IsWorkflowEnabled, +}) +@WorkspaceIsSystem() +export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity { + @WorkspaceField({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.startedAt, + type: FieldMetadataType.DATE_TIME, + label: 'Workflow run started at', + description: 'Workflow run started at', + icon: 'IconHistory', + }) + @WorkspaceIsNullable() + startedAt: string | null; + + @WorkspaceField({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.endedAt, + type: FieldMetadataType.DATE_TIME, + label: 'Workflow run ended at', + description: 'Workflow run ended at', + icon: 'IconHistory', + }) + @WorkspaceIsNullable() + endedAt: string | null; + + @WorkspaceField({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.status, + type: FieldMetadataType.SELECT, + label: 'Workflow run status', + description: 'Workflow run status', + icon: 'IconHistory', + options: [ + { + value: WorkflowRunStatus.NOT_STARTED, + label: 'Not started', + position: 0, + color: 'grey', + }, + { + value: WorkflowRunStatus.RUNNING, + label: 'Running', + position: 1, + color: 'yellow', + }, + { + value: WorkflowRunStatus.COMPLETED, + label: 'Completed', + position: 2, + color: 'green', + }, + { + value: WorkflowRunStatus.FAILED, + label: 'Failed', + position: 3, + color: 'red', + }, + ], + defaultValue: "'NOT_STARTED'", + }) + status: WorkflowRunStatus; + + @WorkspaceField({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.createdBy, + type: FieldMetadataType.ACTOR, + label: 'Created by', + icon: 'IconCreativeCommonsSa', + description: 'The creator of the record', + defaultValue: { + source: `'${FieldActorSource.MANUAL}'`, + name: "''", + }, + }) + createdBy: ActorMetadata; + + // Relations + @WorkspaceRelation({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.workflowVersion, + type: RelationMetadataType.MANY_TO_ONE, + label: 'Workflow', + description: 'WorkflowVersion workflow', + icon: 'IconVersions', + inverseSideTarget: () => WorkflowVersionWorkspaceEntity, + inverseSideFieldKey: 'runs', + }) + workflowVersion: Relation; + + @WorkspaceJoinColumn('workflowVersion') + workflowVersionId: string; +} diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts index 3c52fb22f31b..ec8a8206b2e2 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts @@ -2,7 +2,10 @@ import { Relation } from 'src/engine/workspace-manager/workspace-sync-metadata/i import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum'; import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity'; -import { RelationMetadataType } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity'; +import { + RelationMetadataType, + RelationOnDeleteAction, +} from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity'; import { BaseWorkspaceEntity } from 'src/engine/twenty-orm/base.workspace-entity'; import { WorkspaceEntity } from 'src/engine/twenty-orm/decorators/workspace-entity.decorator'; import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator'; @@ -11,8 +14,12 @@ import { WorkspaceIsNullable } from 'src/engine/twenty-orm/decorators/workspace- import { WorkspaceIsSystem } from 'src/engine/twenty-orm/decorators/workspace-is-system.decorator'; import { WorkspaceJoinColumn } from 'src/engine/twenty-orm/decorators/workspace-join-column.decorator'; import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator'; -import { WORKFLOW_VERSION_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; +import { + WORKFLOW_RUN_STANDARD_FIELD_IDS, + WORKFLOW_VERSION_STANDARD_FIELD_IDS, +} from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids'; +import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { WorkflowTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type'; @@ -64,4 +71,16 @@ export class WorkflowVersionWorkspaceEntity extends BaseWorkspaceEntity { @WorkspaceJoinColumn('workflow') workflowId: string; + + @WorkspaceRelation({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.workflowVersion, + type: RelationMetadataType.ONE_TO_MANY, + label: 'Runs', + description: 'Workflow runs linked to the version.', + icon: 'IconVersions', + inverseSideTarget: () => WorkflowRunWorkspaceEntity, + onDelete: RelationOnDeleteAction.SET_NULL, + }) + @WorkspaceIsNullable() + runs: Relation; } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.exception.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.exception.ts new file mode 100644 index 000000000000..1bab0da515ee --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.exception.ts @@ -0,0 +1,11 @@ +import { CustomException } from 'src/utils/custom-exception'; + +export class WorkflowRunnerException extends CustomException { + constructor(message: string, code: string) { + super(message, code); + } +} + +export enum WorkflowRunnerExceptionCode { + WORKFLOW_FAILED = 'WORKFLOW_FAILED', +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts index b2937ffd17c2..ab3d13d1a7e8 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts @@ -1,54 +1,60 @@ +import { Scope } from '@nestjs/common'; + import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; -import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; +import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services'; import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service'; +import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service'; export type RunWorkflowJobData = { workspaceId: string; - workflowId: string; + workflowVersionId: string; + workflowRunId: string; payload: object; }; -@Processor(MessageQueue.workflowQueue) +@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) export class WorkflowRunnerJob { constructor( private readonly workflowCommonService: WorkflowCommonService, private readonly workflowRunnerService: WorkflowRunnerService, - private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + private readonly workflowStatusWorkspaceService: WorkflowStatusWorkspaceService, ) {} @Process(WorkflowRunnerJob.name) async handle({ workspaceId, - workflowId, + workflowVersionId, + workflowRunId, payload, }: RunWorkflowJobData): Promise { - const workflowRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflow', - ); - - const workflow = await workflowRepository.findOneByOrFail({ - id: workflowId, - }); - - if (!workflow.publishedVersionId) { - throw new Error('Workflow has no published version'); - } + await this.workflowStatusWorkspaceService.startWorkflowRun(workflowRunId); const workflowVersion = await this.workflowCommonService.getWorkflowVersion( workspaceId, - workflow.publishedVersionId, + workflowVersionId, ); - await this.workflowRunnerService.run({ - action: workflowVersion.trigger.nextAction, - workspaceId, - payload, - }); + try { + await this.workflowRunnerService.run({ + action: workflowVersion.trigger.nextAction, + workspaceId, + payload, + }); + + await this.workflowStatusWorkspaceService.endWorkflowRun( + workflowRunId, + WorkflowRunStatus.COMPLETED, + ); + } catch (error) { + await this.workflowStatusWorkspaceService.endWorkflowRun( + workflowRunId, + WorkflowRunStatus.FAILED, + ); + + throw error; + } } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts index a5698425057d..cce52e509001 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts @@ -1,12 +1,17 @@ import { Module } from '@nestjs/common'; -import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service'; -import { WorkflowRunnerJob } from 'src/modules/workflow/workflow-runner/workflow-runner.job'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowActionRunnerModule } from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.module'; +import { WorkflowRunnerJob } from 'src/modules/workflow/workflow-runner/workflow-runner.job'; +import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service'; +import { WorkflowStatusModule } from 'src/modules/workflow/workflow-status/workflow-status.module'; @Module({ - imports: [WorkflowCommonModule, WorkflowActionRunnerModule], + imports: [ + WorkflowCommonModule, + WorkflowActionRunnerModule, + WorkflowStatusModule, + ], providers: [WorkflowRunnerService, WorkflowRunnerJob], exports: [WorkflowRunnerService], }) diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.service.ts index 9b79d20536b6..e8d37e85dfca 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.service.ts @@ -2,9 +2,18 @@ import { Injectable } from '@nestjs/common'; import { WorkflowAction } from 'src/modules/workflow/common/types/workflow-action.type'; import { WorkflowActionRunnerFactory } from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.factory'; +import { + WorkflowRunnerException, + WorkflowRunnerExceptionCode, +} from 'src/modules/workflow/workflow-runner/workflow-runner.exception'; const MAX_RETRIES_ON_FAILURE = 3; +export type WorkflowRunOutput = { + data?: object; + error?: object; +}; + @Injectable() export class WorkflowRunnerService { constructor( @@ -21,9 +30,11 @@ export class WorkflowRunnerService { workspaceId: string; payload?: object; attemptCount?: number; - }) { + }): Promise { if (!action) { - return payload; + return { + data: payload, + }; } const workflowActionRunner = this.workflowActionRunnerFactory.get( @@ -45,7 +56,10 @@ export class WorkflowRunnerService { } if (!result.error) { - throw new Error('Execution result error, no data or error'); + throw new WorkflowRunnerException( + 'Execution result error, no data or error', + WorkflowRunnerExceptionCode.WORKFLOW_FAILED, + ); } if (action.settings.errorHandlingOptions.continueOnFailure.value) { @@ -68,6 +82,9 @@ export class WorkflowRunnerService { }); } - return result.error; + throw new WorkflowRunnerException( + `Workflow failed: ${result.error}`, + WorkflowRunnerExceptionCode.WORKFLOW_FAILED, + ); } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts new file mode 100644 index 000000000000..6510815f090c --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts @@ -0,0 +1,13 @@ +import { CustomException } from 'src/utils/custom-exception'; + +export class WorkflowStatusException extends CustomException { + code: WorkflowStatusExceptionCode; + constructor(message: string, code: WorkflowStatusExceptionCode) { + super(message, code); + } +} + +export enum WorkflowStatusExceptionCode { + WORKFLOW_RUN_NOT_FOUND = 'WORKFLOW_RUN_NOT_FOUND', + INVALID_OPERATION = 'INVALID_OPERATION', +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts new file mode 100644 index 000000000000..14eec4fb10c7 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts @@ -0,0 +1,9 @@ +import { Module } from '@nestjs/common'; + +import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service'; + +@Module({ + providers: [WorkflowStatusWorkspaceService], + exports: [WorkflowStatusWorkspaceService], +}) +export class WorkflowStatusModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.workspace-service.ts new file mode 100644 index 000000000000..949ae0c27e21 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.workspace-service.ts @@ -0,0 +1,92 @@ +import { Injectable } from '@nestjs/common'; + +import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { + WorkflowRunStatus, + WorkflowRunWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; +import { + WorkflowStatusException, + WorkflowStatusExceptionCode, +} from 'src/modules/workflow/workflow-status/workflow-status.exception'; + +@Injectable() +export class WorkflowStatusWorkspaceService { + constructor(private readonly twentyORMManager: TwentyORMManager) {} + + async createWorkflowRun(workflowVersionId: string, createdBy: ActorMetadata) { + const workflowRunRepository = + await this.twentyORMManager.getRepository( + 'workflowRun', + ); + + return ( + await workflowRunRepository.save({ + workflowVersionId, + createdBy, + status: WorkflowRunStatus.NOT_STARTED, + }) + ).id; + } + + async startWorkflowRun(workflowRunId: string) { + const workflowRunRepository = + await this.twentyORMManager.getRepository( + 'workflowRun', + ); + + const workflowRunToUpdate = await workflowRunRepository.findOneBy({ + id: workflowRunId, + }); + + if (!workflowRunToUpdate) { + throw new WorkflowStatusException( + 'No workflow run to start', + WorkflowStatusExceptionCode.WORKFLOW_RUN_NOT_FOUND, + ); + } + + if (workflowRunToUpdate.status !== WorkflowRunStatus.NOT_STARTED) { + throw new WorkflowStatusException( + 'Workflow run already started', + WorkflowStatusExceptionCode.INVALID_OPERATION, + ); + } + + return workflowRunRepository.update(workflowRunToUpdate.id, { + status: WorkflowRunStatus.RUNNING, + startedAt: new Date().toISOString(), + }); + } + + async endWorkflowRun(workflowRunId: string, status: WorkflowRunStatus) { + const workflowRunRepository = + await this.twentyORMManager.getRepository( + 'workflowRun', + ); + + const workflowRunToUpdate = await workflowRunRepository.findOneBy({ + id: workflowRunId, + }); + + if (!workflowRunToUpdate) { + throw new WorkflowStatusException( + 'No workflow run to end', + WorkflowStatusExceptionCode.WORKFLOW_RUN_NOT_FOUND, + ); + } + + if (workflowRunToUpdate.status !== WorkflowRunStatus.RUNNING) { + throw new WorkflowStatusException( + 'Workflow cannot be ended as it is not running', + WorkflowStatusExceptionCode.INVALID_OPERATION, + ); + } + + return workflowRunRepository.update(workflowRunToUpdate.id, { + status, + endedAt: new Date().toISOString(), + }); + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts index 5370d04c1e53..f96e91b14e4f 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts @@ -1,14 +1,18 @@ -import { Logger } from '@nestjs/common'; +import { Scope } from '@nestjs/common'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { RunWorkflowJobData, WorkflowRunnerJob, } from 'src/modules/workflow/workflow-runner/workflow-runner.job'; +import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service'; export type WorkflowEventTriggerJobData = { workspaceId: string; @@ -16,21 +20,44 @@ export type WorkflowEventTriggerJobData = { payload: object; }; -@Processor(MessageQueue.workflowQueue) +@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) export class WorkflowEventTriggerJob { - private readonly logger = new Logger(WorkflowEventTriggerJob.name); - constructor( @InjectMessageQueue(MessageQueue.workflowQueue) private readonly messageQueueService: MessageQueueService, + private readonly twentyORMManager: TwentyORMManager, + private readonly workflowStatusWorkspaceService: WorkflowStatusWorkspaceService, ) {} @Process(WorkflowEventTriggerJob.name) async handle(data: WorkflowEventTriggerJobData): Promise { + const workflowRepository = + await this.twentyORMManager.getRepository( + 'workflow', + ); + + const workflow = await workflowRepository.findOneByOrFail({ + id: data.workflowId, + }); + + if (!workflow.publishedVersionId) { + throw new Error('Workflow has no published version'); + } + + const workflowRunId = + await this.workflowStatusWorkspaceService.createWorkflowRun( + workflow.publishedVersionId, + { + source: FieldActorSource.WORKFLOW, + name: workflow.name, + }, + ); + this.messageQueueService.add(WorkflowRunnerJob.name, { workspaceId: data.workspaceId, - workflowId: data.workflowId, + workflowVersionId: workflow.publishedVersionId, payload: data.payload, + workflowRunId, }); } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts index 8d11ad4cb766..4f03c12fd958 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts @@ -1,10 +1,11 @@ import { Module } from '@nestjs/common'; import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module'; +import { WorkflowStatusModule } from 'src/modules/workflow/workflow-status/workflow-status.module'; import { WorkflowEventTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job'; @Module({ - imports: [WorkflowRunnerModule], + imports: [WorkflowRunnerModule, WorkflowStatusModule], providers: [WorkflowEventTriggerJob], }) export class WorkflowTriggerJobModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.exception.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.exception.ts index 2f4621095065..dd15486857ee 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.exception.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.exception.ts @@ -12,4 +12,5 @@ export enum WorkflowTriggerExceptionCode { INVALID_WORKFLOW_TRIGGER = 'INVALID_WORKFLOW_TRIGGER', INVALID_WORKFLOW_VERSION = 'INVALID_WORKFLOW_VERSION', INVALID_ACTION_TYPE = 'INVALID_ACTION_TYPE', + INTERNAL_ERROR = 'INTERNAL_ERROR', } diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.service.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.service.ts index 94b1720884ff..8cc8bc606609 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.service.ts @@ -32,11 +32,18 @@ export class WorkflowTriggerService { workflowVersionId, ); - return await this.workflowRunnerService.run({ - action: workflowVersion.trigger.nextAction, - workspaceId, - payload, - }); + try { + return await this.workflowRunnerService.run({ + action: workflowVersion.trigger.nextAction, + workspaceId, + payload, + }); + } catch (error) { + throw new WorkflowTriggerException( + `Error running workflow version ${error}`, + WorkflowTriggerExceptionCode.INTERNAL_ERROR, + ); + } } async enableWorkflowTrigger(workspaceId: string, workflowVersionId: string) {