From 41e438909667168e6396b391f645e62837f26e8e Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Mon, 12 Aug 2024 20:26:25 +0200 Subject: [PATCH 1/6] WIP --- .../constants/standard-field-ids.ts | 8 ++ .../constants/standard-object-ids.ts | 1 + .../workflow-run.workspace-entity.ts | 82 ++++++++++++++ .../workflow-version.workspace-entity.ts | 23 +++- .../workflow-runner/workflow-runner.job.ts | 37 +++---- .../workflow-runner/workflow-runner.module.ts | 11 +- .../workflow-runner.service.ts | 15 ++- .../workflow-status.exception.ts | 13 +++ .../workflow-status/workflow-status.module.ts | 9 ++ .../workflow-status.service.ts | 101 ++++++++++++++++++ .../jobs/workflow-event-trigger.job.ts | 26 ++++- .../jobs/workflow-trigger-job.module.ts | 3 +- 12 files changed, 299 insertions(+), 30 deletions(-) create mode 100644 packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts index 12d7b809ed4c..d8b7d070961d 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts @@ -411,10 +411,18 @@ export const WORKFLOW_STANDARD_FIELD_IDS = { noteTargets: '20202020-40aa-4839-965e-972a2f72e08d', }; +export const WORKFLOW_RUN_STANDARD_FIELD_IDS = { + workflowVersion: '20202020-2f52-4ba8-8dc4-d0d6adb9578d', + startedAt: '20202020-a234-4e2d-bd15-85bcea6bb183', + endedAt: '20202020-e1c1-4b6b-bbbd-b2beaf2e159e', + status: '20202020-6b3e-4f9c-8c2b-2e5b8e6d6f3b', +}; + export const WORKFLOW_VERSION_STANDARD_FIELD_IDS = { name: '20202020-a12f-4cca-9937-a2e40cc65509', workflow: '20202020-afa3-46c3-91b0-0631ca6aa1c8', trigger: '20202020-4eae-43e7-86e0-212b41a30b48', + runs: '20202020-1d08-46df-901a-85045f18099a', }; export const WORKSPACE_MEMBER_STANDARD_FIELD_IDS = { diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts index 7be15073db4b..7ed3c7bb99d9 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts @@ -41,6 +41,7 @@ export const STANDARD_OBJECT_IDS = { webhook: '20202020-be4d-4e08-811d-0fffcd13ffd4', workflow: '20202020-62be-406c-b9ca-8caa50d51392', workflowEventListener: '20202020-92aa-462f-965c-a785b00e9989', + workflowRun: '20202020-sqf2-4a9d-8b3b-a785b00eeqwe', workflowVersion: '20202020-d65d-4ab9-9344-d77bfb376a3d', workspaceMember: '20202020-3319-4234-a34c-82d5c0e881a6', }; diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts new file mode 100644 index 000000000000..8d658d88a513 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -0,0 +1,82 @@ +import { Relation } from 'src/engine/workspace-manager/workspace-sync-metadata/interfaces/relation.interface'; + +import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum'; +import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity'; +import { RelationMetadataType } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity'; +import { BaseWorkspaceEntity } from 'src/engine/twenty-orm/base.workspace-entity'; +import { WorkspaceEntity } from 'src/engine/twenty-orm/decorators/workspace-entity.decorator'; +import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator'; +import { WorkspaceGate } from 'src/engine/twenty-orm/decorators/workspace-gate.decorator'; +import { WorkspaceIsNullable } from 'src/engine/twenty-orm/decorators/workspace-is-nullable.decorator'; +import { WorkspaceIsSystem } from 'src/engine/twenty-orm/decorators/workspace-is-system.decorator'; +import { WorkspaceJoinColumn } from 'src/engine/twenty-orm/decorators/workspace-join-column.decorator'; +import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator'; +import { WORKFLOW_RUN_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; +import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids'; +import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; + +export enum WorkflowRunStatus { + NOT_STARTED = 'NOT_STARTED', + RUNNING = 'RUNNING', + SUCCEEDED = 'SUCCEEDED', + FAILED = 'FAILED', +} + +@WorkspaceEntity({ + standardId: STANDARD_OBJECT_IDS.workflowRun, + namePlural: 'workflowRuns', + labelSingular: 'workflowRun', + labelPlural: 'WorkflowRuns', + description: 'A workflow run', +}) +@WorkspaceGate({ + featureFlag: FeatureFlagKey.IsWorkflowEnabled, +}) +@WorkspaceIsSystem() +export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity { + @WorkspaceField({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.startedAt, + type: FieldMetadataType.DATE_TIME, + label: 'Workflow run started at', + description: 'Workflow run started at', + icon: 'IconHistory', + }) + @WorkspaceIsNullable() + startedAt: string | null; + + @WorkspaceField({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.endedAt, + type: FieldMetadataType.DATE_TIME, + label: 'Workflow run ended at', + description: 'Workflow run ended at', + icon: 'IconHistory', + }) + @WorkspaceIsNullable() + endedAt: string | null; + + @WorkspaceField({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.status, + type: FieldMetadataType.TEXT, + label: 'Workflow run status', + description: 'Workflow run status', + icon: 'IconHistory', + }) + @WorkspaceIsNullable() + status: WorkflowRunStatus; + + // Relations + @WorkspaceRelation({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.workflowVersion, + type: RelationMetadataType.MANY_TO_ONE, + label: 'Workflow', + description: 'WorkflowVersion workflow', + icon: 'IconVersions', + inverseSideTarget: () => WorkflowVersionWorkspaceEntity, + inverseSideFieldKey: 'runs', + }) + @WorkspaceIsNullable() + workflowVersion: Relation; + + @WorkspaceJoinColumn('workflowVersion') + workflowVersionId: string; +} diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts index 3c52fb22f31b..ec8a8206b2e2 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts @@ -2,7 +2,10 @@ import { Relation } from 'src/engine/workspace-manager/workspace-sync-metadata/i import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum'; import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity'; -import { RelationMetadataType } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity'; +import { + RelationMetadataType, + RelationOnDeleteAction, +} from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity'; import { BaseWorkspaceEntity } from 'src/engine/twenty-orm/base.workspace-entity'; import { WorkspaceEntity } from 'src/engine/twenty-orm/decorators/workspace-entity.decorator'; import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator'; @@ -11,8 +14,12 @@ import { WorkspaceIsNullable } from 'src/engine/twenty-orm/decorators/workspace- import { WorkspaceIsSystem } from 'src/engine/twenty-orm/decorators/workspace-is-system.decorator'; import { WorkspaceJoinColumn } from 'src/engine/twenty-orm/decorators/workspace-join-column.decorator'; import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator'; -import { WORKFLOW_VERSION_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; +import { + WORKFLOW_RUN_STANDARD_FIELD_IDS, + WORKFLOW_VERSION_STANDARD_FIELD_IDS, +} from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids'; +import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { WorkflowTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type'; @@ -64,4 +71,16 @@ export class WorkflowVersionWorkspaceEntity extends BaseWorkspaceEntity { @WorkspaceJoinColumn('workflow') workflowId: string; + + @WorkspaceRelation({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.workflowVersion, + type: RelationMetadataType.ONE_TO_MANY, + label: 'Runs', + description: 'Workflow runs linked to the version.', + icon: 'IconVersions', + inverseSideTarget: () => WorkflowRunWorkspaceEntity, + onDelete: RelationOnDeleteAction.SET_NULL, + }) + @WorkspaceIsNullable() + runs: Relation; } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts index b2937ffd17c2..1e9e58028932 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts @@ -1,14 +1,14 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; -import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; +import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services'; import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service'; +import { WorkflowStatusService } from 'src/modules/workflow/workflow-status/workflow-status.service'; export type RunWorkflowJobData = { workspaceId: string; - workflowId: string; + workflowVersionId: string; payload: object; }; @@ -17,38 +17,35 @@ export class WorkflowRunnerJob { constructor( private readonly workflowCommonService: WorkflowCommonService, private readonly workflowRunnerService: WorkflowRunnerService, - private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + private readonly workflowStatusService: WorkflowStatusService, ) {} @Process(WorkflowRunnerJob.name) async handle({ workspaceId, - workflowId, + workflowVersionId, payload, }: RunWorkflowJobData): Promise { - const workflowRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflow', - ); - - const workflow = await workflowRepository.findOneByOrFail({ - id: workflowId, - }); - - if (!workflow.publishedVersionId) { - throw new Error('Workflow has no published version'); - } + await this.workflowStatusService.startWorkflowRun( + workspaceId, + workflowVersionId, + ); const workflowVersion = await this.workflowCommonService.getWorkflowVersion( workspaceId, - workflow.publishedVersionId, + workflowVersionId, ); - await this.workflowRunnerService.run({ + const output = await this.workflowRunnerService.run({ action: workflowVersion.trigger.nextAction, workspaceId, payload, }); + + await this.workflowStatusService.endWorkflowRun( + workspaceId, + workflowVersionId, + output.error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.SUCCEEDED, + ); } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts index a5698425057d..cce52e509001 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts @@ -1,12 +1,17 @@ import { Module } from '@nestjs/common'; -import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service'; -import { WorkflowRunnerJob } from 'src/modules/workflow/workflow-runner/workflow-runner.job'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowActionRunnerModule } from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.module'; +import { WorkflowRunnerJob } from 'src/modules/workflow/workflow-runner/workflow-runner.job'; +import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service'; +import { WorkflowStatusModule } from 'src/modules/workflow/workflow-status/workflow-status.module'; @Module({ - imports: [WorkflowCommonModule, WorkflowActionRunnerModule], + imports: [ + WorkflowCommonModule, + WorkflowActionRunnerModule, + WorkflowStatusModule, + ], providers: [WorkflowRunnerService, WorkflowRunnerJob], exports: [WorkflowRunnerService], }) diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.service.ts index 9b79d20536b6..15454e4cdf3b 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.service.ts @@ -5,6 +5,11 @@ import { WorkflowActionRunnerFactory } from 'src/modules/workflow/workflow-actio const MAX_RETRIES_ON_FAILURE = 3; +export type WorkflowRunOutput = { + data?: object; + error?: object; +}; + @Injectable() export class WorkflowRunnerService { constructor( @@ -21,9 +26,11 @@ export class WorkflowRunnerService { workspaceId: string; payload?: object; attemptCount?: number; - }) { + }): Promise { if (!action) { - return payload; + return { + data: payload, + }; } const workflowActionRunner = this.workflowActionRunnerFactory.get( @@ -68,6 +75,8 @@ export class WorkflowRunnerService { }); } - return result.error; + return { + error: result.error, + }; } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts new file mode 100644 index 000000000000..ac317871bce9 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts @@ -0,0 +1,13 @@ +import { CustomException } from 'src/utils/custom-exception'; + +export class WorkflowStatusException extends CustomException { + code: WorkflowStatusExceptionCode; + constructor(message: string, code: WorkflowStatusExceptionCode) { + super(message, code); + } +} + +export enum WorkflowStatusExceptionCode { + WORKFLOW_RUN_NOT_FOUND = 'WORKFLOW_RUN_NOT_FOUND', + WORKFLOW_RUN_ALREADY_STARTED = 'WORKFLOW_RUN_ALREADY_STARTED', +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts new file mode 100644 index 000000000000..c7a3477eb465 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts @@ -0,0 +1,9 @@ +import { Module } from '@nestjs/common'; + +import { WorkflowStatusService } from 'src/modules/workflow/workflow-status/workflow-status.service'; + +@Module({ + providers: [WorkflowStatusService], + exports: [WorkflowStatusService], +}) +export class WorkflowStatusModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts new file mode 100644 index 000000000000..e4886a2219ea --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts @@ -0,0 +1,101 @@ +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { + WorkflowRunStatus, + WorkflowRunWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; +import { + WorkflowStatusException, + WorkflowStatusExceptionCode, +} from 'src/modules/workflow/workflow-status/workflow-status.exception'; + +export class WorkflowStatusService { + constructor( + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + ) {} + + async createWorkflowRun(workspaceId: string, workflowVersionId: string) { + const workflowRunDataSource = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflowRun', + ); + + const onGoingWorkflowRuns = ( + await workflowRunDataSource.findBy({ + workflowVersionId, + }) + ).filter((workflow) => + [WorkflowRunStatus.NOT_STARTED, WorkflowRunStatus.RUNNING].includes( + workflow.status, + ), + ); + + if (onGoingWorkflowRuns.length > 0) { + throw new WorkflowStatusException( + 'There is already an on going workflow run', + WorkflowStatusExceptionCode.WORKFLOW_RUN_ALREADY_STARTED, + ); + } + + const workflowRunToCreate = await workflowRunDataSource.create({ + workflowVersionId, + status: WorkflowRunStatus.NOT_STARTED, + }); + + return workflowRunDataSource.save(workflowRunToCreate); + } + + async startWorkflowRun(workspaceId: string, workflowVersionId: string) { + const workflowRunDataSource = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflowRun', + ); + + const workflowRunToUpdate = await workflowRunDataSource.findOneBy({ + workflowVersionId, + status: WorkflowRunStatus.NOT_STARTED, + }); + + if (!workflowRunToUpdate) { + throw new WorkflowStatusException( + 'No workflow run to start', + WorkflowStatusExceptionCode.WORKFLOW_RUN_NOT_FOUND, + ); + } + + return workflowRunDataSource.update(workflowRunToUpdate.id, { + status: WorkflowRunStatus.RUNNING, + startedAt: new Date().toISOString(), + }); + } + + async endWorkflowRun( + workspaceId: string, + workflowVersionId: string, + status: WorkflowRunStatus, + ) { + const workflowRunDataSource = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflowRun', + ); + + const workflowRunToUpdate = await workflowRunDataSource.findOneBy({ + workflowVersionId, + status: WorkflowRunStatus.RUNNING, + }); + + if (!workflowRunToUpdate) { + throw new WorkflowStatusException( + 'No workflow run to end', + WorkflowStatusExceptionCode.WORKFLOW_RUN_NOT_FOUND, + ); + } + + return workflowRunDataSource.update(workflowRunToUpdate.id, { + status, + endedAt: new Date().toISOString(), + }); + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts index 5370d04c1e53..1fbdc6d6c497 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts @@ -5,10 +5,13 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { RunWorkflowJobData, WorkflowRunnerJob, } from 'src/modules/workflow/workflow-runner/workflow-runner.job'; +import { WorkflowStatusService } from 'src/modules/workflow/workflow-status/workflow-status.service'; export type WorkflowEventTriggerJobData = { workspaceId: string; @@ -23,13 +26,34 @@ export class WorkflowEventTriggerJob { constructor( @InjectMessageQueue(MessageQueue.workflowQueue) private readonly messageQueueService: MessageQueueService, + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + private readonly workflowStatusService: WorkflowStatusService, ) {} @Process(WorkflowEventTriggerJob.name) async handle(data: WorkflowEventTriggerJobData): Promise { + const workflowRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + data.workspaceId, + 'workflow', + ); + + const workflow = await workflowRepository.findOneByOrFail({ + id: data.workflowId, + }); + + if (!workflow.publishedVersionId) { + throw new Error('Workflow has no published version'); + } + + await this.workflowStatusService.createWorkflowRun( + data.workspaceId, + workflow.publishedVersionId, + ); + this.messageQueueService.add(WorkflowRunnerJob.name, { workspaceId: data.workspaceId, - workflowId: data.workflowId, + workflowVersionId: workflow.publishedVersionId, payload: data.payload, }); } diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts index 8d11ad4cb766..4f03c12fd958 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts @@ -1,10 +1,11 @@ import { Module } from '@nestjs/common'; import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module'; +import { WorkflowStatusModule } from 'src/modules/workflow/workflow-status/workflow-status.module'; import { WorkflowEventTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job'; @Module({ - imports: [WorkflowRunnerModule], + imports: [WorkflowRunnerModule, WorkflowStatusModule], providers: [WorkflowEventTriggerJob], }) export class WorkflowTriggerJobModule {} From 899fc3c79004486d0fb19f419f72d868e340eb39 Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Wed, 14 Aug 2024 10:59:09 +0200 Subject: [PATCH 2/6] Add workflow run for each workflow triggered --- .../constants/standard-object-ids.ts | 2 +- .../workspace-sync-metadata/standard-objects/index.ts | 2 ++ .../standard-objects/workflow-run.workspace-entity.ts | 2 +- .../workflow/workflow-runner/workflow-runner.job.ts | 2 +- .../workflow/workflow-status/workflow-status.module.ts | 3 +++ .../workflow/workflow-status/workflow-status.service.ts | 3 +++ .../workflow-trigger/jobs/workflow-trigger-job.module.ts | 8 +++++++- 7 files changed, 18 insertions(+), 4 deletions(-) diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts index 7ed3c7bb99d9..a13e78836b57 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts @@ -41,7 +41,7 @@ export const STANDARD_OBJECT_IDS = { webhook: '20202020-be4d-4e08-811d-0fffcd13ffd4', workflow: '20202020-62be-406c-b9ca-8caa50d51392', workflowEventListener: '20202020-92aa-462f-965c-a785b00e9989', - workflowRun: '20202020-sqf2-4a9d-8b3b-a785b00eeqwe', + workflowRun: '20202020-4e28-4e95-a9d7-6c00874f843c', workflowVersion: '20202020-d65d-4ab9-9344-d77bfb376a3d', workspaceMember: '20202020-3319-4234-a34c-82d5c0e881a6', }; diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/standard-objects/index.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/standard-objects/index.ts index 95c148669293..d510e4f2d2e7 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/standard-objects/index.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/standard-objects/index.ts @@ -32,6 +32,7 @@ import { ViewSortWorkspaceEntity } from 'src/modules/view/standard-objects/view- import { ViewWorkspaceEntity } from 'src/modules/view/standard-objects/view.workspace-entity'; import { WebhookWorkspaceEntity } from 'src/modules/webhook/standard-objects/webhook.workspace-entity'; import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity'; +import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; @@ -64,6 +65,7 @@ export const standardObjectMetadataDefinitions = [ WorkflowWorkspaceEntity, WorkflowEventListenerWorkspaceEntity, WorkflowVersionWorkspaceEntity, + WorkflowRunWorkspaceEntity, WorkspaceMemberWorkspaceEntity, MessageThreadWorkspaceEntity, MessageThreadSubscriberWorkspaceEntity, diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index 8d658d88a513..322ce04ab6ca 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -18,7 +18,7 @@ import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/stan export enum WorkflowRunStatus { NOT_STARTED = 'NOT_STARTED', RUNNING = 'RUNNING', - SUCCEEDED = 'SUCCEEDED', + COMPLETED = 'COMPLETED', FAILED = 'FAILED', } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts index 1e9e58028932..e6404d5dd494 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts @@ -45,7 +45,7 @@ export class WorkflowRunnerJob { await this.workflowStatusService.endWorkflowRun( workspaceId, workflowVersionId, - output.error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.SUCCEEDED, + output.error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.COMPLETED, ); } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts index c7a3477eb465..5ea9f215d0a9 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts @@ -1,8 +1,11 @@ import { Module } from '@nestjs/common'; +import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; +import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowStatusService } from 'src/modules/workflow/workflow-status/workflow-status.service'; @Module({ + imports: [TwentyORMModule.forFeature([WorkflowRunWorkspaceEntity])], providers: [WorkflowStatusService], exports: [WorkflowStatusService], }) diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts index e4886a2219ea..2beb368cb737 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts @@ -1,3 +1,5 @@ +import { Injectable } from '@nestjs/common'; + import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { WorkflowRunStatus, @@ -8,6 +10,7 @@ import { WorkflowStatusExceptionCode, } from 'src/modules/workflow/workflow-status/workflow-status.exception'; +@Injectable() export class WorkflowStatusService { constructor( private readonly twentyORMGlobalManager: TwentyORMGlobalManager, diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts index 4f03c12fd958..4fb310d85a91 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts @@ -1,11 +1,17 @@ import { Module } from '@nestjs/common'; +import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; +import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module'; import { WorkflowStatusModule } from 'src/modules/workflow/workflow-status/workflow-status.module'; import { WorkflowEventTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job'; @Module({ - imports: [WorkflowRunnerModule, WorkflowStatusModule], + imports: [ + WorkflowRunnerModule, + WorkflowStatusModule, + TwentyORMModule.forFeature([WorkflowWorkspaceEntity]), + ], providers: [WorkflowEventTriggerJob], }) export class WorkflowTriggerJobModule {} From e4ed6e166717f0c758b936dd145fb1d0a98aa978 Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Wed, 14 Aug 2024 16:15:47 +0200 Subject: [PATCH 3/6] Build a workspace service --- .../composite-types/actor.composite-type.ts | 1 + .../constants/standard-field-ids.ts | 1 + .../workflow-run.workspace-entity.ts | 19 +++- .../workflow-runner/workflow-runner.job.ts | 20 ++-- .../workflow-status.exception.ts | 2 +- .../workflow-status/workflow-status.module.ts | 6 +- .../workflow-status.service.ts | 104 ------------------ .../workflow-status.workspace-service.ts | 92 ++++++++++++++++ .../jobs/workflow-event-trigger.job.ts | 22 ++-- 9 files changed, 139 insertions(+), 128 deletions(-) delete mode 100644 packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.workspace-service.ts diff --git a/packages/twenty-server/src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type.ts b/packages/twenty-server/src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type.ts index 69c6d256ab45..1efa0eeffff5 100644 --- a/packages/twenty-server/src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type.ts +++ b/packages/twenty-server/src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type.ts @@ -8,6 +8,7 @@ import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/fi export enum FieldActorSource { EMAIL = 'EMAIL', CALENDAR = 'CALENDAR', + WORKFLOW = 'WORKFLOW', API = 'API', IMPORT = 'IMPORT', MANUAL = 'MANUAL', diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts index d8b7d070961d..5ce6734b97f8 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts @@ -416,6 +416,7 @@ export const WORKFLOW_RUN_STANDARD_FIELD_IDS = { startedAt: '20202020-a234-4e2d-bd15-85bcea6bb183', endedAt: '20202020-e1c1-4b6b-bbbd-b2beaf2e159e', status: '20202020-6b3e-4f9c-8c2b-2e5b8e6d6f3b', + createdBy: '20202020-6007-401a-8aa5-e6f38581a6f3', }; export const WORKFLOW_VERSION_STANDARD_FIELD_IDS = { diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index 322ce04ab6ca..28b2ae26ea25 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -1,6 +1,10 @@ import { Relation } from 'src/engine/workspace-manager/workspace-sync-metadata/interfaces/relation.interface'; import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum'; +import { + ActorMetadata, + FieldActorSource, +} from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity'; import { RelationMetadataType } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity'; import { BaseWorkspaceEntity } from 'src/engine/twenty-orm/base.workspace-entity'; @@ -61,9 +65,21 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity { description: 'Workflow run status', icon: 'IconHistory', }) - @WorkspaceIsNullable() status: WorkflowRunStatus; + @WorkspaceField({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.createdBy, + type: FieldMetadataType.ACTOR, + label: 'Created by', + icon: 'IconCreativeCommonsSa', + description: 'The creator of the record', + defaultValue: { + source: `'${FieldActorSource.MANUAL}'`, + name: "''", + }, + }) + createdBy: ActorMetadata; + // Relations @WorkspaceRelation({ standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.workflowVersion, @@ -74,7 +90,6 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity { inverseSideTarget: () => WorkflowVersionWorkspaceEntity, inverseSideFieldKey: 'runs', }) - @WorkspaceIsNullable() workflowVersion: Relation; @WorkspaceJoinColumn('workflowVersion') diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts index e6404d5dd494..176418b6b46e 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts @@ -1,35 +1,36 @@ +import { Scope } from '@nestjs/common'; + import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services'; import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service'; -import { WorkflowStatusService } from 'src/modules/workflow/workflow-status/workflow-status.service'; +import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service'; export type RunWorkflowJobData = { workspaceId: string; workflowVersionId: string; + workflowRunId: string; payload: object; }; -@Processor(MessageQueue.workflowQueue) +@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) export class WorkflowRunnerJob { constructor( private readonly workflowCommonService: WorkflowCommonService, private readonly workflowRunnerService: WorkflowRunnerService, - private readonly workflowStatusService: WorkflowStatusService, + private readonly workflowStatusWorkspaceService: WorkflowStatusWorkspaceService, ) {} @Process(WorkflowRunnerJob.name) async handle({ workspaceId, workflowVersionId, + workflowRunId, payload, }: RunWorkflowJobData): Promise { - await this.workflowStatusService.startWorkflowRun( - workspaceId, - workflowVersionId, - ); + await this.workflowStatusWorkspaceService.startWorkflowRun(workflowRunId); const workflowVersion = await this.workflowCommonService.getWorkflowVersion( workspaceId, @@ -42,9 +43,8 @@ export class WorkflowRunnerJob { payload, }); - await this.workflowStatusService.endWorkflowRun( - workspaceId, - workflowVersionId, + await this.workflowStatusWorkspaceService.endWorkflowRun( + workflowRunId, output.error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.COMPLETED, ); } diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts index ac317871bce9..6510815f090c 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts @@ -9,5 +9,5 @@ export class WorkflowStatusException extends CustomException { export enum WorkflowStatusExceptionCode { WORKFLOW_RUN_NOT_FOUND = 'WORKFLOW_RUN_NOT_FOUND', - WORKFLOW_RUN_ALREADY_STARTED = 'WORKFLOW_RUN_ALREADY_STARTED', + INVALID_OPERATION = 'INVALID_OPERATION', } diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts index 5ea9f215d0a9..e27b5441df45 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts @@ -2,11 +2,11 @@ import { Module } from '@nestjs/common'; import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; -import { WorkflowStatusService } from 'src/modules/workflow/workflow-status/workflow-status.service'; +import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service'; @Module({ imports: [TwentyORMModule.forFeature([WorkflowRunWorkspaceEntity])], - providers: [WorkflowStatusService], - exports: [WorkflowStatusService], + providers: [WorkflowStatusWorkspaceService], + exports: [WorkflowStatusWorkspaceService], }) export class WorkflowStatusModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts deleted file mode 100644 index 2beb368cb737..000000000000 --- a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts +++ /dev/null @@ -1,104 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; -import { - WorkflowRunStatus, - WorkflowRunWorkspaceEntity, -} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; -import { - WorkflowStatusException, - WorkflowStatusExceptionCode, -} from 'src/modules/workflow/workflow-status/workflow-status.exception'; - -@Injectable() -export class WorkflowStatusService { - constructor( - private readonly twentyORMGlobalManager: TwentyORMGlobalManager, - ) {} - - async createWorkflowRun(workspaceId: string, workflowVersionId: string) { - const workflowRunDataSource = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - ); - - const onGoingWorkflowRuns = ( - await workflowRunDataSource.findBy({ - workflowVersionId, - }) - ).filter((workflow) => - [WorkflowRunStatus.NOT_STARTED, WorkflowRunStatus.RUNNING].includes( - workflow.status, - ), - ); - - if (onGoingWorkflowRuns.length > 0) { - throw new WorkflowStatusException( - 'There is already an on going workflow run', - WorkflowStatusExceptionCode.WORKFLOW_RUN_ALREADY_STARTED, - ); - } - - const workflowRunToCreate = await workflowRunDataSource.create({ - workflowVersionId, - status: WorkflowRunStatus.NOT_STARTED, - }); - - return workflowRunDataSource.save(workflowRunToCreate); - } - - async startWorkflowRun(workspaceId: string, workflowVersionId: string) { - const workflowRunDataSource = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - ); - - const workflowRunToUpdate = await workflowRunDataSource.findOneBy({ - workflowVersionId, - status: WorkflowRunStatus.NOT_STARTED, - }); - - if (!workflowRunToUpdate) { - throw new WorkflowStatusException( - 'No workflow run to start', - WorkflowStatusExceptionCode.WORKFLOW_RUN_NOT_FOUND, - ); - } - - return workflowRunDataSource.update(workflowRunToUpdate.id, { - status: WorkflowRunStatus.RUNNING, - startedAt: new Date().toISOString(), - }); - } - - async endWorkflowRun( - workspaceId: string, - workflowVersionId: string, - status: WorkflowRunStatus, - ) { - const workflowRunDataSource = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - ); - - const workflowRunToUpdate = await workflowRunDataSource.findOneBy({ - workflowVersionId, - status: WorkflowRunStatus.RUNNING, - }); - - if (!workflowRunToUpdate) { - throw new WorkflowStatusException( - 'No workflow run to end', - WorkflowStatusExceptionCode.WORKFLOW_RUN_NOT_FOUND, - ); - } - - return workflowRunDataSource.update(workflowRunToUpdate.id, { - status, - endedAt: new Date().toISOString(), - }); - } -} diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.workspace-service.ts new file mode 100644 index 000000000000..949ae0c27e21 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.workspace-service.ts @@ -0,0 +1,92 @@ +import { Injectable } from '@nestjs/common'; + +import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { + WorkflowRunStatus, + WorkflowRunWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; +import { + WorkflowStatusException, + WorkflowStatusExceptionCode, +} from 'src/modules/workflow/workflow-status/workflow-status.exception'; + +@Injectable() +export class WorkflowStatusWorkspaceService { + constructor(private readonly twentyORMManager: TwentyORMManager) {} + + async createWorkflowRun(workflowVersionId: string, createdBy: ActorMetadata) { + const workflowRunRepository = + await this.twentyORMManager.getRepository( + 'workflowRun', + ); + + return ( + await workflowRunRepository.save({ + workflowVersionId, + createdBy, + status: WorkflowRunStatus.NOT_STARTED, + }) + ).id; + } + + async startWorkflowRun(workflowRunId: string) { + const workflowRunRepository = + await this.twentyORMManager.getRepository( + 'workflowRun', + ); + + const workflowRunToUpdate = await workflowRunRepository.findOneBy({ + id: workflowRunId, + }); + + if (!workflowRunToUpdate) { + throw new WorkflowStatusException( + 'No workflow run to start', + WorkflowStatusExceptionCode.WORKFLOW_RUN_NOT_FOUND, + ); + } + + if (workflowRunToUpdate.status !== WorkflowRunStatus.NOT_STARTED) { + throw new WorkflowStatusException( + 'Workflow run already started', + WorkflowStatusExceptionCode.INVALID_OPERATION, + ); + } + + return workflowRunRepository.update(workflowRunToUpdate.id, { + status: WorkflowRunStatus.RUNNING, + startedAt: new Date().toISOString(), + }); + } + + async endWorkflowRun(workflowRunId: string, status: WorkflowRunStatus) { + const workflowRunRepository = + await this.twentyORMManager.getRepository( + 'workflowRun', + ); + + const workflowRunToUpdate = await workflowRunRepository.findOneBy({ + id: workflowRunId, + }); + + if (!workflowRunToUpdate) { + throw new WorkflowStatusException( + 'No workflow run to end', + WorkflowStatusExceptionCode.WORKFLOW_RUN_NOT_FOUND, + ); + } + + if (workflowRunToUpdate.status !== WorkflowRunStatus.RUNNING) { + throw new WorkflowStatusException( + 'Workflow cannot be ended as it is not running', + WorkflowStatusExceptionCode.INVALID_OPERATION, + ); + } + + return workflowRunRepository.update(workflowRunToUpdate.id, { + status, + endedAt: new Date().toISOString(), + }); + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts index 1fbdc6d6c497..9b901092991b 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts @@ -1,17 +1,18 @@ -import { Logger } from '@nestjs/common'; +import { Logger, Scope } from '@nestjs/common'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { RunWorkflowJobData, WorkflowRunnerJob, } from 'src/modules/workflow/workflow-runner/workflow-runner.job'; -import { WorkflowStatusService } from 'src/modules/workflow/workflow-status/workflow-status.service'; +import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service'; export type WorkflowEventTriggerJobData = { workspaceId: string; @@ -19,7 +20,7 @@ export type WorkflowEventTriggerJobData = { payload: object; }; -@Processor(MessageQueue.workflowQueue) +@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) export class WorkflowEventTriggerJob { private readonly logger = new Logger(WorkflowEventTriggerJob.name); @@ -27,7 +28,7 @@ export class WorkflowEventTriggerJob { @InjectMessageQueue(MessageQueue.workflowQueue) private readonly messageQueueService: MessageQueueService, private readonly twentyORMGlobalManager: TwentyORMGlobalManager, - private readonly workflowStatusService: WorkflowStatusService, + private readonly workflowStatusWorkspaceService: WorkflowStatusWorkspaceService, ) {} @Process(WorkflowEventTriggerJob.name) @@ -46,15 +47,20 @@ export class WorkflowEventTriggerJob { throw new Error('Workflow has no published version'); } - await this.workflowStatusService.createWorkflowRun( - data.workspaceId, - workflow.publishedVersionId, - ); + const workflowRunId = + await this.workflowStatusWorkspaceService.createWorkflowRun( + workflow.publishedVersionId, + { + source: FieldActorSource.WORKFLOW, + name: workflow.name, + }, + ); this.messageQueueService.add(WorkflowRunnerJob.name, { workspaceId: data.workspaceId, workflowVersionId: workflow.publishedVersionId, payload: data.payload, + workflowRunId, }); } } From b80e83b6fc723aa1770b1e3995349ae2010cec7f Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Wed, 14 Aug 2024 17:56:37 +0200 Subject: [PATCH 4/6] Throw errors instead of returning --- .../workflow-run.workspace-entity.ts | 2 +- .../workflow-runner.exception.ts | 11 ++++++++ .../workflow-runner/workflow-runner.job.ts | 27 ++++++++++++------- .../workflow-runner.service.ts | 16 ++++++++--- .../workflow-status/workflow-status.module.ts | 3 --- .../jobs/workflow-trigger-job.module.ts | 8 +----- .../workflow-trigger.exception.ts | 1 + .../workflow-trigger.service.ts | 17 ++++++++---- 8 files changed, 56 insertions(+), 29 deletions(-) create mode 100644 packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.exception.ts diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index 28b2ae26ea25..6ab23b22c8e9 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -60,7 +60,7 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity { @WorkspaceField({ standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.status, - type: FieldMetadataType.TEXT, + type: FieldMetadataType.SELECT, label: 'Workflow run status', description: 'Workflow run status', icon: 'IconHistory', diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.exception.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.exception.ts new file mode 100644 index 000000000000..1bab0da515ee --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.exception.ts @@ -0,0 +1,11 @@ +import { CustomException } from 'src/utils/custom-exception'; + +export class WorkflowRunnerException extends CustomException { + constructor(message: string, code: string) { + super(message, code); + } +} + +export enum WorkflowRunnerExceptionCode { + WORKFLOW_FAILED = 'WORKFLOW_FAILED', +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts index 176418b6b46e..ab3d13d1a7e8 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts @@ -37,15 +37,24 @@ export class WorkflowRunnerJob { workflowVersionId, ); - const output = await this.workflowRunnerService.run({ - action: workflowVersion.trigger.nextAction, - workspaceId, - payload, - }); + try { + await this.workflowRunnerService.run({ + action: workflowVersion.trigger.nextAction, + workspaceId, + payload, + }); - await this.workflowStatusWorkspaceService.endWorkflowRun( - workflowRunId, - output.error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.COMPLETED, - ); + await this.workflowStatusWorkspaceService.endWorkflowRun( + workflowRunId, + WorkflowRunStatus.COMPLETED, + ); + } catch (error) { + await this.workflowStatusWorkspaceService.endWorkflowRun( + workflowRunId, + WorkflowRunStatus.FAILED, + ); + + throw error; + } } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.service.ts index 15454e4cdf3b..e8d37e85dfca 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.service.ts @@ -2,6 +2,10 @@ import { Injectable } from '@nestjs/common'; import { WorkflowAction } from 'src/modules/workflow/common/types/workflow-action.type'; import { WorkflowActionRunnerFactory } from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.factory'; +import { + WorkflowRunnerException, + WorkflowRunnerExceptionCode, +} from 'src/modules/workflow/workflow-runner/workflow-runner.exception'; const MAX_RETRIES_ON_FAILURE = 3; @@ -52,7 +56,10 @@ export class WorkflowRunnerService { } if (!result.error) { - throw new Error('Execution result error, no data or error'); + throw new WorkflowRunnerException( + 'Execution result error, no data or error', + WorkflowRunnerExceptionCode.WORKFLOW_FAILED, + ); } if (action.settings.errorHandlingOptions.continueOnFailure.value) { @@ -75,8 +82,9 @@ export class WorkflowRunnerService { }); } - return { - error: result.error, - }; + throw new WorkflowRunnerException( + `Workflow failed: ${result.error}`, + WorkflowRunnerExceptionCode.WORKFLOW_FAILED, + ); } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts index e27b5441df45..14eec4fb10c7 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts @@ -1,11 +1,8 @@ import { Module } from '@nestjs/common'; -import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; -import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service'; @Module({ - imports: [TwentyORMModule.forFeature([WorkflowRunWorkspaceEntity])], providers: [WorkflowStatusWorkspaceService], exports: [WorkflowStatusWorkspaceService], }) diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts index 4fb310d85a91..4f03c12fd958 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts @@ -1,17 +1,11 @@ import { Module } from '@nestjs/common'; -import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; -import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module'; import { WorkflowStatusModule } from 'src/modules/workflow/workflow-status/workflow-status.module'; import { WorkflowEventTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job'; @Module({ - imports: [ - WorkflowRunnerModule, - WorkflowStatusModule, - TwentyORMModule.forFeature([WorkflowWorkspaceEntity]), - ], + imports: [WorkflowRunnerModule, WorkflowStatusModule], providers: [WorkflowEventTriggerJob], }) export class WorkflowTriggerJobModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.exception.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.exception.ts index 2f4621095065..dd15486857ee 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.exception.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.exception.ts @@ -12,4 +12,5 @@ export enum WorkflowTriggerExceptionCode { INVALID_WORKFLOW_TRIGGER = 'INVALID_WORKFLOW_TRIGGER', INVALID_WORKFLOW_VERSION = 'INVALID_WORKFLOW_VERSION', INVALID_ACTION_TYPE = 'INVALID_ACTION_TYPE', + INTERNAL_ERROR = 'INTERNAL_ERROR', } diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.service.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.service.ts index 94b1720884ff..8cc8bc606609 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.service.ts @@ -32,11 +32,18 @@ export class WorkflowTriggerService { workflowVersionId, ); - return await this.workflowRunnerService.run({ - action: workflowVersion.trigger.nextAction, - workspaceId, - payload, - }); + try { + return await this.workflowRunnerService.run({ + action: workflowVersion.trigger.nextAction, + workspaceId, + payload, + }); + } catch (error) { + throw new WorkflowTriggerException( + `Error running workflow version ${error}`, + WorkflowTriggerExceptionCode.INTERNAL_ERROR, + ); + } } async enableWorkflowTrigger(workspaceId: string, workflowVersionId: string) { From 4db403a2d1baba6fb661dfa5d9427c7a253b8a7c Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Wed, 14 Aug 2024 18:08:50 +0200 Subject: [PATCH 5/6] Use enum --- .../workflow-run.workspace-entity.ts | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index 6ab23b22c8e9..5f78df89164c 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -64,6 +64,33 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity { label: 'Workflow run status', description: 'Workflow run status', icon: 'IconHistory', + options: [ + { + value: WorkflowRunStatus.NOT_STARTED, + label: 'Not started', + position: 0, + color: 'grey', + }, + { + value: WorkflowRunStatus.RUNNING, + label: 'Running', + position: 1, + color: 'yellow', + }, + { + value: WorkflowRunStatus.COMPLETED, + label: 'Completed', + position: 2, + color: 'green', + }, + { + value: WorkflowRunStatus.FAILED, + label: 'Failed', + position: 3, + color: 'red', + }, + ], + defaultValue: "'NOT_STARTED'", }) status: WorkflowRunStatus; From 8feb5d17bc14cc6dac46f938106d3ad52ad7a234 Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Wed, 14 Aug 2024 18:18:12 +0200 Subject: [PATCH 6/6] Use twenty orm manager --- .../jobs/workflow-event-trigger.job.ts | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts index 9b901092991b..f96e91b14e4f 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts @@ -1,4 +1,4 @@ -import { Logger, Scope } from '@nestjs/common'; +import { Scope } from '@nestjs/common'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; @@ -6,7 +6,7 @@ import { Processor } from 'src/engine/integrations/message-queue/decorators/proc import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; -import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { RunWorkflowJobData, @@ -22,20 +22,17 @@ export type WorkflowEventTriggerJobData = { @Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) export class WorkflowEventTriggerJob { - private readonly logger = new Logger(WorkflowEventTriggerJob.name); - constructor( @InjectMessageQueue(MessageQueue.workflowQueue) private readonly messageQueueService: MessageQueueService, - private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + private readonly twentyORMManager: TwentyORMManager, private readonly workflowStatusWorkspaceService: WorkflowStatusWorkspaceService, ) {} @Process(WorkflowEventTriggerJob.name) async handle(data: WorkflowEventTriggerJobData): Promise { const workflowRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - data.workspaceId, + await this.twentyORMManager.getRepository( 'workflow', );