Skip to content

Commit

Permalink
create workspace event emitter module
Browse files Browse the repository at this point in the history
  • Loading branch information
bosiraphael committed Aug 19, 2024
1 parent 04fe229 commit ba5bbed
Show file tree
Hide file tree
Showing 12 changed files with 109 additions and 86 deletions.
16 changes: 8 additions & 8 deletions packages/twenty-server/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,27 @@ import {
RequestMethod,
} from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ServeStaticModule } from '@nestjs/serve-static';
import { GraphQLModule } from '@nestjs/graphql';
import { ServeStaticModule } from '@nestjs/serve-static';

import { existsSync } from 'fs';
import { join } from 'path';

import { YogaDriverConfig, YogaDriver } from '@graphql-yoga/nestjs';
import { YogaDriver, YogaDriverConfig } from '@graphql-yoga/nestjs';

import { RestApiModule } from 'src/engine/api/rest/rest-api.module';
import { ModulesModule } from 'src/modules/modules.module';
import { CoreGraphQLApiModule } from 'src/engine/api/graphql/core-graphql-api.module';
import { MetadataGraphQLApiModule } from 'src/engine/api/graphql/metadata-graphql-api.module';
import { GraphQLConfigModule } from 'src/engine/api/graphql/graphql-config/graphql-config.module';
import { GraphQLConfigService } from 'src/engine/api/graphql/graphql-config/graphql-config.service';
import { MetadataGraphQLApiModule } from 'src/engine/api/graphql/metadata-graphql-api.module';
import { RestApiModule } from 'src/engine/api/rest/rest-api.module';
import { MessageQueueDriverType } from 'src/engine/integrations/message-queue/interfaces';
import { MessageQueueModule } from 'src/engine/integrations/message-queue/message-queue.module';
import { WorkspaceCacheVersionModule } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.module';
import { GraphQLHydrateRequestFromTokenMiddleware } from 'src/engine/middlewares/graphql-hydrate-request-from-token.middleware';
import { MessageQueueModule } from 'src/engine/integrations/message-queue/message-queue.module';
import { MessageQueueDriverType } from 'src/engine/integrations/message-queue/interfaces';
import { ModulesModule } from 'src/modules/modules.module';

import { IntegrationsModule } from './engine/integrations/integrations.module';
import { CoreEngineModule } from './engine/core-modules/core-engine.module';
import { IntegrationsModule } from './engine/integrations/integrations.module';

