Skip to content

Commit

Permalink
update listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
bosiraphael committed Aug 14, 2024
1 parent df3416f commit 249a2ce
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';

import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import {
MessagingCleanCacheJob,
Expand All @@ -20,14 +21,20 @@ export class MessagingMessageImportManagerMessageChannelListener {

@OnEvent('messageChannel.deleted')
async handleDeletedEvent(
payload: ObjectRecordDeleteEvent<MessageChannelWorkspaceEntity>,
payload: WorkspaceEventBatch<
ObjectRecordDeleteEvent<MessageChannelWorkspaceEntity>
>,
) {
await this.messageQueueService.add<MessagingCleanCacheJobData>(
MessagingCleanCacheJob.name,
{
workspaceId: payload.workspaceId,
messageChannelId: payload.recordId,
},
await Promise.all(
payload.events.map((eventPayload) =>
this.messageQueueService.add<MessagingCleanCacheJobData>(
MessagingCleanCacheJob.name,
{
workspaceId: payload.workspaceId,
messageChannelId: eventPayload.recordId,
},
),
),
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperti
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import {
MessageParticipantMatchParticipantJobData,
MessageParticipantMatchParticipantJob,
MessageParticipantMatchParticipantJobData,
} from 'src/modules/messaging/message-participant-manager/jobs/message-participant-match-participant.job';
import {
MessageParticipantUnmatchParticipantJobData,
MessageParticipantUnmatchParticipantJob,
MessageParticipantUnmatchParticipantJobData,
} from 'src/modules/messaging/message-participant-manager/jobs/message-participant-unmatch-participant.job';
import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/person.workspace-entity';

Expand All @@ -26,49 +27,57 @@ export class MessageParticipantPersonListener {

@OnEvent('person.created')
async handleCreatedEvent(
payload: ObjectRecordCreateEvent<PersonWorkspaceEntity>,
) {
if (payload.properties.after.email === null) {
return;
}

await this.messageQueueService.add<MessageParticipantMatchParticipantJobData>(
MessageParticipantMatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.properties.after.email,
personId: payload.recordId,
},
);
}

@OnEvent('person.updated')
async handleUpdatedEvent(
payload: ObjectRecordUpdateEvent<PersonWorkspaceEntity>,
payload: WorkspaceEventBatch<
ObjectRecordCreateEvent<PersonWorkspaceEntity>
>,
) {
if (
objectRecordUpdateEventChangedProperties(
payload.properties.before,
payload.properties.after,
).includes('email')
) {
await this.messageQueueService.add<MessageParticipantUnmatchParticipantJobData>(
MessageParticipantUnmatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.properties.before.email,
personId: payload.recordId,
},
);
for (const eventPayload of payload.events) {
if (eventPayload.properties.after.email === null) {
return;
}

await this.messageQueueService.add<MessageParticipantMatchParticipantJobData>(
MessageParticipantMatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.properties.after.email,
personId: payload.recordId,
email: eventPayload.properties.after.email,
personId: eventPayload.recordId,
},
);
}
}

@OnEvent('person.updated')
async handleUpdatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordUpdateEvent<PersonWorkspaceEntity>
>,
) {
for (const eventPayload of payload.events) {
if (
objectRecordUpdateEventChangedProperties(
eventPayload.properties.before,
eventPayload.properties.after,
).includes('email')
) {
await this.messageQueueService.add<MessageParticipantUnmatchParticipantJobData>(
MessageParticipantUnmatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: eventPayload.properties.before.email,
personId: eventPayload.recordId,
},
);

await this.messageQueueService.add<MessageParticipantMatchParticipantJobData>(
MessageParticipantMatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: eventPayload.properties.after.email,
personId: eventPayload.recordId,
},
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperti
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import {
MessageParticipantMatchParticipantJob,
MessageParticipantMatchParticipantJobData,
Expand All @@ -35,7 +36,9 @@ export class MessageParticipantWorkspaceMemberListener {

@OnEvent('workspaceMember.created')
async handleCreatedEvent(
payload: ObjectRecordCreateEvent<WorkspaceMemberWorkspaceEntity>,
payload: WorkspaceEventBatch<
ObjectRecordCreateEvent<WorkspaceMemberWorkspaceEntity>
>,
) {
const workspace = await this.workspaceRepository.findOneBy({
id: payload.workspaceId,
Expand All @@ -48,47 +51,53 @@ export class MessageParticipantWorkspaceMemberListener {
return;
}

if (payload.properties.after.userEmail === null) {
return;
}

await this.messageQueueService.add<MessageParticipantMatchParticipantJobData>(
MessageParticipantMatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.properties.after.userEmail,
workspaceMemberId: payload.properties.after.id,
},
);
}

@OnEvent('workspaceMember.updated')
async handleUpdatedEvent(
payload: ObjectRecordUpdateEvent<WorkspaceMemberWorkspaceEntity>,
) {
if (
objectRecordUpdateEventChangedProperties<WorkspaceMemberWorkspaceEntity>(
payload.properties.before,
payload.properties.after,
).includes('userEmail')
) {
await this.messageQueueService.add<MessageParticipantUnmatchParticipantJobData>(
MessageParticipantUnmatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.properties.before.userEmail,
personId: payload.recordId,
},
);
for (const eventPayload of payload.events) {
if (eventPayload.properties.after.userEmail === null) {
return;
}

await this.messageQueueService.add<MessageParticipantMatchParticipantJobData>(
MessageParticipantMatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.properties.after.userEmail,
workspaceMemberId: payload.recordId,
email: eventPayload.properties.after.userEmail,
workspaceMemberId: eventPayload.recordId,
},
);
}
}

@OnEvent('workspaceMember.updated')
async handleUpdatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordUpdateEvent<WorkspaceMemberWorkspaceEntity>
>,
) {
for (const eventPayload of payload.events) {
if (
objectRecordUpdateEventChangedProperties<WorkspaceMemberWorkspaceEntity>(
eventPayload.properties.before,
eventPayload.properties.after,
).includes('userEmail')
) {
await this.messageQueueService.add<MessageParticipantUnmatchParticipantJobData>(
MessageParticipantUnmatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: eventPayload.properties.before.userEmail,
personId: eventPayload.recordId,
},
);

await this.messageQueueService.add<MessageParticipantMatchParticipantJobData>(
MessageParticipantMatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: eventPayload.properties.after.userEmail,
workspaceMemberId: eventPayload.recordId,
},
);
}
}
}
}

0 comments on commit 249a2ce

Please sign in to comment.