diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts index 6c7d6e066d24..1804901ac1d5 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts @@ -8,6 +8,7 @@ import { objectRecordChangedValues } from 'src/engine/integrations/event-emitter import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { CreateAuditLogFromInternalEvent } from 'src/modules/timeline/jobs/create-audit-log-from-internal-event'; import { UpsertTimelineActivityFromInternalEvent } from 'src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job'; @@ -19,40 +20,46 @@ export class EntityEventsToDbListener { ) {} @OnEvent('*.created') - async handleCreate(payload: ObjectRecordCreateEvent) { + async handleCreate( + payload: WorkspaceEventBatch>, + ) { return this.handle(payload); } @OnEvent('*.updated') - async handleUpdate(payload: ObjectRecordUpdateEvent) { - payload.properties.diff = objectRecordChangedValues( - payload.properties.before, - payload.properties.after, - payload.properties.updatedFields, - payload.objectMetadata, - ); + async handleUpdate( + payload: WorkspaceEventBatch>, + ) { + for (const eventPayload of payload.events) { + eventPayload.properties.diff = objectRecordChangedValues( + eventPayload.properties.before, + eventPayload.properties.after, + eventPayload.properties.updatedFields, + eventPayload.objectMetadata, + ); + } return this.handle(payload); } @OnEvent('*.deleted') - async handleDelete(payload: ObjectRecordUpdateEvent) { + async handleDelete( + payload: WorkspaceEventBatch>, + ) { return this.handle(payload); } - private async handle(payload: ObjectRecordBaseEvent) { - if (!payload.objectMetadata?.isAuditLogged) { - return; - } - - this.messageQueueService.add( - CreateAuditLogFromInternalEvent.name, - payload, + private async handle(payload: WorkspaceEventBatch) { + payload.events = payload.events.filter( + (event) => event.objectMetadata?.isAuditLogged, ); - this.messageQueueService.add( - UpsertTimelineActivityFromInternalEvent.name, - payload, - ); + await this.messageQueueService.add< + WorkspaceEventBatch + >(CreateAuditLogFromInternalEvent.name, payload); + + await this.messageQueueService.add< + WorkspaceEventBatch + >(UpsertTimelineActivityFromInternalEvent.name, payload); } } diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/telemetry.listener.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/telemetry.listener.ts index 8550a39632b7..f9609794d7ac 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/telemetry.listener.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/telemetry.listener.ts @@ -4,6 +4,7 @@ import { OnEvent } from '@nestjs/event-emitter'; import { AnalyticsService } from 'src/engine/core-modules/analytics/analytics.service'; import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; @Injectable() export class TelemetryListener { @@ -13,36 +14,48 @@ export class TelemetryListener { ) {} @OnEvent('*.created') - async handleAllCreate(payload: ObjectRecordCreateEvent) { - await this.analyticsService.create( - { - type: 'track', - data: { - eventName: payload.name, - }, - }, - payload.userId, - payload.workspaceId, - '', // voluntarely not retrieving this - '', // to avoid slowing down - this.environmentService.get('SERVER_URL'), + async handleAllCreate( + payload: WorkspaceEventBatch>, + ) { + await Promise.all( + payload.events.map((eventPayload) => + this.analyticsService.create( + { + type: 'track', + data: { + eventName: payload.name, + }, + }, + eventPayload.userId, + payload.workspaceId, + '', // voluntarily not retrieving this + '', // to avoid slowing down + this.environmentService.get('SERVER_URL'), + ), + ), ); } @OnEvent('user.signup') - async handleUserSignup(payload: ObjectRecordCreateEvent) { - await this.analyticsService.create( - { - type: 'track', - data: { - eventName: 'user.signup', - }, - }, - payload.userId, - payload.workspaceId, - '', - '', - this.environmentService.get('SERVER_URL'), + async handleUserSignup( + payload: WorkspaceEventBatch>, + ) { + await Promise.all( + payload.events.map((eventPayload) => + this.analyticsService.create( + { + type: 'track', + data: { + eventName: 'user.signup', + }, + }, + eventPayload.userId, + payload.workspaceId, + '', + '', + this.environmentService.get('SERVER_URL'), + ), + ), ); } } diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/workspace-query-runner.service.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/workspace-query-runner.service.ts index 30f526a345ef..eb91900aea53 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/workspace-query-runner.service.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/workspace-query-runner.service.ts @@ -1,5 +1,4 @@ import { Injectable, Logger } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; import isEmpty from 'lodash.isempty'; import { DataSource } from 'typeorm'; @@ -55,6 +54,8 @@ import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global. import { computeObjectTargetTable } from 'src/engine/utils/compute-object-target-table.util'; import { isQueryTimeoutError } from 'src/engine/utils/query-timeout.util'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; +import { isDefined } from 'src/utils/is-defined'; import { PGGraphQLMutation, @@ -78,7 +79,7 @@ export class WorkspaceQueryRunnerService { private readonly queryResultGettersFactory: QueryResultGettersFactory, @InjectMessageQueue(MessageQueue.webhookQueue) private readonly messageQueueService: MessageQueueService, - private readonly eventEmitter: EventEmitter2, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, private readonly workspaceQueryHookService: WorkspaceQueryHookService, private readonly environmentService: EnvironmentService, private readonly duplicateService: DuplicateService, @@ -304,18 +305,21 @@ export class WorkspaceQueryRunnerService { options, ); - parsedResults.forEach((record) => { - this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.created`, { - name: `${objectMetadataItem.nameSingular}.created`, - workspaceId: authContext.workspace.id, - userId: authContext.user?.id, - recordId: record.id, - objectMetadata: objectMetadataItem, - properties: { - after: record, - }, - } satisfies ObjectRecordCreateEvent); - }); + this.workspaceEventEmitter.emit( + `${objectMetadataItem.nameSingular}.created`, + parsedResults.map( + (record) => + ({ + userId: authContext.user?.id, + recordId: record.id, + objectMetadata: objectMetadataItem, + properties: { + after: record, + }, + }) satisfies ObjectRecordCreateEvent, + ), + authContext.workspace.id, + ); return parsedResults; } @@ -440,18 +444,22 @@ export class WorkspaceQueryRunnerService { options, ); - this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.updated`, { - name: `${objectMetadataItem.nameSingular}.updated`, - workspaceId: authContext.workspace.id, - userId: authContext.user?.id, - recordId: existingRecord.id, - objectMetadata: objectMetadataItem, - properties: { - updatedFields: Object.keys(args.data), - before: this.removeNestedProperties(existingRecord as Record), - after: this.removeNestedProperties(parsedResults?.[0]), - }, - } satisfies ObjectRecordUpdateEvent); + this.workspaceEventEmitter.emit( + `${objectMetadataItem.nameSingular}.updated`, + [ + { + userId: authContext.user?.id, + recordId: existingRecord.id, + objectMetadata: objectMetadataItem, + properties: { + updatedFields: Object.keys(args.data), + before: this.removeNestedProperties(existingRecord as Record), + after: this.removeNestedProperties(parsedResults?.[0]), + }, + } satisfies ObjectRecordUpdateEvent, + ], + authContext.workspace.id, + ); return parsedResults?.[0]; } @@ -513,30 +521,36 @@ export class WorkspaceQueryRunnerService { options, ); - parsedResults.forEach((record) => { - const existingRecord = mappedRecords.get(record.id); + const eventsToEmit: ObjectRecordUpdateEvent[] = parsedResults + .map((record) => { + const existingRecord = mappedRecords.get(record.id); - if (!existingRecord) { - this.logger.warn( - `Record with id ${record.id} not found in the database`, - ); + if (!existingRecord) { + this.logger.warn( + `Record with id ${record.id} not found in the database`, + ); - return; - } + return; + } - this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.updated`, { - name: `${objectMetadataItem.nameSingular}.updated`, - workspaceId: authContext.workspace.id, - userId: authContext.user?.id, - recordId: existingRecord.id, - objectMetadata: objectMetadataItem, - properties: { - updatedFields: Object.keys(args.data), - before: this.removeNestedProperties(existingRecord as Record), - after: this.removeNestedProperties(record), - }, - } satisfies ObjectRecordUpdateEvent); - }); + return { + userId: authContext.user?.id, + recordId: existingRecord.id, + objectMetadata: objectMetadataItem, + properties: { + updatedFields: Object.keys(args.data), + before: this.removeNestedProperties(existingRecord as Record), + after: this.removeNestedProperties(record), + }, + }; + }) + .filter(isDefined); + + this.workspaceEventEmitter.emit( + `${objectMetadataItem.nameSingular}.updated`, + eventsToEmit, + authContext.workspace.id, + ); return parsedResults; } @@ -602,18 +616,21 @@ export class WorkspaceQueryRunnerService { options, ); - parsedResults.forEach((record) => { - this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.deleted`, { - name: `${objectMetadataItem.nameSingular}.deleted`, - workspaceId: authContext.workspace.id, - userId: authContext.user?.id, - recordId: record.id, - objectMetadata: objectMetadataItem, - properties: { - before: this.removeNestedProperties(record), - }, - } satisfies ObjectRecordDeleteEvent); - }); + this.workspaceEventEmitter.emit( + `${objectMetadataItem.nameSingular}.deleted`, + parsedResults.map( + (record) => + ({ + userId: authContext.user?.id, + recordId: record.id, + objectMetadata: objectMetadataItem, + properties: { + before: this.removeNestedProperties(record), + }, + }) satisfies ObjectRecordDeleteEvent, + ), + authContext.workspace.id, + ); return parsedResults; } @@ -744,18 +761,21 @@ export class WorkspaceQueryRunnerService { options, ); - parsedResults.forEach((record) => { - this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.created`, { - name: `${objectMetadataItem.nameSingular}.created`, - workspaceId: authContext.workspace.id, - userId: authContext.user?.id, - recordId: record.id, - objectMetadata: objectMetadataItem, - properties: { - after: this.removeNestedProperties(record), - }, - } satisfies ObjectRecordCreateEvent); - }); + this.workspaceEventEmitter.emit( + `${objectMetadataItem.nameSingular}.created`, + parsedResults.map( + (record) => + ({ + userId: authContext.user?.id, + recordId: record.id, + objectMetadata: objectMetadataItem, + properties: { + after: this.removeNestedProperties(record), + }, + }) satisfies ObjectRecordCreateEvent, + ), + authContext.workspace.id, + ); return parsedResults; } @@ -821,19 +841,23 @@ export class WorkspaceQueryRunnerService { options, ); - this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.deleted`, { - name: `${objectMetadataItem.nameSingular}.deleted`, - workspaceId: authContext.workspace.id, - userId: authContext.user?.id, - recordId: args.id, - objectMetadata: objectMetadataItem, - properties: { - before: { - ...(existingRecord ?? {}), - ...this.removeNestedProperties(parsedResults?.[0]), - }, - }, - } satisfies ObjectRecordDeleteEvent); + this.workspaceEventEmitter.emit( + `${objectMetadataItem.nameSingular}.deleted`, + [ + { + userId: authContext.user?.id, + recordId: args.id, + objectMetadata: objectMetadataItem, + properties: { + before: { + ...(existingRecord ?? {}), + ...this.removeNestedProperties(parsedResults?.[0]), + }, + }, + } satisfies ObjectRecordDeleteEvent, + ], + authContext.workspace.id, + ); return parsedResults?.[0]; } diff --git a/packages/twenty-server/src/engine/core-modules/billing/listeners/billing-workspace-member.listener.ts b/packages/twenty-server/src/engine/core-modules/billing/listeners/billing-workspace-member.listener.ts index d318b7277da7..0d6f03ef49a8 100644 --- a/packages/twenty-server/src/engine/core-modules/billing/listeners/billing-workspace-member.listener.ts +++ b/packages/twenty-server/src/engine/core-modules/billing/listeners/billing-workspace-member.listener.ts @@ -10,6 +10,7 @@ import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/t import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; @Injectable() @@ -23,7 +24,9 @@ export class BillingWorkspaceMemberListener { @OnEvent('workspaceMember.created') @OnEvent('workspaceMember.deleted') async handleCreateOrDeleteEvent( - payload: ObjectRecordCreateEvent, + payload: WorkspaceEventBatch< + ObjectRecordCreateEvent + >, ) { if (!this.environmentService.get('IS_BILLING_ENABLED')) { return; diff --git a/packages/twenty-server/src/engine/core-modules/core-engine.module.ts b/packages/twenty-server/src/engine/core-modules/core-engine.module.ts index 11e07c91da4d..37b9ac882052 100644 --- a/packages/twenty-server/src/engine/core-modules/core-engine.module.ts +++ b/packages/twenty-server/src/engine/core-modules/core-engine.module.ts @@ -13,6 +13,7 @@ import { PostgresCredentialsModule } from 'src/engine/core-modules/postgres-cred import { UserModule } from 'src/engine/core-modules/user/user.module'; import { WorkflowTriggerCoreModule } from 'src/engine/core-modules/workflow/core-workflow-trigger.module'; import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module'; +import { WorkspaceEventEmitterModule } from 'src/engine/workspace-event-emitter/workspace-event-emitter.module'; import { AnalyticsModule } from './analytics/analytics.module'; import { ClientConfigModule } from './client-config/client-config.module'; @@ -36,6 +37,7 @@ import { FileModule } from './file/file.module'; AISQLQueryModule, PostgresCredentialsModule, WorkflowTriggerCoreModule, + WorkspaceEventEmitterModule, ], exports: [ AnalyticsModule, diff --git a/packages/twenty-server/src/engine/core-modules/user-workspace/user-workspace.service.ts b/packages/twenty-server/src/engine/core-modules/user-workspace/user-workspace.service.ts index 671977148d15..cc2727b3a118 100644 --- a/packages/twenty-server/src/engine/core-modules/user-workspace/user-workspace.service.ts +++ b/packages/twenty-server/src/engine/core-modules/user-workspace/user-workspace.service.ts @@ -1,5 +1,4 @@ /* eslint-disable @nx/workspace-inject-workspace-repository */ -import { EventEmitter2 } from '@nestjs/event-emitter'; import { InjectRepository } from '@nestjs/typeorm'; import { TypeOrmQueryService } from '@ptc-org/nestjs-query-typeorm'; @@ -11,6 +10,7 @@ import { User } from 'src/engine/core-modules/user/user.entity'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; import { DataSourceService } from 'src/engine/metadata-modules/data-source/data-source.service'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; import { assert } from 'src/utils/assert'; @@ -22,7 +22,7 @@ export class UserWorkspaceService extends TypeOrmQueryService { private readonly userRepository: Repository, private readonly dataSourceService: DataSourceService, private readonly typeORMService: TypeORMService, - private eventEmitter: EventEmitter2, + private workspaceEventEmitter: WorkspaceEventEmitter, ) { super(userWorkspaceRepository); } @@ -35,11 +35,9 @@ export class UserWorkspaceService extends TypeOrmQueryService { const payload = new ObjectRecordCreateEvent(); - payload.workspaceId = workspaceId; payload.userId = userId; - payload.name = 'user.signup'; - this.eventEmitter.emit('user.signup', payload); + this.workspaceEventEmitter.emit('user.signup', [payload], workspaceId); return this.userWorkspaceRepository.save(userWorkspace); } @@ -76,14 +74,16 @@ export class UserWorkspaceService extends TypeOrmQueryService { const payload = new ObjectRecordCreateEvent(); - payload.workspaceId = workspaceId; payload.properties = { after: workspaceMember[0], }; payload.recordId = workspaceMember[0].id; - payload.name = 'workspaceMember.created'; - this.eventEmitter.emit('workspaceMember.created', payload); + this.workspaceEventEmitter.emit( + 'workspaceMember.created', + [payload], + workspaceId, + ); } async addUserToWorkspace(user: User, workspace: Workspace) { diff --git a/packages/twenty-server/src/engine/core-modules/user/services/__tests__/user.service.spec.ts b/packages/twenty-server/src/engine/core-modules/user/services/__tests__/user.service.spec.ts index 1773f249c7e4..24b5b23bfed9 100644 --- a/packages/twenty-server/src/engine/core-modules/user/services/__tests__/user.service.spec.ts +++ b/packages/twenty-server/src/engine/core-modules/user/services/__tests__/user.service.spec.ts @@ -1,14 +1,14 @@ import { Test, TestingModule } from '@nestjs/testing'; import { getRepositoryToken } from '@nestjs/typeorm'; -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { UserService } from 'src/engine/core-modules/user/services/user.service'; -import { User } from 'src/engine/core-modules/user/user.entity'; -import { DataSourceService } from 'src/engine/metadata-modules/data-source/data-source.service'; import { TypeORMService } from 'src/database/typeorm/typeorm.service'; import { UserWorkspace } from 'src/engine/core-modules/user-workspace/user-workspace.entity'; +import { UserService } from 'src/engine/core-modules/user/services/user.service'; +import { User } from 'src/engine/core-modules/user/user.entity'; import { WorkspaceService } from 'src/engine/core-modules/workspace/services/workspace.service'; +import { DataSourceService } from 'src/engine/metadata-modules/data-source/data-source.service'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; describe('UserService', () => { let service: UserService; @@ -34,7 +34,7 @@ describe('UserService', () => { useValue: {}, }, { - provide: EventEmitter2, + provide: WorkspaceEventEmitter, useValue: {}, }, { diff --git a/packages/twenty-server/src/engine/core-modules/user/services/user.service.ts b/packages/twenty-server/src/engine/core-modules/user/services/user.service.ts index e8aef7fe899b..3ee3636ee270 100644 --- a/packages/twenty-server/src/engine/core-modules/user/services/user.service.ts +++ b/packages/twenty-server/src/engine/core-modules/user/services/user.service.ts @@ -1,4 +1,3 @@ -import { EventEmitter2 } from '@nestjs/event-emitter'; import { InjectRepository } from '@nestjs/typeorm'; import assert from 'assert'; @@ -16,6 +15,7 @@ import { import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; import { DataSourceService } from 'src/engine/metadata-modules/data-source/data-source.service'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; // eslint-disable-next-line @nx/workspace-inject-workspace-repository @@ -25,7 +25,7 @@ export class UserService extends TypeOrmQueryService { private readonly userRepository: Repository, private readonly dataSourceService: DataSourceService, private readonly typeORMService: TypeORMService, - private readonly eventEmitter: EventEmitter2, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, private readonly workspaceService: WorkspaceService, private readonly twentyORMGlobalManager: TwentyORMGlobalManager, ) { @@ -110,15 +110,16 @@ export class UserService extends TypeOrmQueryService { const payload = new ObjectRecordDeleteEvent(); - payload.workspaceId = workspaceId; payload.properties = { before: workspaceMember, }; - payload.name = 'workspaceMember.deleted'; payload.recordId = workspaceMember.id; - payload.name = 'workspaceMember.deleted'; - this.eventEmitter.emit('workspaceMember.deleted', payload); + this.workspaceEventEmitter.emit( + 'workspaceMember.deleted', + [payload], + workspaceId, + ); return user; } diff --git a/packages/twenty-server/src/engine/core-modules/workspace/workspace-workspace-member.listener.ts b/packages/twenty-server/src/engine/core-modules/workspace/workspace-workspace-member.listener.ts index 4236e3a3fb79..fbc01935d467 100644 --- a/packages/twenty-server/src/engine/core-modules/workspace/workspace-workspace-member.listener.ts +++ b/packages/twenty-server/src/engine/core-modules/workspace/workspace-workspace-member.listener.ts @@ -11,6 +11,7 @@ import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/t import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; @Injectable() @@ -23,39 +24,51 @@ export class WorkspaceWorkspaceMemberListener { @OnEvent('workspaceMember.updated') async handleUpdateEvent( - payload: ObjectRecordUpdateEvent, + payload: WorkspaceEventBatch< + ObjectRecordUpdateEvent + >, ) { - const { firstName: firstNameAfter, lastName: lastNameAfter } = - payload.properties.after.name; - - if (firstNameAfter === '' && lastNameAfter === '') { - return; - } - - if (!payload.userId) { - return; - } - - await this.onboardingService.setOnboardingCreateProfilePending({ - userId: payload.userId, - workspaceId: payload.workspaceId, - value: false, - }); + await Promise.all( + payload.events.map((eventPayload) => { + const { firstName: firstNameAfter, lastName: lastNameAfter } = + eventPayload.properties.after.name; + + if (firstNameAfter === '' && lastNameAfter === '') { + return; + } + + if (!eventPayload.userId) { + return; + } + + return this.onboardingService.setOnboardingCreateProfilePending({ + userId: eventPayload.userId, + workspaceId: payload.workspaceId, + value: false, + }); + }), + ); } @OnEvent('workspaceMember.deleted') async handleDeleteEvent( - payload: ObjectRecordDeleteEvent, + payload: WorkspaceEventBatch< + ObjectRecordDeleteEvent + >, ) { - const userId = payload.properties.before.userId; + await Promise.all( + payload.events.map((eventPayload) => { + const userId = eventPayload.properties.before.userId; - if (!userId) { - return; - } + if (!userId) { + return; + } - await this.messageQueueService.add( - HandleWorkspaceMemberDeletedJob.name, - { workspaceId: payload.workspaceId, userId }, + return this.messageQueueService.add( + HandleWorkspaceMemberDeletedJob.name, + { workspaceId: payload.workspaceId, userId }, + ); + }), ); } } diff --git a/packages/twenty-server/src/engine/integrations/event-emitter/types/object-record-job-data.ts b/packages/twenty-server/src/engine/integrations/event-emitter/types/object-record-job-data.ts deleted file mode 100644 index d1f7aa5ec0ef..000000000000 --- a/packages/twenty-server/src/engine/integrations/event-emitter/types/object-record-job-data.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { ObjectRecordBaseEvent } from 'src/engine/integrations/event-emitter/types/object-record.base.event'; - -export class ObjectRecordJobData extends ObjectRecordBaseEvent { - getOperation() { - return this.name.split('.')[1]; - } - - getObjectName() { - return this.name.split('.')[0]; - } -} diff --git a/packages/twenty-server/src/engine/integrations/event-emitter/types/object-record.base.event.ts b/packages/twenty-server/src/engine/integrations/event-emitter/types/object-record.base.event.ts index d34fbd1afcc5..e515c130928f 100644 --- a/packages/twenty-server/src/engine/integrations/event-emitter/types/object-record.base.event.ts +++ b/packages/twenty-server/src/engine/integrations/event-emitter/types/object-record.base.event.ts @@ -1,11 +1,14 @@ import { ObjectMetadataInterface } from 'src/engine/metadata-modules/field-metadata/interfaces/object-metadata.interface'; export class ObjectRecordBaseEvent { - name: string; - workspaceId: string; recordId: string; userId?: string; workspaceMemberId?: string; objectMetadata: ObjectMetadataInterface; properties: any; } + +export class ObjectRecordBaseEventWithNameAndWorkspaceId extends ObjectRecordBaseEvent { + name: string; + workspaceId: string; +} diff --git a/packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.module.ts b/packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.module.ts new file mode 100644 index 000000000000..f81fe0fee4ad --- /dev/null +++ b/packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.module.ts @@ -0,0 +1,11 @@ +import { Global, Module } from '@nestjs/common'; + +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; + +@Global() +@Module({ + imports: [], + providers: [WorkspaceEventEmitter], + exports: [WorkspaceEventEmitter], +}) +export class WorkspaceEventEmitterModule {} diff --git a/packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.ts b/packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.ts new file mode 100644 index 000000000000..374bc07c54cb --- /dev/null +++ b/packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.ts @@ -0,0 +1,21 @@ +import { Injectable } from '@nestjs/common'; +import { EventEmitter2 } from '@nestjs/event-emitter'; + +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; + +@Injectable() +export class WorkspaceEventEmitter { + constructor(private readonly eventEmitter: EventEmitter2) {} + + public emit(eventName: string, events: any[], workspaceId: string) { + if (!events.length) { + return; + } + + return this.eventEmitter.emit(eventName, { + name: eventName, + workspaceId, + events, + } satisfies WorkspaceEventBatch); + } +} diff --git a/packages/twenty-server/src/engine/workspace-event-emitter/workspace-event.type.ts b/packages/twenty-server/src/engine/workspace-event-emitter/workspace-event.type.ts new file mode 100644 index 000000000000..623c523f280c --- /dev/null +++ b/packages/twenty-server/src/engine/workspace-event-emitter/workspace-event.type.ts @@ -0,0 +1,5 @@ +export type WorkspaceEventBatch = { + name: string; + workspaceId: string; + events: WorkspaceEvent[]; +}; diff --git a/packages/twenty-server/src/modules/calendar/blocklist-manager/listeners/calendar-blocklist.listener.ts b/packages/twenty-server/src/modules/calendar/blocklist-manager/listeners/calendar-blocklist.listener.ts index 9c1c54c5fab3..7a2b7e0e8389 100644 --- a/packages/twenty-server/src/modules/calendar/blocklist-manager/listeners/calendar-blocklist.listener.ts +++ b/packages/twenty-server/src/modules/calendar/blocklist-manager/listeners/calendar-blocklist.listener.ts @@ -7,6 +7,7 @@ import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/t import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { BlocklistItemDeleteCalendarEventsJob, @@ -26,48 +27,74 @@ export class CalendarBlocklistListener { @OnEvent('blocklist.created') async handleCreatedEvent( - payload: ObjectRecordCreateEvent, + payload: WorkspaceEventBatch< + ObjectRecordCreateEvent + >, ) { - await this.messageQueueService.add( - BlocklistItemDeleteCalendarEventsJob.name, - { - workspaceId: payload.workspaceId, - blocklistItemId: payload.recordId, - }, + await Promise.all( + payload.events.map((eventPayload) => + this.messageQueueService.add( + BlocklistItemDeleteCalendarEventsJob.name, + { + workspaceId: payload.workspaceId, + blocklistItemId: eventPayload.recordId, + }, + ), + ), ); } @OnEvent('blocklist.deleted') async handleDeletedEvent( - payload: ObjectRecordDeleteEvent, + payload: WorkspaceEventBatch< + ObjectRecordDeleteEvent + >, ) { - await this.messageQueueService.add( - BlocklistReimportCalendarEventsJob.name, - { - workspaceId: payload.workspaceId, - workspaceMemberId: payload.properties.before.workspaceMember.id, - }, + await Promise.all( + payload.events.map((eventPayload) => + this.messageQueueService.add( + BlocklistReimportCalendarEventsJob.name, + { + workspaceId: payload.workspaceId, + workspaceMemberId: + eventPayload.properties.before.workspaceMember.id, + }, + ), + ), ); } @OnEvent('blocklist.updated') async handleUpdatedEvent( - payload: ObjectRecordUpdateEvent, + payload: WorkspaceEventBatch< + ObjectRecordUpdateEvent + >, ) { - await this.messageQueueService.add( - BlocklistItemDeleteCalendarEventsJob.name, - { - workspaceId: payload.workspaceId, - blocklistItemId: payload.recordId, - }, - ); + await Promise.all( + payload.events.reduce((acc: Promise[], eventPayload) => { + acc.push( + this.messageQueueService.add( + BlocklistItemDeleteCalendarEventsJob.name, + { + workspaceId: payload.workspaceId, + blocklistItemId: eventPayload.recordId, + }, + ), + ); + + acc.push( + this.messageQueueService.add( + BlocklistReimportCalendarEventsJob.name, + { + workspaceId: payload.workspaceId, + workspaceMemberId: + eventPayload.properties.after.workspaceMember.id, + }, + ), + ); - await this.messageQueueService.add( - BlocklistReimportCalendarEventsJob.name, - { - workspaceId: payload.workspaceId, - workspaceMemberId: payload.properties.after.workspaceMember.id, - }, + return acc; + }, []), ); } } diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-cleaner/calendar-event-cleaner.module.ts b/packages/twenty-server/src/modules/calendar/calendar-event-cleaner/calendar-event-cleaner.module.ts index 50da2766148f..bb9af960eb44 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-cleaner/calendar-event-cleaner.module.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-cleaner/calendar-event-cleaner.module.ts @@ -1,15 +1,15 @@ import { Module } from '@nestjs/common'; -import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; import { DeleteConnectedAccountAssociatedCalendarDataJob } from 'src/modules/calendar/calendar-event-cleaner/jobs/delete-connected-account-associated-calendar-data.job'; +import { CalendarEventCleanerConnectedAccountListener } from 'src/modules/calendar/calendar-event-cleaner/listeners/calendar-event-cleaner-connected-account.listener'; import { CalendarEventCleanerService } from 'src/modules/calendar/calendar-event-cleaner/services/calendar-event-cleaner.service'; -import { CalendarEventWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-event.workspace-entity'; @Module({ - imports: [TwentyORMModule.forFeature([CalendarEventWorkspaceEntity])], + imports: [], providers: [ CalendarEventCleanerService, DeleteConnectedAccountAssociatedCalendarDataJob, + CalendarEventCleanerConnectedAccountListener, ], exports: [CalendarEventCleanerService], }) diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-cleaner/listeners/calendar-event-cleaner-connected-account.listener.ts b/packages/twenty-server/src/modules/calendar/calendar-event-cleaner/listeners/calendar-event-cleaner-connected-account.listener.ts new file mode 100644 index 000000000000..0c14b6b60655 --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-cleaner/listeners/calendar-event-cleaner-connected-account.listener.ts @@ -0,0 +1,40 @@ +import { Injectable } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; + +import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; +import { + DeleteConnectedAccountAssociatedCalendarDataJob, + DeleteConnectedAccountAssociatedCalendarDataJobData, +} from 'src/modules/calendar/calendar-event-cleaner/jobs/delete-connected-account-associated-calendar-data.job'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; + +@Injectable() +export class CalendarEventCleanerConnectedAccountListener { + constructor( + @InjectMessageQueue(MessageQueue.calendarQueue) + private readonly calendarQueueService: MessageQueueService, + ) {} + + @OnEvent('connectedAccount.deleted') + async handleDeletedEvent( + payload: WorkspaceEventBatch< + ObjectRecordDeleteEvent + >, + ) { + await Promise.all( + payload.events.map((eventPayload) => + this.calendarQueueService.add( + DeleteConnectedAccountAssociatedCalendarDataJob.name, + { + workspaceId: payload.workspaceId, + connectedAccountId: eventPayload.recordId, + }, + ), + ), + ); + } +} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-save-events.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-save-events.service.ts index e67d2c498394..3f22e0c781ac 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-save-events.service.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-save-events.service.ts @@ -1,12 +1,13 @@ import { Injectable } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; import { Any } from 'typeorm'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { injectIdsInCalendarEvents } from 'src/modules/calendar/calendar-event-import-manager/utils/inject-ids-in-calendar-events.util'; import { CalendarEventParticipantService } from 'src/modules/calendar/calendar-event-participant-manager/services/calendar-event-participant.service'; import { CalendarChannelEventAssociationWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel-event-association.workspace-entity'; @@ -19,7 +20,6 @@ import { CreateCompanyAndContactJob, CreateCompanyAndContactJobData, } from 'src/modules/contact-creation-manager/jobs/create-company-and-contact.job'; -import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; @Injectable() export class CalendarSaveEventsService { @@ -28,7 +28,7 @@ export class CalendarSaveEventsService { private readonly calendarEventParticipantService: CalendarEventParticipantService, @InjectMessageQueue(MessageQueue.contactCreationQueue) private readonly messageQueueService: MessageQueueService, - private readonly eventEmitter: EventEmitter2, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, ) {} public async saveCalendarEventsAndEnqueueContactCreationJob( @@ -140,13 +140,6 @@ export class CalendarSaveEventsService { ); }); - this.eventEmitter.emit(`calendarEventParticipant.matched`, { - workspaceId, - name: 'calendarEventParticipant.matched', - workspaceMemberId: connectedAccount.accountOwnerId, - calendarEventParticipants: savedCalendarEventParticipantsToEmit, - }); - if (calendarChannel.isContactAutoCreationEnabled) { await this.messageQueueService.add( CreateCompanyAndContactJob.name, diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/listeners/calendar-event-participant-person.listener.ts b/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/listeners/calendar-event-participant-person.listener.ts index 7b369af572d6..a48b842b94cd 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/listeners/calendar-event-participant-person.listener.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/listeners/calendar-event-participant-person.listener.ts @@ -7,13 +7,14 @@ import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperti import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { - CalendarEventParticipantMatchParticipantJobData, CalendarEventParticipantMatchParticipantJob, + CalendarEventParticipantMatchParticipantJobData, } from 'src/modules/calendar/calendar-event-participant-manager/jobs/calendar-event-participant-match-participant.job'; import { - CalendarEventParticipantUnmatchParticipantJobData, CalendarEventParticipantUnmatchParticipantJob, + CalendarEventParticipantUnmatchParticipantJobData, } from 'src/modules/calendar/calendar-event-participant-manager/jobs/calendar-event-participant-unmatch-participant.job'; import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/person.workspace-entity'; @@ -26,49 +27,59 @@ export class CalendarEventParticipantPersonListener { @OnEvent('person.created') async handleCreatedEvent( - payload: ObjectRecordCreateEvent, - ) { - if (payload.properties.after.email === null) { - return; - } - - await this.messageQueueService.add( - CalendarEventParticipantMatchParticipantJob.name, - { - workspaceId: payload.workspaceId, - email: payload.properties.after.email, - personId: payload.recordId, - }, - ); - } - - @OnEvent('person.updated') - async handleUpdatedEvent( - payload: ObjectRecordUpdateEvent, + payload: WorkspaceEventBatch< + ObjectRecordCreateEvent + >, ) { - if ( - objectRecordUpdateEventChangedProperties( - payload.properties.before, - payload.properties.after, - ).includes('email') - ) { - await this.messageQueueService.add( - CalendarEventParticipantUnmatchParticipantJob.name, - { - workspaceId: payload.workspaceId, - email: payload.properties.before.email, - personId: payload.recordId, - }, - ); + for (const eventPayload of payload.events) { + if (eventPayload.properties.after.email === null) { + continue; + } + // TODO: modify this job to take an array of participants to match await this.messageQueueService.add( CalendarEventParticipantMatchParticipantJob.name, { workspaceId: payload.workspaceId, - email: payload.properties.after.email, - personId: payload.recordId, + email: eventPayload.properties.after.email, + personId: eventPayload.recordId, }, ); } } + + @OnEvent('person.updated') + async handleUpdatedEvent( + payload: WorkspaceEventBatch< + ObjectRecordUpdateEvent + >, + ) { + for (const eventPayload of payload.events) { + if ( + objectRecordUpdateEventChangedProperties( + eventPayload.properties.before, + eventPayload.properties.after, + ).includes('email') + ) { + // TODO: modify this job to take an array of participants to match + await this.messageQueueService.add( + CalendarEventParticipantUnmatchParticipantJob.name, + { + workspaceId: payload.workspaceId, + email: eventPayload.properties.before.email, + personId: eventPayload.recordId, + }, + ); + + await this.messageQueueService.add( + CalendarEventParticipantMatchParticipantJob.name, + { + workspaceId: payload.workspaceId, + email: eventPayload.properties.after.email, + personId: eventPayload.recordId, + }, + ); + } + } + } } diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/listeners/calendar-event-participant-workspace-member.listener.ts b/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/listeners/calendar-event-participant-workspace-member.listener.ts index 1f84fce0dc2f..ab02392c52c5 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/listeners/calendar-event-participant-workspace-member.listener.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/listeners/calendar-event-participant-workspace-member.listener.ts @@ -7,13 +7,14 @@ import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperti import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { CalendarEventParticipantMatchParticipantJob, CalendarEventParticipantMatchParticipantJobData, } from 'src/modules/calendar/calendar-event-participant-manager/jobs/calendar-event-participant-match-participant.job'; import { - CalendarEventParticipantUnmatchParticipantJobData, CalendarEventParticipantUnmatchParticipantJob, + CalendarEventParticipantUnmatchParticipantJobData, } from 'src/modules/calendar/calendar-event-participant-manager/jobs/calendar-event-participant-unmatch-participant.job'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; @@ -26,49 +27,57 @@ export class CalendarEventParticipantWorkspaceMemberListener { @OnEvent('workspaceMember.created') async handleCreatedEvent( - payload: ObjectRecordCreateEvent, - ) { - if (payload.properties.after.userEmail === null) { - return; - } - - await this.messageQueueService.add( - CalendarEventParticipantMatchParticipantJob.name, - { - workspaceId: payload.workspaceId, - email: payload.properties.after.userEmail, - workspaceMemberId: payload.properties.after.id, - }, - ); - } - - @OnEvent('workspaceMember.updated') - async handleUpdatedEvent( - payload: ObjectRecordUpdateEvent, + payload: WorkspaceEventBatch< + ObjectRecordCreateEvent + >, ) { - if ( - objectRecordUpdateEventChangedProperties( - payload.properties.before, - payload.properties.after, - ).includes('userEmail') - ) { - await this.messageQueueService.add( - CalendarEventParticipantUnmatchParticipantJob.name, - { - workspaceId: payload.workspaceId, - email: payload.properties.before.userEmail, - personId: payload.recordId, - }, - ); + for (const eventPayload of payload.events) { + if (eventPayload.properties.after.userEmail === null) { + continue; + } await this.messageQueueService.add( CalendarEventParticipantMatchParticipantJob.name, { workspaceId: payload.workspaceId, - email: payload.properties.after.userEmail, - workspaceMemberId: payload.recordId, + email: eventPayload.properties.after.userEmail, + workspaceMemberId: eventPayload.recordId, }, ); } } + + @OnEvent('workspaceMember.updated') + async handleUpdatedEvent( + payload: WorkspaceEventBatch< + ObjectRecordUpdateEvent + >, + ) { + for (const eventPayload of payload.events) { + if ( + objectRecordUpdateEventChangedProperties( + eventPayload.properties.before, + eventPayload.properties.after, + ).includes('userEmail') + ) { + await this.messageQueueService.add( + CalendarEventParticipantUnmatchParticipantJob.name, + { + workspaceId: payload.workspaceId, + email: eventPayload.properties.before.userEmail, + personId: eventPayload.recordId, + }, + ); + + await this.messageQueueService.add( + CalendarEventParticipantMatchParticipantJob.name, + { + workspaceId: payload.workspaceId, + email: eventPayload.properties.after.userEmail, + workspaceMemberId: eventPayload.recordId, + }, + ); + } + } + } } diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/listeners/calendar-event-participant.listener.ts b/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/listeners/calendar-event-participant.listener.ts index d75d400ecee6..72152cdc36f2 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/listeners/calendar-event-participant.listener.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/listeners/calendar-event-participant.listener.ts @@ -7,6 +7,7 @@ import { Repository } from 'typeorm'; import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { CalendarEventParticipantWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-event-participant.workspace-entity'; import { TimelineActivityRepository } from 'src/modules/timeline/repositiories/timeline-activity.repository'; import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity'; @@ -22,49 +23,55 @@ export class CalendarEventParticipantListener { ) {} @OnEvent('calendarEventParticipant.matched') - public async handleCalendarEventParticipantMatchedEvent(payload: { - workspaceId: string; - workspaceMemberId: string; - participants: CalendarEventParticipantWorkspaceEntity[]; - }): Promise { - const calendarEventParticipants = payload.participants ?? []; + public async handleCalendarEventParticipantMatchedEvent( + payload: WorkspaceEventBatch<{ + workspaceMemberId: string; + participants: CalendarEventParticipantWorkspaceEntity[]; + }>, + ): Promise { + const workspaceId = payload.workspaceId; - // TODO: move to a job? + // TODO: Refactor to insertTimelineActivitiesForObject once + for (const eventPayload of payload.events) { + const calendarEventParticipants = eventPayload.participants; + const workspaceMemberId = eventPayload.workspaceMemberId; - const dataSourceSchema = this.workspaceDataSourceService.getSchemaName( - payload.workspaceId, - ); + // TODO: move to a job? - const calendarEventObjectMetadata = - await this.objectMetadataRepository.findOneOrFail({ - where: { - nameSingular: 'calendarEvent', - workspaceId: payload.workspaceId, - }, - }); + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); - const calendarEventParticipantsWithPersonId = - calendarEventParticipants.filter((participant) => participant.personId); + const calendarEventObjectMetadata = + await this.objectMetadataRepository.findOneOrFail({ + where: { + nameSingular: 'calendarEvent', + workspaceId, + }, + }); - if (calendarEventParticipantsWithPersonId.length === 0) { - return; - } + const calendarEventParticipantsWithPersonId = + calendarEventParticipants.filter((participant) => participant.personId); + + if (calendarEventParticipantsWithPersonId.length === 0) { + continue; + } - await this.timelineActivityRepository.insertTimelineActivitiesForObject( - 'person', - calendarEventParticipantsWithPersonId.map((participant) => ({ - dataSourceSchema, - name: 'calendarEvent.linked', - properties: null, - objectName: 'calendarEvent', - recordId: participant.personId, - workspaceMemberId: payload.workspaceMemberId, - workspaceId: payload.workspaceId, - linkedObjectMetadataId: calendarEventObjectMetadata.id, - linkedRecordId: participant.calendarEventId, - linkedRecordCachedName: '', - })), - payload.workspaceId, - ); + await this.timelineActivityRepository.insertTimelineActivitiesForObject( + 'person', + calendarEventParticipantsWithPersonId.map((participant) => ({ + dataSourceSchema, + name: 'calendarEvent.linked', + properties: null, + objectName: 'calendarEvent', + recordId: participant.personId, + workspaceMemberId, + workspaceId, + linkedObjectMetadataId: calendarEventObjectMetadata.id, + linkedRecordId: participant.calendarEventId, + linkedRecordCachedName: '', + })), + workspaceId, + ); + } } } diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/services/calendar-event-participant.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/services/calendar-event-participant.service.ts index a704e6d29295..cc71ebf8282a 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/services/calendar-event-participant.service.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-participant-manager/services/calendar-event-participant.service.ts @@ -111,7 +111,7 @@ export class CalendarEventParticipantService { await this.matchParticipantService.matchParticipants( savedParticipants, - 'messageParticipant', + 'calendarEventParticipant', transactionManager, ); } diff --git a/packages/twenty-server/src/modules/connected-account/listeners/connected-account.listener.ts b/packages/twenty-server/src/modules/connected-account/listeners/connected-account.listener.ts index 0c9b78843ce1..455721d817f6 100644 --- a/packages/twenty-server/src/modules/connected-account/listeners/connected-account.listener.ts +++ b/packages/twenty-server/src/modules/connected-account/listeners/connected-account.listener.ts @@ -3,6 +3,7 @@ import { OnEvent } from '@nestjs/event-emitter'; import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; @@ -16,27 +17,31 @@ export class ConnectedAccountListener { @OnEvent('connectedAccount.deleted') async handleDeletedEvent( - payload: ObjectRecordDeleteEvent, + payload: WorkspaceEventBatch< + ObjectRecordDeleteEvent + >, ) { - const workspaceMemberId = payload.properties.before.accountOwnerId; - const workspaceId = payload.workspaceId; - const workspaceMemberRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workspaceMember', - ); - const workspaceMember = await workspaceMemberRepository.findOneOrFail({ - where: { id: workspaceMemberId }, - }); + for (const eventPayload of payload.events) { + const workspaceMemberId = eventPayload.properties.before.accountOwnerId; + const workspaceId = payload.workspaceId; + const workspaceMemberRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workspaceMember', + ); + const workspaceMember = await workspaceMemberRepository.findOneOrFail({ + where: { id: workspaceMemberId }, + }); - const userId = workspaceMember.userId; + const userId = workspaceMember.userId; - const connectedAccountId = payload.properties.before.id; + const connectedAccountId = eventPayload.properties.before.id; - await this.accountsToReconnectService.removeAccountToReconnect( - userId, - workspaceId, - connectedAccountId, - ); + await this.accountsToReconnectService.removeAccountToReconnect( + userId, + workspaceId, + connectedAccountId, + ); + } } } diff --git a/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account-delete-one.pre-query.hook.ts b/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account-delete-one.pre-query.hook.ts index 7a730431acf7..0812c0e33ea0 100644 --- a/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account-delete-one.pre-query.hook.ts +++ b/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account-delete-one.pre-query.hook.ts @@ -1,5 +1,3 @@ -import { EventEmitter2 } from '@nestjs/event-emitter'; - import { WorkspaceQueryHookInstance } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-hook/interfaces/workspace-query-hook.interface'; import { DeleteOneResolverArgs } from 'src/engine/api/graphql/workspace-resolver-builder/interfaces/workspace-resolvers-builder.interface'; @@ -7,6 +5,7 @@ import { WorkspaceQueryHook } from 'src/engine/api/graphql/workspace-query-runne import { AuthContext } from 'src/engine/core-modules/auth/types/auth-context.type'; import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @WorkspaceQueryHook(`connectedAccount.deleteOne`) @@ -15,7 +14,7 @@ export class ConnectedAccountDeleteOnePreQueryHook { constructor( private readonly twentyORMManager: TwentyORMManager, - private eventEmitter: EventEmitter2, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, ) {} async execute( @@ -34,16 +33,19 @@ export class ConnectedAccountDeleteOnePreQueryHook connectedAccountId, }); - messageChannels.forEach((messageChannel) => { - this.eventEmitter.emit('messageChannel.deleted', { - workspaceId: authContext.workspace.id, - name: 'messageChannel.deleted', - recordId: messageChannel.id, - } satisfies Pick< - ObjectRecordDeleteEvent, - 'workspaceId' | 'recordId' | 'name' - >); - }); + this.workspaceEventEmitter.emit( + 'messageChannel.deleted', + messageChannels.map( + (messageChannel) => + ({ + recordId: messageChannel.id, + }) satisfies Pick< + ObjectRecordDeleteEvent, + 'recordId' + >, + ), + authContext.workspace.id, + ); return payload; } diff --git a/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account-query-hook.module.ts b/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account-query-hook.module.ts index 57aaa6408be0..225a1c3bebbf 100644 --- a/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account-query-hook.module.ts +++ b/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account-query-hook.module.ts @@ -1,11 +1,9 @@ import { Module } from '@nestjs/common'; -import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; import { ConnectedAccountDeleteOnePreQueryHook } from 'src/modules/connected-account/query-hooks/connected-account-delete-one.pre-query.hook'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @Module({ - imports: [TwentyORMModule.forFeature([MessageChannelWorkspaceEntity])], + imports: [], providers: [ConnectedAccountDeleteOnePreQueryHook], }) export class ConnectedAccountQueryHookModule {} diff --git a/packages/twenty-server/src/modules/contact-creation-manager/listeners/auto-companies-and-contacts-creation-calendar-channel.listener.ts b/packages/twenty-server/src/modules/contact-creation-manager/listeners/auto-companies-and-contacts-creation-calendar-channel.listener.ts index 51104de273c4..5dd7497d009b 100644 --- a/packages/twenty-server/src/modules/contact-creation-manager/listeners/auto-companies-and-contacts-creation-calendar-channel.listener.ts +++ b/packages/twenty-server/src/modules/contact-creation-manager/listeners/auto-companies-and-contacts-creation-calendar-channel.listener.ts @@ -6,9 +6,10 @@ import { objectRecordChangedProperties } from 'src/engine/integrations/event-emi import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { - CalendarCreateCompanyAndContactAfterSyncJobData, CalendarCreateCompanyAndContactAfterSyncJob, + CalendarCreateCompanyAndContactAfterSyncJobData, } from 'src/modules/calendar/calendar-event-participant-manager/jobs/calendar-create-company-and-contact-after-sync.job'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @@ -21,22 +22,28 @@ export class AutoCompaniesAndContactsCreationCalendarChannelListener { @OnEvent('calendarChannel.updated') async handleUpdatedEvent( - payload: ObjectRecordUpdateEvent, + payload: WorkspaceEventBatch< + ObjectRecordUpdateEvent + >, ) { - if ( - objectRecordChangedProperties( - payload.properties.before, - payload.properties.after, - ).includes('isContactAutoCreationEnabled') && - payload.properties.after.isContactAutoCreationEnabled - ) { - await this.messageQueueService.add( - CalendarCreateCompanyAndContactAfterSyncJob.name, - { - workspaceId: payload.workspaceId, - calendarChannelId: payload.recordId, - }, - ); - } + await Promise.all( + payload.events.map((eventPayload) => { + if ( + objectRecordChangedProperties( + eventPayload.properties.before, + eventPayload.properties.after, + ).includes('isContactAutoCreationEnabled') && + eventPayload.properties.after.isContactAutoCreationEnabled + ) { + return this.messageQueueService.add( + CalendarCreateCompanyAndContactAfterSyncJob.name, + { + workspaceId: payload.workspaceId, + calendarChannelId: eventPayload.recordId, + }, + ); + } + }), + ); } } diff --git a/packages/twenty-server/src/modules/contact-creation-manager/listeners/auto-companies-and-contacts-creation-message-channel.listener.ts b/packages/twenty-server/src/modules/contact-creation-manager/listeners/auto-companies-and-contacts-creation-message-channel.listener.ts index f1d6362d45a9..ac2511f745c0 100644 --- a/packages/twenty-server/src/modules/contact-creation-manager/listeners/auto-companies-and-contacts-creation-message-channel.listener.ts +++ b/packages/twenty-server/src/modules/contact-creation-manager/listeners/auto-companies-and-contacts-creation-message-channel.listener.ts @@ -6,10 +6,11 @@ import { objectRecordChangedProperties } from 'src/engine/integrations/event-emi import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { - MessagingCreateCompanyAndContactAfterSyncJobData, MessagingCreateCompanyAndContactAfterSyncJob, + MessagingCreateCompanyAndContactAfterSyncJobData, } from 'src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job'; @Injectable() @@ -21,22 +22,28 @@ export class AutoCompaniesAndContactsCreationMessageChannelListener { @OnEvent('messageChannel.updated') async handleUpdatedEvent( - payload: ObjectRecordUpdateEvent, + payload: WorkspaceEventBatch< + ObjectRecordUpdateEvent + >, ) { - if ( - objectRecordChangedProperties( - payload.properties.before, - payload.properties.after, - ).includes('isContactAutoCreationEnabled') && - payload.properties.after.isContactAutoCreationEnabled - ) { - await this.messageQueueService.add( - MessagingCreateCompanyAndContactAfterSyncJob.name, - { - workspaceId: payload.workspaceId, - messageChannelId: payload.recordId, - }, - ); - } + await Promise.all( + payload.events.map((eventPayload) => { + if ( + objectRecordChangedProperties( + eventPayload.properties.before, + eventPayload.properties.after, + ).includes('isContactAutoCreationEnabled') && + eventPayload.properties.after.isContactAutoCreationEnabled + ) { + return this.messageQueueService.add( + MessagingCreateCompanyAndContactAfterSyncJob.name, + { + workspaceId: payload.workspaceId, + messageChannelId: eventPayload.recordId, + }, + ); + } + }), + ); } } diff --git a/packages/twenty-server/src/modules/contact-creation-manager/services/create-company-and-contact.service.ts b/packages/twenty-server/src/modules/contact-creation-manager/services/create-company-and-contact.service.ts index 6435f0d65327..db6a6328bb29 100644 --- a/packages/twenty-server/src/modules/contact-creation-manager/services/create-company-and-contact.service.ts +++ b/packages/twenty-server/src/modules/contact-creation-manager/services/create-company-and-contact.service.ts @@ -1,5 +1,4 @@ import { Injectable } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; import { InjectRepository } from '@nestjs/typeorm'; import chunk from 'lodash.chunk'; @@ -11,6 +10,7 @@ import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/com import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { CONTACTS_CREATION_BATCH_SIZE } from 'src/modules/contact-creation-manager/constants/contacts-creation-batch-size.constant'; @@ -32,7 +32,7 @@ export class CreateCompanyAndContactService { private readonly createCompaniesService: CreateCompanyService, @InjectObjectMetadataRepository(WorkspaceMemberWorkspaceEntity) private readonly workspaceMemberRepository: WorkspaceMemberRepository, - private readonly eventEmitter: EventEmitter2, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, @InjectRepository(ObjectMetadataEntity, 'metadata') private readonly objectMetadataRepository: Repository, private readonly twentyORMGlobalManager: TwentyORMGlobalManager, @@ -191,18 +191,21 @@ export class CreateCompanyAndContactService { source, ); - for (const createdPerson of createdPeople) { - this.eventEmitter.emit('person.created', { - name: 'person.created', - workspaceId, - // FixMe: TypeORM typing issue... id is always returned when using save - recordId: createdPerson.id as string, - objectMetadata, - properties: { - after: createdPerson, - }, - } satisfies ObjectRecordCreateEvent); - } + this.workspaceEventEmitter.emit( + 'person.created', + createdPeople.map( + (createdPerson) => + ({ + // FixMe: TypeORM typing issue... id is always returned when using save + recordId: createdPerson.id as string, + objectMetadata, + properties: { + after: createdPerson, + }, + }) satisfies ObjectRecordCreateEvent, + ), + workspaceId, + ); } } } diff --git a/packages/twenty-server/src/modules/match-participant/match-participant.service.ts b/packages/twenty-server/src/modules/match-participant/match-participant.service.ts index d7bba2e4b8e2..c5cd06f301f9 100644 --- a/packages/twenty-server/src/modules/match-participant/match-participant.service.ts +++ b/packages/twenty-server/src/modules/match-participant/match-participant.service.ts @@ -1,10 +1,10 @@ import { Injectable } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; import { Any } from 'typeorm'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { CalendarEventParticipantWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-event-participant.workspace-entity'; import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity'; import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/person.workspace-entity'; @@ -17,7 +17,7 @@ export class MatchParticipantService< | MessageParticipantWorkspaceEntity, > { constructor( - private readonly eventEmitter: EventEmitter2, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, private readonly twentyORMManager: TwentyORMManager, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, ) {} @@ -46,6 +46,10 @@ export class MatchParticipantService< const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; + if (!workspaceId) { + throw new Error('Workspace ID is required'); + } + const participantIds = participants.map((participant) => participant.id); const uniqueParticipantsHandles = [ ...new Set(participants.map((participant) => participant.handle)), @@ -109,11 +113,16 @@ export class MatchParticipantService< transactionManager, ); - this.eventEmitter.emit(`${objectMetadataName}.matched`, { + this.workspaceEventEmitter.emit( + `${objectMetadataName}.matched`, + [ + { + workspaceMemberId: null, + participants: matchedParticipants, + }, + ], workspaceId, - workspaceMemberId: null, - participants: matchedParticipants, - }); + ); } public async matchParticipantsAfterPersonOrWorkspaceMemberCreation( @@ -127,6 +136,10 @@ export class MatchParticipantService< const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; + if (!workspaceId) { + throw new Error('Workspace ID is required'); + } + const participantsToUpdate = await participantRepository.find({ where: { handle, @@ -155,12 +168,18 @@ export class MatchParticipantService< }, }); - this.eventEmitter.emit(`${objectMetadataName}.matched`, { + this.workspaceEventEmitter.emit( + `${objectMetadataName}.matched`, + [ + { + workspaceId, + name: `${objectMetadataName}.matched`, + workspaceMemberId: null, + participants: updatedParticipants, + }, + ], workspaceId, - name: `${objectMetadataName}.matched`, - workspaceMemberId: null, - participants: updatedParticipants, - }); + ); } if (workspaceMemberId) { diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts index 996aca092466..c34bbe192f12 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts @@ -9,6 +9,7 @@ import { MessageQueue } from 'src/engine/integrations/message-queue/message-queu import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; @@ -32,90 +33,109 @@ export class MessagingBlocklistListener { @OnEvent('blocklist.created') async handleCreatedEvent( - payload: ObjectRecordCreateEvent, + payload: WorkspaceEventBatch< + ObjectRecordCreateEvent + >, ) { - await this.messageQueueService.add( - BlocklistItemDeleteMessagesJob.name, - { - workspaceId: payload.workspaceId, - blocklistItemId: payload.recordId, - }, + await Promise.all( + payload.events.map((eventPayload) => + // TODO: modify to pass an array of blocklist items + this.messageQueueService.add( + BlocklistItemDeleteMessagesJob.name, + { + workspaceId: payload.workspaceId, + blocklistItemId: eventPayload.recordId, + }, + ), + ), ); } @OnEvent('blocklist.deleted') async handleDeletedEvent( - payload: ObjectRecordDeleteEvent, + payload: WorkspaceEventBatch< + ObjectRecordDeleteEvent + >, ) { - const workspaceMemberId = payload.properties.before.workspaceMember.id; const workspaceId = payload.workspaceId; - const connectedAccount = - await this.connectedAccountRepository.getAllByWorkspaceMemberId( - workspaceMemberId, + for (const eventPayload of payload.events) { + const workspaceMemberId = + eventPayload.properties.before.workspaceMember.id; + + const connectedAccount = + await this.connectedAccountRepository.getAllByWorkspaceMemberId( + workspaceMemberId, + workspaceId, + ); + + if (!connectedAccount || connectedAccount.length === 0) { + return; + } + + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + const messageChannel = await messageChannelRepository.findOneOrFail({ + where: { + connectedAccountId: connectedAccount[0].id, + }, + }); + + await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch( + messageChannel.id, workspaceId, ); - - if (!connectedAccount || connectedAccount.length === 0) { - return; } - - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', - ); - - const messageChannel = await messageChannelRepository.findOneOrFail({ - where: { - connectedAccountId: connectedAccount[0].id, - }, - }); - - await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch( - messageChannel.id, - workspaceId, - ); } @OnEvent('blocklist.updated') async handleUpdatedEvent( - payload: ObjectRecordUpdateEvent, + payload: WorkspaceEventBatch< + ObjectRecordUpdateEvent + >, ) { - const workspaceMemberId = payload.properties.before.workspaceMember.id; const workspaceId = payload.workspaceId; - await this.messageQueueService.add( - BlocklistItemDeleteMessagesJob.name, - { - workspaceId, - blocklistItemId: payload.recordId, - }, - ); + for (const eventPayload of payload.events) { + const workspaceMemberId = + eventPayload.properties.before.workspaceMember.id; - const connectedAccount = - await this.connectedAccountRepository.getAllByWorkspaceMemberId( - workspaceMemberId, - workspaceId, + await this.messageQueueService.add( + BlocklistItemDeleteMessagesJob.name, + { + workspaceId, + blocklistItemId: eventPayload.recordId, + }, ); - if (!connectedAccount || connectedAccount.length === 0) { - return; - } - - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', + const connectedAccount = + await this.connectedAccountRepository.getAllByWorkspaceMemberId( + workspaceMemberId, + workspaceId, + ); + + if (!connectedAccount || connectedAccount.length === 0) { + continue; + } + + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + const messageChannel = await messageChannelRepository.findOneOrFail({ + where: { + connectedAccountId: connectedAccount[0].id, + }, + }); + + await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch( + messageChannel.id, + workspaceId, ); - - const messageChannel = await messageChannelRepository.findOneOrFail({ - where: { - connectedAccountId: connectedAccount[0].id, - }, - }); - - await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch( - messageChannel.id, - workspaceId, - ); + } } } diff --git a/packages/twenty-server/src/modules/messaging/message-cleaner/listeners/messaging-message-cleaner-connected-account.listener.ts b/packages/twenty-server/src/modules/messaging/message-cleaner/listeners/messaging-message-cleaner-connected-account.listener.ts index 8880ad5d10d4..8caf65c58857 100644 --- a/packages/twenty-server/src/modules/messaging/message-cleaner/listeners/messaging-message-cleaner-connected-account.listener.ts +++ b/packages/twenty-server/src/modules/messaging/message-cleaner/listeners/messaging-message-cleaner-connected-account.listener.ts @@ -2,46 +2,39 @@ import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { MessagingConnectedAccountDeletionCleanupJob, MessagingConnectedAccountDeletionCleanupJobData, } from 'src/modules/messaging/message-cleaner/jobs/messaging-connected-account-deletion-cleanup.job'; -import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; -import { - DeleteConnectedAccountAssociatedCalendarDataJobData, - DeleteConnectedAccountAssociatedCalendarDataJob, -} from 'src/modules/calendar/calendar-event-cleaner/jobs/delete-connected-account-associated-calendar-data.job'; @Injectable() export class MessagingMessageCleanerConnectedAccountListener { constructor( @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, - @InjectMessageQueue(MessageQueue.calendarQueue) - private readonly calendarQueueService: MessageQueueService, ) {} @OnEvent('connectedAccount.deleted') async handleDeletedEvent( - payload: ObjectRecordDeleteEvent, + payload: WorkspaceEventBatch< + ObjectRecordDeleteEvent + >, ) { - await this.messageQueueService.add( - MessagingConnectedAccountDeletionCleanupJob.name, - { - workspaceId: payload.workspaceId, - connectedAccountId: payload.recordId, - }, - ); - - await this.calendarQueueService.add( - DeleteConnectedAccountAssociatedCalendarDataJob.name, - { - workspaceId: payload.workspaceId, - connectedAccountId: payload.recordId, - }, + await Promise.all( + payload.events.map((eventPayload) => + this.messageQueueService.add( + MessagingConnectedAccountDeletionCleanupJob.name, + { + workspaceId: payload.workspaceId, + connectedAccountId: eventPayload.recordId, + }, + ), + ), ); } } diff --git a/packages/twenty-server/src/modules/messaging/message-cleaner/messaging-message-cleaner.module.ts b/packages/twenty-server/src/modules/messaging/message-cleaner/messaging-message-cleaner.module.ts index ae4221a85e85..7b25fa456ae3 100644 --- a/packages/twenty-server/src/modules/messaging/message-cleaner/messaging-message-cleaner.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-cleaner/messaging-message-cleaner.module.ts @@ -1,19 +1,11 @@ import { Module } from '@nestjs/common'; -import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; -import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity'; -import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; import { MessagingConnectedAccountDeletionCleanupJob } from 'src/modules/messaging/message-cleaner/jobs/messaging-connected-account-deletion-cleanup.job'; import { MessagingMessageCleanerConnectedAccountListener } from 'src/modules/messaging/message-cleaner/listeners/messaging-message-cleaner-connected-account.listener'; import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service'; @Module({ - imports: [ - TwentyORMModule.forFeature([ - MessageWorkspaceEntity, - MessageThreadWorkspaceEntity, - ]), - ], + imports: [], providers: [ MessagingMessageCleanerService, MessagingConnectedAccountDeletionCleanupJob, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener.ts index d3f00c95fc42..b81118d3a20f 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener.ts @@ -2,9 +2,10 @@ import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingCleanCacheJob, @@ -20,14 +21,20 @@ export class MessagingMessageImportManagerMessageChannelListener { @OnEvent('messageChannel.deleted') async handleDeletedEvent( - payload: ObjectRecordDeleteEvent, + payload: WorkspaceEventBatch< + ObjectRecordDeleteEvent + >, ) { - await this.messageQueueService.add( - MessagingCleanCacheJob.name, - { - workspaceId: payload.workspaceId, - messageChannelId: payload.recordId, - }, + await Promise.all( + payload.events.map((eventPayload) => + this.messageQueueService.add( + MessagingCleanCacheJob.name, + { + workspaceId: payload.workspaceId, + messageChannelId: eventPayload.recordId, + }, + ), + ), ); } } diff --git a/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-person.listener.ts b/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-person.listener.ts index 521c92585054..2765c661129d 100644 --- a/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-person.listener.ts +++ b/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-person.listener.ts @@ -7,13 +7,14 @@ import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperti import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { - MessageParticipantMatchParticipantJobData, MessageParticipantMatchParticipantJob, + MessageParticipantMatchParticipantJobData, } from 'src/modules/messaging/message-participant-manager/jobs/message-participant-match-participant.job'; import { - MessageParticipantUnmatchParticipantJobData, MessageParticipantUnmatchParticipantJob, + MessageParticipantUnmatchParticipantJobData, } from 'src/modules/messaging/message-participant-manager/jobs/message-participant-unmatch-participant.job'; import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/person.workspace-entity'; @@ -26,49 +27,57 @@ export class MessageParticipantPersonListener { @OnEvent('person.created') async handleCreatedEvent( - payload: ObjectRecordCreateEvent, - ) { - if (payload.properties.after.email === null) { - return; - } - - await this.messageQueueService.add( - MessageParticipantMatchParticipantJob.name, - { - workspaceId: payload.workspaceId, - email: payload.properties.after.email, - personId: payload.recordId, - }, - ); - } - - @OnEvent('person.updated') - async handleUpdatedEvent( - payload: ObjectRecordUpdateEvent, + payload: WorkspaceEventBatch< + ObjectRecordCreateEvent + >, ) { - if ( - objectRecordUpdateEventChangedProperties( - payload.properties.before, - payload.properties.after, - ).includes('email') - ) { - await this.messageQueueService.add( - MessageParticipantUnmatchParticipantJob.name, - { - workspaceId: payload.workspaceId, - email: payload.properties.before.email, - personId: payload.recordId, - }, - ); + for (const eventPayload of payload.events) { + if (eventPayload.properties.after.email === null) { + continue; + } await this.messageQueueService.add( MessageParticipantMatchParticipantJob.name, { workspaceId: payload.workspaceId, - email: payload.properties.after.email, - personId: payload.recordId, + email: eventPayload.properties.after.email, + personId: eventPayload.recordId, }, ); } } + + @OnEvent('person.updated') + async handleUpdatedEvent( + payload: WorkspaceEventBatch< + ObjectRecordUpdateEvent + >, + ) { + for (const eventPayload of payload.events) { + if ( + objectRecordUpdateEventChangedProperties( + eventPayload.properties.before, + eventPayload.properties.after, + ).includes('email') + ) { + await this.messageQueueService.add( + MessageParticipantUnmatchParticipantJob.name, + { + workspaceId: payload.workspaceId, + email: eventPayload.properties.before.email, + personId: eventPayload.recordId, + }, + ); + + await this.messageQueueService.add( + MessageParticipantMatchParticipantJob.name, + { + workspaceId: payload.workspaceId, + email: eventPayload.properties.after.email, + personId: eventPayload.recordId, + }, + ); + } + } + } } diff --git a/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-workspace-member.listener.ts b/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-workspace-member.listener.ts index 506bf3d5aa18..07968f379cec 100644 --- a/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-workspace-member.listener.ts +++ b/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-workspace-member.listener.ts @@ -14,6 +14,7 @@ import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperti import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { MessageParticipantMatchParticipantJob, MessageParticipantMatchParticipantJobData, @@ -35,7 +36,9 @@ export class MessageParticipantWorkspaceMemberListener { @OnEvent('workspaceMember.created') async handleCreatedEvent( - payload: ObjectRecordCreateEvent, + payload: WorkspaceEventBatch< + ObjectRecordCreateEvent + >, ) { const workspace = await this.workspaceRepository.findOneBy({ id: payload.workspaceId, @@ -48,47 +51,53 @@ export class MessageParticipantWorkspaceMemberListener { return; } - if (payload.properties.after.userEmail === null) { - return; - } - - await this.messageQueueService.add( - MessageParticipantMatchParticipantJob.name, - { - workspaceId: payload.workspaceId, - email: payload.properties.after.userEmail, - workspaceMemberId: payload.properties.after.id, - }, - ); - } - - @OnEvent('workspaceMember.updated') - async handleUpdatedEvent( - payload: ObjectRecordUpdateEvent, - ) { - if ( - objectRecordUpdateEventChangedProperties( - payload.properties.before, - payload.properties.after, - ).includes('userEmail') - ) { - await this.messageQueueService.add( - MessageParticipantUnmatchParticipantJob.name, - { - workspaceId: payload.workspaceId, - email: payload.properties.before.userEmail, - personId: payload.recordId, - }, - ); + for (const eventPayload of payload.events) { + if (eventPayload.properties.after.userEmail === null) { + continue; + } await this.messageQueueService.add( MessageParticipantMatchParticipantJob.name, { workspaceId: payload.workspaceId, - email: payload.properties.after.userEmail, - workspaceMemberId: payload.recordId, + email: eventPayload.properties.after.userEmail, + workspaceMemberId: eventPayload.recordId, }, ); } } + + @OnEvent('workspaceMember.updated') + async handleUpdatedEvent( + payload: WorkspaceEventBatch< + ObjectRecordUpdateEvent + >, + ) { + for (const eventPayload of payload.events) { + if ( + objectRecordUpdateEventChangedProperties( + eventPayload.properties.before, + eventPayload.properties.after, + ).includes('userEmail') + ) { + await this.messageQueueService.add( + MessageParticipantUnmatchParticipantJob.name, + { + workspaceId: payload.workspaceId, + email: eventPayload.properties.before.userEmail, + personId: eventPayload.recordId, + }, + ); + + await this.messageQueueService.add( + MessageParticipantMatchParticipantJob.name, + { + workspaceId: payload.workspaceId, + email: eventPayload.properties.after.userEmail, + workspaceMemberId: eventPayload.recordId, + }, + ); + } + } + } } diff --git a/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant.listener.ts b/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant.listener.ts index f7a21ae898b3..d57ebdc4d513 100644 --- a/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant.listener.ts +++ b/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant.listener.ts @@ -7,6 +7,7 @@ import { Repository } from 'typeorm'; import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity'; import { TimelineActivityRepository } from 'src/modules/timeline/repositiories/timeline-activity.repository'; import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity'; @@ -22,50 +23,54 @@ export class MessageParticipantListener { ) {} @OnEvent('messageParticipant.matched') - public async handleMessageParticipantMatched(payload: { - workspaceId: string; - workspaceMemberId: string; - participants: MessageParticipantWorkspaceEntity[]; - }): Promise { - const messageParticipants = payload.participants ?? []; + public async handleMessageParticipantMatched( + payload: WorkspaceEventBatch<{ + workspaceMemberId: string; + participants: MessageParticipantWorkspaceEntity[]; + }>, + ): Promise { + // TODO: Refactor to insertTimelineActivitiesForObject once + for (const eventPayload of payload.events) { + const messageParticipants = eventPayload.participants ?? []; - // TODO: move to a job? + // TODO: move to a job? - const dataSourceSchema = this.workspaceDataSourceService.getSchemaName( - payload.workspaceId, - ); + const dataSourceSchema = this.workspaceDataSourceService.getSchemaName( + payload.workspaceId, + ); - const messageObjectMetadata = - await this.objectMetadataRepository.findOneOrFail({ - where: { - nameSingular: 'message', - workspaceId: payload.workspaceId, - }, - }); + const messageObjectMetadata = + await this.objectMetadataRepository.findOneOrFail({ + where: { + nameSingular: 'message', + workspaceId: payload.workspaceId, + }, + }); - const messageParticipantsWithPersonId = messageParticipants.filter( - (participant) => participant.personId, - ); + const messageParticipantsWithPersonId = messageParticipants.filter( + (participant) => participant.personId, + ); - if (messageParticipantsWithPersonId.length === 0) { - return; - } + if (messageParticipantsWithPersonId.length === 0) { + return; + } - await this.timelineActivityRepository.insertTimelineActivitiesForObject( - 'person', - messageParticipantsWithPersonId.map((participant) => ({ - dataSourceSchema, - name: 'message.linked', - properties: null, - objectName: 'message', - recordId: participant.personId, - workspaceMemberId: payload.workspaceMemberId, - workspaceId: payload.workspaceId, - linkedObjectMetadataId: messageObjectMetadata.id, - linkedRecordId: participant.messageId, - linkedRecordCachedName: '', - })), - payload.workspaceId, - ); + await this.timelineActivityRepository.insertTimelineActivitiesForObject( + 'person', + messageParticipantsWithPersonId.map((participant) => ({ + dataSourceSchema, + name: 'message.linked', + properties: null, + objectName: 'message', + recordId: participant.personId, + workspaceMemberId: eventPayload.workspaceMemberId, + workspaceId: payload.workspaceId, + linkedObjectMetadataId: messageObjectMetadata.id, + linkedRecordId: participant.messageId, + linkedRecordCachedName: '', + })), + payload.workspaceId, + ); + } } } diff --git a/packages/twenty-server/src/modules/timeline/jobs/create-audit-log-from-internal-event.ts b/packages/twenty-server/src/modules/timeline/jobs/create-audit-log-from-internal-event.ts index edf4c3d9e488..d820020d3efc 100644 --- a/packages/twenty-server/src/modules/timeline/jobs/create-audit-log-from-internal-event.ts +++ b/packages/twenty-server/src/modules/timeline/jobs/create-audit-log-from-internal-event.ts @@ -1,12 +1,13 @@ import { ObjectRecordBaseEvent } from 'src/engine/integrations/event-emitter/types/object-record.base.event'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { AuditLogRepository } from 'src/modules/timeline/repositiories/audit-log.repository'; import { AuditLogWorkspaceEntity } from 'src/modules/timeline/standard-objects/audit-log.workspace-entity'; import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; -import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; -import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; @Processor(MessageQueue.entityEventsToDbQueue) export class CreateAuditLogFromInternalEvent { @@ -18,33 +19,37 @@ export class CreateAuditLogFromInternalEvent { ) {} @Process(CreateAuditLogFromInternalEvent.name) - async handle(data: ObjectRecordBaseEvent): Promise { - let workspaceMemberId: string | null = null; + async handle( + data: WorkspaceEventBatch, + ): Promise { + for (const eventData of data.events) { + let workspaceMemberId: string | null = null; - if (data.userId) { - const workspaceMember = await this.workspaceMemberService.getByIdOrFail( - data.userId, - data.workspaceId, - ); + if (eventData.userId) { + const workspaceMember = await this.workspaceMemberService.getByIdOrFail( + eventData.userId, + data.workspaceId, + ); - workspaceMemberId = workspaceMember.id; - } + workspaceMemberId = workspaceMember.id; + } - if (data.properties.diff) { - // we remove "before" and "after" property for a cleaner/slimmer event payload - data.properties = { - diff: data.properties.diff, - }; - } + if (eventData.properties.diff) { + // we remove "before" and "after" property for a cleaner/slimmer event payload + eventData.properties = { + diff: eventData.properties.diff, + }; + } - await this.auditLogRepository.insert( - data.name, - data.properties, - workspaceMemberId, - data.name.split('.')[0], - data.objectMetadata.id, - data.recordId, - data.workspaceId, - ); + await this.auditLogRepository.insert( + data.name, + eventData.properties, + workspaceMemberId, + data.name.split('.')[0], + eventData.objectMetadata.id, + eventData.recordId, + data.workspaceId, + ); + } } } diff --git a/packages/twenty-server/src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job.ts b/packages/twenty-server/src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job.ts index ada03bf13c8f..c73479745c7c 100644 --- a/packages/twenty-server/src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job.ts +++ b/packages/twenty-server/src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job.ts @@ -3,6 +3,7 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { TimelineActivityService } from 'src/modules/timeline/services/timeline-activity.service'; import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; @@ -16,33 +17,41 @@ export class UpsertTimelineActivityFromInternalEvent { ) {} @Process(UpsertTimelineActivityFromInternalEvent.name) - async handle(data: ObjectRecordBaseEvent): Promise { - if (data.userId) { - const workspaceMember = await this.workspaceMemberService.getByIdOrFail( - data.userId, - data.workspaceId, - ); + async handle( + data: WorkspaceEventBatch, + ): Promise { + for (const eventData of data.events) { + if (eventData.userId) { + const workspaceMember = await this.workspaceMemberService.getByIdOrFail( + eventData.userId, + data.workspaceId, + ); - data.workspaceMemberId = workspaceMember.id; - } + eventData.workspaceMemberId = workspaceMember.id; + } - if (data.properties.diff) { - // we remove "before" and "after" property for a cleaner/slimmer event payload - data.properties = { - diff: data.properties.diff, - }; - } + if (eventData.properties.diff) { + // we remove "before" and "after" property for a cleaner/slimmer event payload + eventData.properties = { + diff: eventData.properties.diff, + }; + } - // Temporary - // We ignore every that is not a LinkedObject or a Business Object - if ( - data.objectMetadata.isSystem && - data.objectMetadata.nameSingular !== 'noteTarget' && - data.objectMetadata.nameSingular !== 'taskTarget' - ) { - return; - } + // Temporary + // We ignore every that is not a LinkedObject or a Business Object + if ( + eventData.objectMetadata.isSystem && + eventData.objectMetadata.nameSingular !== 'noteTarget' && + eventData.objectMetadata.nameSingular !== 'taskTarget' + ) { + continue; + } - await this.timelineActivityService.upsertEvent(data); + await this.timelineActivityService.upsertEvent({ + ...eventData, + workspaceId: data.workspaceId, + name: data.name, + }); + } } } diff --git a/packages/twenty-server/src/modules/timeline/services/timeline-activity.service.ts b/packages/twenty-server/src/modules/timeline/services/timeline-activity.service.ts index b3b32ef3b522..963a65d5f35f 100644 --- a/packages/twenty-server/src/modules/timeline/services/timeline-activity.service.ts +++ b/packages/twenty-server/src/modules/timeline/services/timeline-activity.service.ts @@ -1,12 +1,12 @@ import { Injectable } from '@nestjs/common'; -import { ObjectRecordBaseEvent } from 'src/engine/integrations/event-emitter/types/object-record.base.event'; +import { ObjectRecordBaseEventWithNameAndWorkspaceId } from 'src/engine/integrations/event-emitter/types/object-record.base.event'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; import { TimelineActivityRepository } from 'src/modules/timeline/repositiories/timeline-activity.repository'; import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity'; -type TransformedEvent = ObjectRecordBaseEvent & { +type TransformedEvent = ObjectRecordBaseEventWithNameAndWorkspaceId & { objectName?: string; linkedRecordCachedName?: string; linkedRecordId?: string; @@ -26,7 +26,7 @@ export class TimelineActivityService { task: 'taskTarget', }; - async upsertEvent(event: ObjectRecordBaseEvent) { + async upsertEvent(event: ObjectRecordBaseEventWithNameAndWorkspaceId) { const events = await this.transformEvent(event); if (!events || events.length === 0) return; @@ -47,7 +47,7 @@ export class TimelineActivityService { } private async transformEvent( - event: ObjectRecordBaseEvent, + event: ObjectRecordBaseEventWithNameAndWorkspaceId, ): Promise { if (['note', 'task'].includes(event.objectMetadata.nameSingular)) { const linkedObjects = await this.handleLinkedObjects(event); @@ -69,7 +69,9 @@ export class TimelineActivityService { return [event]; } - private async handleLinkedObjects(event: ObjectRecordBaseEvent) { + private async handleLinkedObjects( + event: ObjectRecordBaseEventWithNameAndWorkspaceId, + ) { const dataSourceSchema = this.workspaceDataSourceService.getSchemaName( event.workspaceId, ); @@ -92,7 +94,7 @@ export class TimelineActivityService { } private async processActivity( - event: ObjectRecordBaseEvent, + event: ObjectRecordBaseEventWithNameAndWorkspaceId, dataSourceSchema: string, activityType: string, ) { @@ -145,7 +147,7 @@ export class TimelineActivityService { } private async processActivityTarget( - event: ObjectRecordBaseEvent, + event: ObjectRecordBaseEventWithNameAndWorkspaceId, dataSourceSchema: string, activityType: string, ) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/listeners/database-event-trigger.listener.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/listeners/database-event-trigger.listener.ts index 3bf1eaa7e416..5291b8094ba9 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/listeners/database-event-trigger.listener.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/listeners/database-event-trigger.listener.ts @@ -10,6 +10,7 @@ import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decora import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity'; import { WorkflowEventTriggerJob, @@ -28,25 +29,32 @@ export class DatabaseEventTriggerListener { ) {} @OnEvent('*.created') - async handleObjectRecordCreateEvent(payload: ObjectRecordCreateEvent) { + async handleObjectRecordCreateEvent( + payload: WorkspaceEventBatch>, + ) { await this.handleEvent(payload); } @OnEvent('*.updated') - async handleObjectRecordUpdateEvent(payload: ObjectRecordUpdateEvent) { + async handleObjectRecordUpdateEvent( + payload: WorkspaceEventBatch>, + ) { await this.handleEvent(payload); } @OnEvent('*.deleted') - async handleObjectRecordDeleteEvent(payload: ObjectRecordDeleteEvent) { + async handleObjectRecordDeleteEvent( + payload: WorkspaceEventBatch>, + ) { await this.handleEvent(payload); } private async handleEvent( - payload: + payload: WorkspaceEventBatch< | ObjectRecordCreateEvent | ObjectRecordUpdateEvent - | ObjectRecordDeleteEvent, + | ObjectRecordDeleteEvent + >, ) { const workspaceId = payload.workspaceId; const eventName = payload.name; @@ -84,15 +92,17 @@ export class DatabaseEventTriggerListener { }); for (const eventListener of eventListeners) { - this.messageQueueService.add( - WorkflowEventTriggerJob.name, - { - workspaceId, - workflowId: eventListener.workflowId, - payload, - }, - { retryLimit: 3 }, - ); + for (const eventPayload of payload.events) { + this.messageQueueService.add( + WorkflowEventTriggerJob.name, + { + workspaceId, + workflowId: eventListener.workflowId, + payload: eventPayload, + }, + { retryLimit: 3 }, + ); + } } } } diff --git a/packages/twenty-server/src/queue-worker/queue-worker.module.ts b/packages/twenty-server/src/queue-worker/queue-worker.module.ts index 6f3374a1c79c..c6eb553d2219 100644 --- a/packages/twenty-server/src/queue-worker/queue-worker.module.ts +++ b/packages/twenty-server/src/queue-worker/queue-worker.module.ts @@ -1,15 +1,17 @@ import { Module } from '@nestjs/common'; -import { JobsModule } from 'src/engine/integrations/message-queue/jobs.module'; import { IntegrationsModule } from 'src/engine/integrations/integrations.module'; -import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; +import { JobsModule } from 'src/engine/integrations/message-queue/jobs.module'; import { MessageQueueModule } from 'src/engine/integrations/message-queue/message-queue.module'; +import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; +import { WorkspaceEventEmitterModule } from 'src/engine/workspace-event-emitter/workspace-event-emitter.module'; @Module({ imports: [ TwentyORMModule.register({}), IntegrationsModule, MessageQueueModule.registerExplorer(), + WorkspaceEventEmitterModule, JobsModule, ], })