@Module({
imports: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { PostgresCredentialsModule } from 'src/engine/core-modules/postgres-cred
import { UserModule } from 'src/engine/core-modules/user/user.module';
import { WorkflowTriggerCoreModule } from 'src/engine/core-modules/workflow/core-workflow-trigger.module';
import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module';
import { WorkspaceEventEmitterModule } from 'src/engine/workspace-event-emitter/workspace-event-emitter.module';

import { AnalyticsModule } from './analytics/analytics.module';
import { ClientConfigModule } from './client-config/client-config.module';
Expand All @@ -36,6 +37,7 @@ import { FileModule } from './file/file.module';
AISQLQueryModule,
PostgresCredentialsModule,
WorkflowTriggerCoreModule,
WorkspaceEventEmitterModule,
],
exports: [
AnalyticsModule,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { Test, TestingModule } from '@nestjs/testing';
import { getRepositoryToken } from '@nestjs/typeorm';
import { EventEmitter2 } from '@nestjs/event-emitter';

import { UserService } from 'src/engine/core-modules/user/services/user.service';
import { User } from 'src/engine/core-modules/user/user.entity';
import { DataSourceService } from 'src/engine/metadata-modules/data-source/data-source.service';
import { TypeORMService } from 'src/database/typeorm/typeorm.service';
import { UserWorkspace } from 'src/engine/core-modules/user-workspace/user-workspace.entity';
import { UserService } from 'src/engine/core-modules/user/services/user.service';
import { User } from 'src/engine/core-modules/user/user.entity';
import { WorkspaceService } from 'src/engine/core-modules/workspace/services/workspace.service';
import { DataSourceService } from 'src/engine/metadata-modules/data-source/data-source.service';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';

describe('UserService', () => {
let service: UserService;
Expand All @@ -34,7 +34,7 @@ describe('UserService', () => {
useValue: {},
},
{
provide: EventEmitter2,
provide: WorkspaceEventEmitter,
useValue: {},
},
{
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@ export class ObjectRecordBaseEvent {
objectMetadata: ObjectMetadataInterface;
properties: any;
}

export class ObjectRecordBaseEventWithNameAndWorkspaceId extends ObjectRecordBaseEvent {
name: string;
workspaceId: string;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Global, Module } from '@nestjs/common';

import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';

@Global()
@Module({
imports: [],
providers: [WorkspaceEventEmitter],
exports: [WorkspaceEventEmitter],
})
export class WorkspaceEventEmitterModule {}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';

import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';

@Injectable()
export class WorkspaceEventEmitter extends EventEmitter2 {
constructor() {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export class ConnectedAccountDeleteOnePreQueryHook
{
constructor(
private readonly twentyORMManager: TwentyORMManager,
private workspaceEventEmitter: WorkspaceEventEmitter,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
) {}

async execute(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { Module } from '@nestjs/common';

import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module';
import { ConnectedAccountDeleteOnePreQueryHook } from 'src/modules/connected-account/query-hooks/connected-account-delete-one.pre-query.hook';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';

@Module({
imports: [TwentyORMModule.forFeature([MessageChannelWorkspaceEntity])],
imports: [],
providers: [ConnectedAccountDeleteOnePreQueryHook],
})
export class ConnectedAccountQueryHookModule {}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { ObjectRecordBaseEvent } from 'src/engine/integrations/event-emitter/types/object-record.base.event';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { AuditLogRepository } from 'src/modules/timeline/repositiories/audit-log.repository';
import { AuditLogWorkspaceEntity } from 'src/modules/timeline/standard-objects/audit-log.workspace-entity';
import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';

@Processor(MessageQueue.entityEventsToDbQueue)
export class CreateAuditLogFromInternalEvent {
Expand All @@ -18,33 +19,37 @@ export class CreateAuditLogFromInternalEvent {
) {}

@Process(CreateAuditLogFromInternalEvent.name)
async handle(data: ObjectRecordBaseEvent): Promise<void> {
let workspaceMemberId: string | null = null;
async handle(
data: WorkspaceEventBatch<ObjectRecordBaseEvent>,
): Promise<void> {
for (const eventData of data.events) {
let workspaceMemberId: string | null = null;

if (data.userId) {
const workspaceMember = await this.workspaceMemberService.getByIdOrFail(
data.userId,
data.workspaceId,
);
if (eventData.userId) {
const workspaceMember = await this.workspaceMemberService.getByIdOrFail(
eventData.userId,
data.workspaceId,
);

workspaceMemberId = workspaceMember.id;
}
workspaceMemberId = workspaceMember.id;
}

if (data.properties.diff) {
// we remove "before" and "after" property for a cleaner/slimmer event payload
data.properties = {
diff: data.properties.diff,
};
}
if (eventData.properties.diff) {
// we remove "before" and "after" property for a cleaner/slimmer event payload
eventData.properties = {
diff: eventData.properties.diff,
};
}

await this.auditLogRepository.insert(
data.name,
data.properties,
workspaceMemberId,
data.name.split('.')[0],
data.objectMetadata.id,
data.recordId,
data.workspaceId,
);
await this.auditLogRepository.insert(
data.name,
eventData.properties,
workspaceMemberId,
data.name.split('.')[0],
eventData.objectMetadata.id,
eventData.recordId,
data.workspaceId,
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { TimelineActivityService } from 'src/modules/timeline/services/timeline-activity.service';
import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
Expand All @@ -16,33 +17,41 @@ export class UpsertTimelineActivityFromInternalEvent {
) {}

@Process(UpsertTimelineActivityFromInternalEvent.name)
async handle(data: ObjectRecordBaseEvent): Promise<void> {
if (data.userId) {
const workspaceMember = await this.workspaceMemberService.getByIdOrFail(
data.userId,
data.workspaceId,
);
async handle(
data: WorkspaceEventBatch<ObjectRecordBaseEvent>,
): Promise<void> {
for (const eventData of data.events) {
if (eventData.userId) {
const workspaceMember = await this.workspaceMemberService.getByIdOrFail(
eventData.userId,
data.workspaceId,
);

data.workspaceMemberId = workspaceMember.id;
}
eventData.workspaceMemberId = workspaceMember.id;
}

if (data.properties.diff) {
// we remove "before" and "after" property for a cleaner/slimmer event payload
data.properties = {
diff: data.properties.diff,
};
}
if (eventData.properties.diff) {
// we remove "before" and "after" property for a cleaner/slimmer event payload
eventData.properties = {
diff: eventData.properties.diff,
};
}

// Temporary
// We ignore every that is not a LinkedObject or a Business Object
if (
data.objectMetadata.isSystem &&
data.objectMetadata.nameSingular !== 'noteTarget' &&
data.objectMetadata.nameSingular !== 'taskTarget'
) {
return;
}
// Temporary
// We ignore every that is not a LinkedObject or a Business Object
if (
eventData.objectMetadata.isSystem &&
eventData.objectMetadata.nameSingular !== 'noteTarget' &&
eventData.objectMetadata.nameSingular !== 'taskTarget'
) {
continue;
}

await this.timelineActivityService.upsertEvent(data);
await this.timelineActivityService.upsertEvent({
...eventData,
workspaceId: data.workspaceId,
name: data.name,
});
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Injectable } from '@nestjs/common';

import { ObjectRecordBaseEvent } from 'src/engine/integrations/event-emitter/types/object-record.base.event';
import { ObjectRecordBaseEventWithNameAndWorkspaceId } from 'src/engine/integrations/event-emitter/types/object-record.base.event';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { TimelineActivityRepository } from 'src/modules/timeline/repositiories/timeline-activity.repository';
import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity';

type TransformedEvent = ObjectRecordBaseEvent & {
type TransformedEvent = ObjectRecordBaseEventWithNameAndWorkspaceId & {
objectName?: string;
linkedRecordCachedName?: string;
linkedRecordId?: string;
Expand All @@ -26,7 +26,7 @@ export class TimelineActivityService {
task: 'taskTarget',
};

async upsertEvent(event: ObjectRecordBaseEvent) {
async upsertEvent(event: ObjectRecordBaseEventWithNameAndWorkspaceId) {
const events = await this.transformEvent(event);

if (!events || events.length === 0) return;
Expand All @@ -47,7 +47,7 @@ export class TimelineActivityService {
}

private async transformEvent(
event: ObjectRecordBaseEvent,
event: ObjectRecordBaseEventWithNameAndWorkspaceId,
): Promise<TransformedEvent[]> {
if (['note', 'task'].includes(event.objectMetadata.nameSingular)) {
const linkedObjects = await this.handleLinkedObjects(event);
Expand All @@ -69,7 +69,9 @@ export class TimelineActivityService {
return [event];
}

private async handleLinkedObjects(event: ObjectRecordBaseEvent) {
private async handleLinkedObjects(
event: ObjectRecordBaseEventWithNameAndWorkspaceId,
) {
const dataSourceSchema = this.workspaceDataSourceService.getSchemaName(
event.workspaceId,
);
Expand All @@ -92,7 +94,7 @@ export class TimelineActivityService {
}

private async processActivity(
event: ObjectRecordBaseEvent,
event: ObjectRecordBaseEventWithNameAndWorkspaceId,
dataSourceSchema: string,
activityType: string,
) {
Expand Down Expand Up @@ -145,7 +147,7 @@ export class TimelineActivityService {
}

private async processActivityTarget(
event: ObjectRecordBaseEvent,
event: ObjectRecordBaseEventWithNameAndWorkspaceId,
dataSourceSchema: string,
activityType: string,
) {
Expand Down

0 comments on commit ba5bbed

Please sign in to comment.