diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener.ts index d3f00c95fc42..b81118d3a20f 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener.ts @@ -2,9 +2,10 @@ import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingCleanCacheJob, @@ -20,14 +21,20 @@ export class MessagingMessageImportManagerMessageChannelListener { @OnEvent('messageChannel.deleted') async handleDeletedEvent( - payload: ObjectRecordDeleteEvent, + payload: WorkspaceEventBatch< + ObjectRecordDeleteEvent + >, ) { - await this.messageQueueService.add( - MessagingCleanCacheJob.name, - { - workspaceId: payload.workspaceId, - messageChannelId: payload.recordId, - }, + await Promise.all( + payload.events.map((eventPayload) => + this.messageQueueService.add( + MessagingCleanCacheJob.name, + { + workspaceId: payload.workspaceId, + messageChannelId: eventPayload.recordId, + }, + ), + ), ); } } diff --git a/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-person.listener.ts b/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-person.listener.ts index 521c92585054..b815dbeb7496 100644 --- a/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-person.listener.ts +++ b/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-person.listener.ts @@ -7,13 +7,14 @@ import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperti import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { - MessageParticipantMatchParticipantJobData, MessageParticipantMatchParticipantJob, + MessageParticipantMatchParticipantJobData, } from 'src/modules/messaging/message-participant-manager/jobs/message-participant-match-participant.job'; import { - MessageParticipantUnmatchParticipantJobData, MessageParticipantUnmatchParticipantJob, + MessageParticipantUnmatchParticipantJobData, } from 'src/modules/messaging/message-participant-manager/jobs/message-participant-unmatch-participant.job'; import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/person.workspace-entity'; @@ -26,49 +27,57 @@ export class MessageParticipantPersonListener { @OnEvent('person.created') async handleCreatedEvent( - payload: ObjectRecordCreateEvent, - ) { - if (payload.properties.after.email === null) { - return; - } - - await this.messageQueueService.add( - MessageParticipantMatchParticipantJob.name, - { - workspaceId: payload.workspaceId, - email: payload.properties.after.email, - personId: payload.recordId, - }, - ); - } - - @OnEvent('person.updated') - async handleUpdatedEvent( - payload: ObjectRecordUpdateEvent, + payload: WorkspaceEventBatch< + ObjectRecordCreateEvent + >, ) { - if ( - objectRecordUpdateEventChangedProperties( - payload.properties.before, - payload.properties.after, - ).includes('email') - ) { - await this.messageQueueService.add( - MessageParticipantUnmatchParticipantJob.name, - { - workspaceId: payload.workspaceId, - email: payload.properties.before.email, - personId: payload.recordId, - }, - ); + for (const eventPayload of payload.events) { + if (eventPayload.properties.after.email === null) { + return; + } await this.messageQueueService.add( MessageParticipantMatchParticipantJob.name, { workspaceId: payload.workspaceId, - email: payload.properties.after.email, - personId: payload.recordId, + email: eventPayload.properties.after.email, + personId: eventPayload.recordId, }, ); } } + + @OnEvent('person.updated') + async handleUpdatedEvent( + payload: WorkspaceEventBatch< + ObjectRecordUpdateEvent + >, + ) { + for (const eventPayload of payload.events) { + if ( + objectRecordUpdateEventChangedProperties( + eventPayload.properties.before, + eventPayload.properties.after, + ).includes('email') + ) { + await this.messageQueueService.add( + MessageParticipantUnmatchParticipantJob.name, + { + workspaceId: payload.workspaceId, + email: eventPayload.properties.before.email, + personId: eventPayload.recordId, + }, + ); + + await this.messageQueueService.add( + MessageParticipantMatchParticipantJob.name, + { + workspaceId: payload.workspaceId, + email: eventPayload.properties.after.email, + personId: eventPayload.recordId, + }, + ); + } + } + } } diff --git a/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-workspace-member.listener.ts b/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-workspace-member.listener.ts index 506bf3d5aa18..574594e709f5 100644 --- a/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-workspace-member.listener.ts +++ b/packages/twenty-server/src/modules/messaging/message-participant-manager/listeners/message-participant-workspace-member.listener.ts @@ -14,6 +14,7 @@ import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperti import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { MessageParticipantMatchParticipantJob, MessageParticipantMatchParticipantJobData, @@ -35,7 +36,9 @@ export class MessageParticipantWorkspaceMemberListener { @OnEvent('workspaceMember.created') async handleCreatedEvent( - payload: ObjectRecordCreateEvent, + payload: WorkspaceEventBatch< + ObjectRecordCreateEvent + >, ) { const workspace = await this.workspaceRepository.findOneBy({ id: payload.workspaceId, @@ -48,47 +51,53 @@ export class MessageParticipantWorkspaceMemberListener { return; } - if (payload.properties.after.userEmail === null) { - return; - } - - await this.messageQueueService.add( - MessageParticipantMatchParticipantJob.name, - { - workspaceId: payload.workspaceId, - email: payload.properties.after.userEmail, - workspaceMemberId: payload.properties.after.id, - }, - ); - } - - @OnEvent('workspaceMember.updated') - async handleUpdatedEvent( - payload: ObjectRecordUpdateEvent, - ) { - if ( - objectRecordUpdateEventChangedProperties( - payload.properties.before, - payload.properties.after, - ).includes('userEmail') - ) { - await this.messageQueueService.add( - MessageParticipantUnmatchParticipantJob.name, - { - workspaceId: payload.workspaceId, - email: payload.properties.before.userEmail, - personId: payload.recordId, - }, - ); + for (const eventPayload of payload.events) { + if (eventPayload.properties.after.userEmail === null) { + return; + } await this.messageQueueService.add( MessageParticipantMatchParticipantJob.name, { workspaceId: payload.workspaceId, - email: payload.properties.after.userEmail, - workspaceMemberId: payload.recordId, + email: eventPayload.properties.after.userEmail, + workspaceMemberId: eventPayload.recordId, }, ); } } + + @OnEvent('workspaceMember.updated') + async handleUpdatedEvent( + payload: WorkspaceEventBatch< + ObjectRecordUpdateEvent + >, + ) { + for (const eventPayload of payload.events) { + if ( + objectRecordUpdateEventChangedProperties( + eventPayload.properties.before, + eventPayload.properties.after, + ).includes('userEmail') + ) { + await this.messageQueueService.add( + MessageParticipantUnmatchParticipantJob.name, + { + workspaceId: payload.workspaceId, + email: eventPayload.properties.before.userEmail, + personId: eventPayload.recordId, + }, + ); + + await this.messageQueueService.add( + MessageParticipantMatchParticipantJob.name, + { + workspaceId: payload.workspaceId, + email: eventPayload.properties.after.userEmail, + workspaceMemberId: eventPayload.recordId, + }, + ); + } + } + } }