diff --git a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts index 7ce15f328770..6c37e85e4e27 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts @@ -10,9 +10,9 @@ import { SaveOrUpdateConnectedAccountInput } from 'src/engine/core-modules/auth/ import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { - GoogleCalendarFullSyncJob, - GoogleCalendarFullSyncJobData, -} from 'src/modules/calendar/jobs/google-calendar-full-sync.job'; + GoogleCalendarSyncJob, + GoogleCalendarSyncJobData, +} from 'src/modules/calendar/jobs/google-calendar-sync.job'; import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; import { FeatureFlagEntity, @@ -135,10 +135,7 @@ export class GoogleAPIsService { this.environmentService.get('CALENDAR_PROVIDER_GOOGLE_ENABLED') && IsCalendarEnabled ) { - await this.enqueueGoogleCalendarFullSyncJob( - workspaceId, - connectedAccountId, - ); + await this.enqueueGoogleCalendarSyncJob(workspaceId, connectedAccountId); } return; @@ -170,10 +167,7 @@ export class GoogleAPIsService { } if (this.environmentService.get('CALENDAR_PROVIDER_GOOGLE_ENABLED')) { - await this.enqueueGoogleCalendarFullSyncJob( - workspaceId, - connectedAccountId, - ); + await this.enqueueGoogleCalendarSyncJob(workspaceId, connectedAccountId); } return; @@ -192,12 +186,12 @@ export class GoogleAPIsService { ); } - async enqueueGoogleCalendarFullSyncJob( + async enqueueGoogleCalendarSyncJob( workspaceId: string, connectedAccountId: string, ) { - await this.calendarQueueService.add( - GoogleCalendarFullSyncJob.name, + await this.calendarQueueService.add( + GoogleCalendarSyncJob.name, { workspaceId, connectedAccountId, diff --git a/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts b/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts index 3130dc630c17..4ab0ef59cadc 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts @@ -28,12 +28,11 @@ import { StripeModule } from 'src/engine/core-modules/billing/stripe/stripe.modu import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; -import { GoogleCalendarFullSyncJob } from 'src/modules/calendar/jobs/google-calendar-full-sync.job'; +import { GoogleCalendarSyncJob } from 'src/modules/calendar/jobs/google-calendar-sync.job'; import { CalendarEventCleanerModule } from 'src/modules/calendar/services/calendar-event-cleaner/calendar-event-cleaner.module'; import { RecordPositionBackfillJob } from 'src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job'; import { RecordPositionBackfillModule } from 'src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-module'; -import { DeleteConnectedAccountAssociatedCalendarDataJob } from 'src/modules/calendar/jobs/delete-connected-account-associated-calendar-data.job'; -import { GoogleCalendarFullSyncModule } from 'src/modules/calendar/services/google-calendar-full-sync.module'; +import { GoogleCalendarSyncModule } from 'src/modules/calendar/services/google-calendar-sync.module'; import { GoogleAPIRefreshAccessTokenModule } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.module'; import { MessageParticipantModule } from 'src/modules/messaging/services/message-participant/message-participant.module'; import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; @@ -45,12 +44,14 @@ import { EventObjectMetadata } from 'src/modules/event/standard-objects/event.ob import { HandleWorkspaceMemberDeletedJob } from 'src/engine/core-modules/workspace/handle-workspace-member-deleted.job'; import { GmailFullSynV2Module } from 'src/modules/messaging/services/gmail-full-sync-v2/gmail-full-sync.v2.module'; import { GmailFetchMessageContentFromCacheModule } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module'; -import { FetchAllMessagesFromCacheCronJob } from 'src/modules/messaging/jobs/crons/fetch-all-messages-from-cache.cron.job'; import { GmailFullSyncV2Job } from 'src/modules/messaging/jobs/gmail-full-sync-v2.job'; import { GmailPartialSyncV2Job } from 'src/modules/messaging/jobs/gmail-partial-sync-v2.job'; import { GmailPartialSyncV2Module } from 'src/modules/messaging/services/gmail-partial-sync-v2/gmail-partial-sync-v2.module'; +import { GoogleCalendarSyncCronJob } from 'src/modules/calendar/jobs/crons/google-calendar-sync.cron.job'; import { CalendarEventParticipantModule } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.module'; import { UnmatchParticipantJob } from 'src/modules/connected-account/jobs/unmatch-participant.job'; +import { DeleteConnectedAccountAssociatedCalendarDataJob } from 'src/modules/calendar/jobs/delete-connected-account-associated-calendar-data.job'; +import { FetchAllMessagesFromCacheCronJob } from 'src/modules/messaging/jobs/crons/fetch-all-messages-from-cache.cron.job'; @Module({ imports: [ @@ -60,7 +61,7 @@ import { UnmatchParticipantJob } from 'src/modules/connected-account/jobs/unmatc DataSeedDemoWorkspaceModule, EnvironmentModule, HttpModule, - GoogleCalendarFullSyncModule, + GoogleCalendarSyncModule, ObjectMetadataModule, StripeModule, ThreadCleanerModule, @@ -86,8 +87,8 @@ import { UnmatchParticipantJob } from 'src/modules/connected-account/jobs/unmatc ], providers: [ { - provide: GoogleCalendarFullSyncJob.name, - useClass: GoogleCalendarFullSyncJob, + provide: GoogleCalendarSyncJob.name, + useClass: GoogleCalendarSyncJob, }, { provide: CallWebhookJobsJob.name, @@ -159,6 +160,10 @@ import { UnmatchParticipantJob } from 'src/modules/connected-account/jobs/unmatc provide: GmailPartialSyncV2Job.name, useClass: GmailPartialSyncV2Job, }, + { + provide: GoogleCalendarSyncCronJob.name, + useClass: GoogleCalendarSyncCronJob, + }, ], }) export class JobsModule { diff --git a/packages/twenty-server/src/modules/calendar/commands/google-calendar-full-sync.command.ts b/packages/twenty-server/src/modules/calendar/commands/google-calendar-sync.command.ts similarity index 81% rename from packages/twenty-server/src/modules/calendar/commands/google-calendar-full-sync.command.ts rename to packages/twenty-server/src/modules/calendar/commands/google-calendar-sync.command.ts index d3eb8df0a5a6..fcd11fe349f9 100644 --- a/packages/twenty-server/src/modules/calendar/commands/google-calendar-full-sync.command.ts +++ b/packages/twenty-server/src/modules/calendar/commands/google-calendar-sync.command.ts @@ -6,24 +6,24 @@ import { MessageQueue } from 'src/engine/integrations/message-queue/message-queu import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { - GoogleCalendarFullSyncJobData, - GoogleCalendarFullSyncJob, -} from 'src/modules/calendar/jobs/google-calendar-full-sync.job'; + GoogleCalendarSyncJobData, + GoogleCalendarSyncJob, +} from 'src/modules/calendar/jobs/google-calendar-sync.job'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; import { CalendarChannelRepository } from 'src/modules/calendar/repositories/calendar-channel.repository'; import { CalendarChannelObjectMetadata } from 'src/modules/calendar/standard-objects/calendar-channel.object-metadata'; -interface GoogleCalendarFullSyncOptions { +interface GoogleCalendarSyncOptions { workspaceId: string; } @Command({ - name: 'workspace:google-calendar-full-sync', + name: 'workspace:google-calendar-sync', description: - 'Start google calendar full-sync for all workspaceMembers in a workspace.', + 'Start google calendar sync for all workspaceMembers in a workspace.', }) -export class GoogleCalendarFullSyncCommand extends CommandRunner { +export class GoogleCalendarSyncCommand extends CommandRunner { constructor( @Inject(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, @@ -37,7 +37,7 @@ export class GoogleCalendarFullSyncCommand extends CommandRunner { async run( _passedParam: string[], - options: GoogleCalendarFullSyncOptions, + options: GoogleCalendarSyncOptions, ): Promise { await this.fetchWorkspaceCalendars(options.workspaceId); @@ -68,8 +68,8 @@ export class GoogleCalendarFullSyncCommand extends CommandRunner { continue; } - await this.messageQueueService.add( - GoogleCalendarFullSyncJob.name, + await this.messageQueueService.add( + GoogleCalendarSyncJob.name, { workspaceId, connectedAccountId: connectedAccount.id, diff --git a/packages/twenty-server/src/modules/calendar/commands/start-google-calendar-sync.cron.command.ts b/packages/twenty-server/src/modules/calendar/commands/start-google-calendar-sync.cron.command.ts new file mode 100644 index 000000000000..a0f867659a0a --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/commands/start-google-calendar-sync.cron.command.ts @@ -0,0 +1,31 @@ +import { Inject } from '@nestjs/common'; + +import { Command, CommandRunner } from 'nest-commander'; + +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { GoogleCalendarSyncCronJob } from 'src/modules/calendar/jobs/crons/google-calendar-sync.cron.job'; +import { googleCalendarSyncCronPattern } from 'src/modules/calendar/jobs/crons/pattern/google-calendar-sync.cron.pattern'; + +@Command({ + name: 'cron:calendar:google-calendar-sync', + description: 'Starts a cron job to sync google calendar for all workspaces.', +}) +export class StartGoogleCalendarSyncCronJobCommand extends CommandRunner { + constructor( + @Inject(MessageQueue.cronQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + async run(): Promise { + await this.messageQueueService.addCron( + GoogleCalendarSyncCronJob.name, + undefined, + { + repeat: { pattern: googleCalendarSyncCronPattern }, + }, + ); + } +} diff --git a/packages/twenty-server/src/modules/calendar/commands/workspace-calendar-sync-commands.module.ts b/packages/twenty-server/src/modules/calendar/commands/workspace-calendar-sync-commands.module.ts index 18e1b88691ca..c3fa4544f04b 100644 --- a/packages/twenty-server/src/modules/calendar/commands/workspace-calendar-sync-commands.module.ts +++ b/packages/twenty-server/src/modules/calendar/commands/workspace-calendar-sync-commands.module.ts @@ -1,13 +1,14 @@ import { Module } from '@nestjs/common'; -import { GoogleCalendarFullSyncCommand } from 'src/modules/calendar/commands/google-calendar-full-sync.command'; +import { GoogleCalendarSyncCommand } from 'src/modules/calendar/commands/google-calendar-sync.command'; import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; +import { StartGoogleCalendarSyncCronJobCommand } from 'src/modules/calendar/commands/start-google-calendar-sync.cron.command'; @Module({ imports: [ ObjectMetadataRepositoryModule.forFeature([ConnectedAccountObjectMetadata]), ], - providers: [GoogleCalendarFullSyncCommand], + providers: [GoogleCalendarSyncCommand, StartGoogleCalendarSyncCronJobCommand], }) export class WorkspaceCalendarSyncCommandsModule {} diff --git a/packages/twenty-server/src/modules/calendar/jobs/crons/google-calendar-sync.cron.job.ts b/packages/twenty-server/src/modules/calendar/jobs/crons/google-calendar-sync.cron.job.ts new file mode 100644 index 000000000000..3db394f82804 --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/jobs/crons/google-calendar-sync.cron.job.ts @@ -0,0 +1,82 @@ +import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository, In } from 'typeorm'; + +import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; + +import { + FeatureFlagEntity, + FeatureFlagKeys, +} from 'src/engine/core-modules/feature-flag/feature-flag.entity'; +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; +import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { CalendarChannelRepository } from 'src/modules/calendar/repositories/calendar-channel.repository'; +import { CalendarChannelObjectMetadata } from 'src/modules/calendar/standard-objects/calendar-channel.object-metadata'; +import { GoogleCalendarSyncService } from 'src/modules/calendar/services/google-calendar-sync.service'; + +@Injectable() +export class GoogleCalendarSyncCronJob implements MessageQueueJob { + constructor( + @InjectRepository(Workspace, 'core') + private readonly workspaceRepository: Repository, + @InjectRepository(DataSourceEntity, 'metadata') + private readonly dataSourceRepository: Repository, + @InjectObjectMetadataRepository(CalendarChannelObjectMetadata) + private readonly calendarChannelRepository: CalendarChannelRepository, + @InjectRepository(FeatureFlagEntity, 'core') + private readonly featureFlagRepository: Repository, + private readonly googleCalendarSyncService: GoogleCalendarSyncService, + ) {} + + async handle(): Promise { + const workspaceIds = ( + await this.workspaceRepository.find({ + where: { + subscriptionStatus: 'active', + }, + select: ['id'], + }) + ).map((workspace) => workspace.id); + + const workspacesWithFeatureFlagActive = + await this.featureFlagRepository.find({ + where: { + workspaceId: In(workspaceIds), + key: FeatureFlagKeys.IsCalendarEnabled, + value: true, + }, + }); + + const dataSources = await this.dataSourceRepository.find({ + where: { + workspaceId: In( + workspacesWithFeatureFlagActive.map((w) => w.workspaceId), + ), + }, + }); + + const workspaceIdsWithDataSources = new Set( + dataSources.map((dataSource) => dataSource.workspaceId), + ); + + for (const workspaceId of workspaceIdsWithDataSources) { + await this.startWorkspaceGoogleCalendarSync(workspaceId); + } + } + + private async startWorkspaceGoogleCalendarSync( + workspaceId: string, + ): Promise { + const calendarChannels = + await this.calendarChannelRepository.getAll(workspaceId); + + for (const calendarChannel of calendarChannels) { + await this.googleCalendarSyncService.startGoogleCalendarSync( + workspaceId, + calendarChannel.connectedAccountId, + ); + } + } +} diff --git a/packages/twenty-server/src/modules/calendar/jobs/crons/pattern/google-calendar-sync.cron.pattern.ts b/packages/twenty-server/src/modules/calendar/jobs/crons/pattern/google-calendar-sync.cron.pattern.ts new file mode 100644 index 000000000000..2a0d3f7449df --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/jobs/crons/pattern/google-calendar-sync.cron.pattern.ts @@ -0,0 +1 @@ +export const googleCalendarSyncCronPattern = '*/5 * * * *'; diff --git a/packages/twenty-server/src/modules/calendar/jobs/google-calendar-full-sync.job.ts b/packages/twenty-server/src/modules/calendar/jobs/google-calendar-sync.job.ts similarity index 54% rename from packages/twenty-server/src/modules/calendar/jobs/google-calendar-full-sync.job.ts rename to packages/twenty-server/src/modules/calendar/jobs/google-calendar-sync.job.ts index 6408e72b6dba..08019f4201b5 100644 --- a/packages/twenty-server/src/modules/calendar/jobs/google-calendar-full-sync.job.ts +++ b/packages/twenty-server/src/modules/calendar/jobs/google-calendar-sync.job.ts @@ -3,32 +3,27 @@ import { Injectable, Logger } from '@nestjs/common'; import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service'; -import { GoogleCalendarFullSyncService } from 'src/modules/calendar/services/google-calendar-full-sync.service'; +import { GoogleCalendarSyncService } from 'src/modules/calendar/services/google-calendar-sync.service'; -export type GoogleCalendarFullSyncJobData = { +export type GoogleCalendarSyncJobData = { workspaceId: string; connectedAccountId: string; - nextPageToken?: string; }; @Injectable() -export class GoogleCalendarFullSyncJob - implements MessageQueueJob +export class GoogleCalendarSyncJob + implements MessageQueueJob { - private readonly logger = new Logger(GoogleCalendarFullSyncJob.name); + private readonly logger = new Logger(GoogleCalendarSyncJob.name); constructor( private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService, - private readonly googleCalendarFullSyncService: GoogleCalendarFullSyncService, + private readonly googleCalendarSyncService: GoogleCalendarSyncService, ) {} - async handle(data: GoogleCalendarFullSyncJobData): Promise { + async handle(data: GoogleCalendarSyncJobData): Promise { this.logger.log( - `google calendar full-sync for workspace ${ - data.workspaceId - } and account ${data.connectedAccountId} ${ - data.nextPageToken ? `and ${data.nextPageToken} pageToken` : '' - }`, + `google calendar sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`, ); try { await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken( @@ -44,10 +39,9 @@ export class GoogleCalendarFullSyncJob return; } - await this.googleCalendarFullSyncService.startGoogleCalendarFullSync( + await this.googleCalendarSyncService.startGoogleCalendarSync( data.workspaceId, data.connectedAccountId, - data.nextPageToken, ); } } diff --git a/packages/twenty-server/src/modules/calendar/repositories/calendar-channel.repository.ts b/packages/twenty-server/src/modules/calendar/repositories/calendar-channel.repository.ts index 95e8299f23f9..faf1be210988 100644 --- a/packages/twenty-server/src/modules/calendar/repositories/calendar-channel.repository.ts +++ b/packages/twenty-server/src/modules/calendar/repositories/calendar-channel.repository.ts @@ -12,6 +12,21 @@ export class CalendarChannelRepository { private readonly workspaceDataSourceService: WorkspaceDataSourceService, ) {} + public async getAll( + workspaceId: string, + transactionManager?: EntityManager, + ): Promise[]> { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + return await this.workspaceDataSourceService.executeRawQuery( + `SELECT * FROM ${dataSourceSchema}."calendarChannel"`, + [], + workspaceId, + transactionManager, + ); + } + public async getByConnectedAccountId( connectedAccountId: string, workspaceId: string, diff --git a/packages/twenty-server/src/modules/calendar/repositories/calendar-event-participant.repository.ts b/packages/twenty-server/src/modules/calendar/repositories/calendar-event-participant.repository.ts index 5a38da066021..55f1430e3d86 100644 --- a/packages/twenty-server/src/modules/calendar/repositories/calendar-event-participant.repository.ts +++ b/packages/twenty-server/src/modules/calendar/repositories/calendar-event-participant.repository.ts @@ -160,14 +160,14 @@ export class CalendarEventParticipantRepository { ); } - public async updateCalendarEventParticipants( + public async updateCalendarEventParticipantsAndReturnNewOnes( calendarEventParticipants: CalendarEventParticipant[], iCalUIDCalendarEventIdMap: Map, workspaceId: string, transactionManager?: EntityManager, - ): Promise { + ): Promise { if (calendarEventParticipants.length === 0) { - return; + return []; } const dataSourceSchema = @@ -189,6 +189,14 @@ export class CalendarEventParticipantRepository { calendarEventParticipant.handle, ); + const newCalendarEventParticipants = differenceWith( + calendarEventParticipants, + existingCalendarEventParticipants, + (calendarEventParticipant, existingCalendarEventParticipant) => + calendarEventParticipant.handle === + existingCalendarEventParticipant.handle, + ); + await this.deleteByIds( calendarEventParticipantsToDelete.map( (calendarEventParticipant) => calendarEventParticipant.id, @@ -227,6 +235,8 @@ export class CalendarEventParticipantRepository { workspaceId, transactionManager, ); + + return newCalendarEventParticipants; } public async getWithoutPersonIdAndWorkspaceMemberId( diff --git a/packages/twenty-server/src/modules/calendar/repositories/calendar-event.repository.ts b/packages/twenty-server/src/modules/calendar/repositories/calendar-event.repository.ts index 6f70ff19ee63..8da12f27d555 100644 --- a/packages/twenty-server/src/modules/calendar/repositories/calendar-event.repository.ts +++ b/packages/twenty-server/src/modules/calendar/repositories/calendar-event.repository.ts @@ -80,34 +80,36 @@ export class CalendarEventRepository { } public async getICalUIDCalendarEventIdMap( - iCalUIDs: string[], + calendarEventIds: string[], workspaceId: string, transactionManager?: EntityManager, ): Promise> { - if (iCalUIDs.length === 0) { + if (calendarEventIds.length === 0) { return new Map(); } const dataSourceSchema = this.workspaceDataSourceService.getSchemaName(workspaceId); - const calendarEvents: { - id: string; - iCalUID: string; - }[] = await this.workspaceDataSourceService.executeRawQuery( - `SELECT id, "iCalUID" FROM ${dataSourceSchema}."calendarEvent" WHERE "iCalUID" = ANY($1)`, - [iCalUIDs], + const calendarEvents: + | { + id: string; + iCalUID: string; + }[] + | undefined = await this.workspaceDataSourceService.executeRawQuery( + `SELECT id, "iCalUID" FROM ${dataSourceSchema}."calendarEvent" WHERE "id" = ANY($1)`, + [calendarEventIds], workspaceId, transactionManager, ); - const iCalUIDsCalendarEvnetIdsMap = new Map(); + const iCalUIDsCalendarEventIdsMap = new Map(); - calendarEvents.forEach((calendarEvent) => { - iCalUIDsCalendarEvnetIdsMap.set(calendarEvent.iCalUID, calendarEvent.id); + calendarEvents?.forEach((calendarEvent) => { + iCalUIDsCalendarEventIdsMap.set(calendarEvent.iCalUID, calendarEvent.id); }); - return iCalUIDsCalendarEvnetIdsMap; + return iCalUIDsCalendarEventIdsMap; } public async saveCalendarEvents( diff --git a/packages/twenty-server/src/modules/calendar/services/google-calendar-full-sync.module.ts b/packages/twenty-server/src/modules/calendar/services/google-calendar-sync.module.ts similarity index 85% rename from packages/twenty-server/src/modules/calendar/services/google-calendar-full-sync.module.ts rename to packages/twenty-server/src/modules/calendar/services/google-calendar-sync.module.ts index 32c5f0ea9dd4..476950ec4f64 100644 --- a/packages/twenty-server/src/modules/calendar/services/google-calendar-full-sync.module.ts +++ b/packages/twenty-server/src/modules/calendar/services/google-calendar-sync.module.ts @@ -4,8 +4,9 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; +import { CalendarEventCleanerModule } from 'src/modules/calendar/services/calendar-event-cleaner/calendar-event-cleaner.module'; import { CalendarEventParticipantModule } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.module'; -import { GoogleCalendarFullSyncService } from 'src/modules/calendar/services/google-calendar-full-sync.service'; +import { GoogleCalendarSyncService } from 'src/modules/calendar/services/google-calendar-sync.service'; import { CalendarProvidersModule } from 'src/modules/calendar/services/providers/calendar-providers.module'; import { CalendarChannelEventAssociationObjectMetadata } from 'src/modules/calendar/standard-objects/calendar-channel-event-association.object-metadata'; import { CalendarChannelObjectMetadata } from 'src/modules/calendar/standard-objects/calendar-channel.object-metadata'; @@ -32,8 +33,9 @@ import { WorkspaceMemberObjectMetadata } from 'src/modules/workspace-member/stan CalendarEventParticipantModule, TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), WorkspaceDataSourceModule, + CalendarEventCleanerModule, ], - providers: [GoogleCalendarFullSyncService], - exports: [GoogleCalendarFullSyncService], + providers: [GoogleCalendarSyncService], + exports: [GoogleCalendarSyncService], }) -export class GoogleCalendarFullSyncModule {} +export class GoogleCalendarSyncModule {} diff --git a/packages/twenty-server/src/modules/calendar/services/google-calendar-full-sync.service.ts b/packages/twenty-server/src/modules/calendar/services/google-calendar-sync.service.ts similarity index 66% rename from packages/twenty-server/src/modules/calendar/services/google-calendar-full-sync.service.ts rename to packages/twenty-server/src/modules/calendar/services/google-calendar-sync.service.ts index d845a39daba5..c1c1f01a01ef 100644 --- a/packages/twenty-server/src/modules/calendar/services/google-calendar-full-sync.service.ts +++ b/packages/twenty-server/src/modules/calendar/services/google-calendar-sync.service.ts @@ -1,8 +1,9 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { Repository } from 'typeorm'; +import { calendar_v3 as calendarV3 } from 'googleapis'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { BlocklistRepository } from 'src/modules/connected-account/repositories/blocklist.repository'; @@ -14,12 +15,9 @@ import { GoogleCalendarClientProvider } from 'src/modules/calendar/services/prov import { googleCalendarSearchFilterExcludeEmails } from 'src/modules/calendar/utils/google-calendar-search-filter.util'; import { CalendarChannelEventAssociationRepository } from 'src/modules/calendar/repositories/calendar-channel-event-association.repository'; import { CalendarChannelRepository } from 'src/modules/calendar/repositories/calendar-channel.repository'; -import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; import { CalendarEventRepository } from 'src/modules/calendar/repositories/calendar-event.repository'; import { formatGoogleCalendarEvent } from 'src/modules/calendar/utils/format-google-calendar-event.util'; -import { GoogleCalendarFullSyncJobData } from 'src/modules/calendar/jobs/google-calendar-full-sync.job'; import { CalendarEventParticipantRepository } from 'src/modules/calendar/repositories/calendar-event-participant.repository'; import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; @@ -28,16 +26,16 @@ import { CalendarChannelObjectMetadata } from 'src/modules/calendar/standard-obj import { CalendarChannelEventAssociationObjectMetadata } from 'src/modules/calendar/standard-objects/calendar-channel-event-association.object-metadata'; import { CalendarEventParticipantObjectMetadata } from 'src/modules/calendar/standard-objects/calendar-event-participant.object-metadata'; import { BlocklistObjectMetadata } from 'src/modules/connected-account/standard-objects/blocklist.object-metadata'; +import { CalendarEventCleanerService } from 'src/modules/calendar/services/calendar-event-cleaner/calendar-event-cleaner.service'; import { CalendarEventParticipantService } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.service'; +import { CalendarEventParticipant } from 'src/modules/calendar/types/calendar-event'; @Injectable() -export class GoogleCalendarFullSyncService { - private readonly logger = new Logger(GoogleCalendarFullSyncService.name); +export class GoogleCalendarSyncService { + private readonly logger = new Logger(GoogleCalendarSyncService.name); constructor( private readonly googleCalendarClientProvider: GoogleCalendarClientProvider, - @Inject(MessageQueue.calendarQueue) - private readonly messageQueueService: MessageQueueService, @InjectObjectMetadataRepository(ConnectedAccountObjectMetadata) private readonly connectedAccountRepository: ConnectedAccountRepository, @InjectObjectMetadataRepository(CalendarEventObjectMetadata) @@ -56,13 +54,13 @@ export class GoogleCalendarFullSyncService { private readonly featureFlagRepository: Repository, private readonly workspaceDataSourceService: WorkspaceDataSourceService, private readonly eventEmitter: EventEmitter2, + private readonly calendarEventCleanerService: CalendarEventCleanerService, private readonly calendarEventParticipantsService: CalendarEventParticipantService, ) {} - public async startGoogleCalendarFullSync( + public async startGoogleCalendarSync( workspaceId: string, connectedAccountId: string, - pageToken?: string, ): Promise { const connectedAccount = await this.connectedAccountRepository.getById( connectedAccountId, @@ -78,7 +76,7 @@ export class GoogleCalendarFullSyncService { if (!refreshToken) { throw new Error( - `No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId} during full-sync`, + `No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId} during sync`, ); } @@ -88,6 +86,8 @@ export class GoogleCalendarFullSyncService { workspaceId, ); + const syncToken = calendarChannel?.syncCursor || undefined; + if (!calendarChannel) { return; } @@ -120,30 +120,47 @@ export class GoogleCalendarFullSyncService { let startTime = Date.now(); - const googleCalendarEvents = await googleCalendarClient.events.list({ - calendarId: 'primary', - maxResults: 500, - pageToken: pageToken, - q: googleCalendarSearchFilterExcludeEmails(blocklistedEmails), - }); + let nextSyncToken: string | null | undefined; + let nextPageToken: string | undefined; + const events: calendarV3.Schema$Event[] = []; + + while (true) { + const googleCalendarEvents = await googleCalendarClient.events.list({ + calendarId: 'primary', + maxResults: 500, + syncToken, + pageToken: nextPageToken, + showDeleted: true, + q: googleCalendarSearchFilterExcludeEmails(blocklistedEmails), + }); + + nextSyncToken = googleCalendarEvents.data.nextSyncToken; + nextPageToken = googleCalendarEvents.data.nextPageToken || undefined; + + const { items } = googleCalendarEvents.data; + + if (!items || items.length === 0) { + break; + } + + events.push(...items); + + if (!nextPageToken) { + break; + } + } let endTime = Date.now(); this.logger.log( - `google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId} getting events list in ${ + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId} getting events list in ${ endTime - startTime }ms.`, ); - const { - items: events, - nextPageToken, - nextSyncToken, - } = googleCalendarEvents.data; - if (!events || events?.length === 0) { this.logger.log( - `google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`, + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`, ); return; @@ -163,17 +180,11 @@ export class GoogleCalendarFullSyncService { endTime = Date.now(); this.logger.log( - `google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId}: getting existing calendar channel event associations in ${ + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: getting existing calendar channel event associations in ${ endTime - startTime }ms.`, ); - // TODO: In V2, we will also import deleted events by doing batch GET queries on the canceled events - // The canceled events start and end are not accessible in the list query - const formattedEvents = events - .filter((event) => event.status !== 'cancelled') - .map((event) => formatGoogleCalendarEvent(event)); - // TODO: When we will be able to add unicity contraint on iCalUID, we will do a INSERT ON CONFLICT DO UPDATE const existingEventExternalIds = @@ -181,6 +192,22 @@ export class GoogleCalendarFullSyncService { (association) => association.eventExternalId, ); + const existingEventsIds = existingCalendarChannelEventAssociations.map( + (association) => association.calendarEventId, + ); + + const iCalUIDCalendarEventIdMap = + await this.calendarEventRepository.getICalUIDCalendarEventIdMap( + existingEventsIds, + workspaceId, + ); + + const formattedEvents = events + .filter((event) => event.status !== 'cancelled') + .map((event) => + formatGoogleCalendarEvent(event, iCalUIDCalendarEventIdMap), + ); + const eventsToSave = formattedEvents.filter( (event) => !existingEventExternalIds.includes(event.externalId), ); @@ -189,6 +216,10 @@ export class GoogleCalendarFullSyncService { existingEventExternalIds.includes(event.externalId), ); + const cancelledEventExternalIds = events + .filter((event) => event.status === 'cancelled') + .map((event) => event.id as string); + const calendarChannelEventAssociationsToSave = eventsToSave.map( (event) => ({ calendarEventId: event.id, @@ -205,11 +236,7 @@ export class GoogleCalendarFullSyncService { (event) => event.participants, ); - const iCalUIDCalendarEventIdMap = - await this.calendarEventRepository.getICalUIDCalendarEventIdMap( - eventsToUpdate.map((event) => event.iCalUID), - workspaceId, - ); + let newCalendarEventParticipants: CalendarEventParticipant[] = []; if (events.length > 0) { const dataSourceMetadata = @@ -230,9 +257,9 @@ export class GoogleCalendarFullSyncService { endTime = Date.now(); this.logger.log( - `google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId}: saving events in ${ - endTime - startTime - }ms.`, + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: saving ${ + eventsToSave.length + } events in ${endTime - startTime}ms.`, ); startTime = Date.now(); @@ -246,9 +273,9 @@ export class GoogleCalendarFullSyncService { endTime = Date.now(); this.logger.log( - `google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId}: updating events in ${ - endTime - startTime - }ms.`, + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: updating ${ + eventsToUpdate.length + } events in ${endTime - startTime}ms.`, ); startTime = Date.now(); @@ -262,7 +289,27 @@ export class GoogleCalendarFullSyncService { endTime = Date.now(); this.logger.log( - `google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId}: saving calendar channel event associations in ${ + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: saving calendar channel event associations in ${ + endTime - startTime + }ms.`, + ); + + startTime = Date.now(); + + newCalendarEventParticipants = + await this.calendarEventParticipantsRepository.updateCalendarEventParticipantsAndReturnNewOnes( + participantsToUpdate, + iCalUIDCalendarEventIdMap, + workspaceId, + transactionManager, + ); + + endTime = Date.now(); + + participantsToSave.push(...newCalendarEventParticipants); + + this.logger.log( + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: updating participants in ${ endTime - startTime }ms.`, ); @@ -278,16 +325,16 @@ export class GoogleCalendarFullSyncService { endTime = Date.now(); this.logger.log( - `google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId}: saving participants in ${ + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: saving participants in ${ endTime - startTime }ms.`, ); startTime = Date.now(); - await this.calendarEventParticipantsRepository.updateCalendarEventParticipants( - participantsToUpdate, - iCalUIDCalendarEventIdMap, + await this.calendarChannelEventAssociationRepository.deleteByEventExternalIdsAndCalendarChannelId( + cancelledEventExternalIds, + calendarChannelId, workspaceId, transactionManager, ); @@ -295,35 +342,47 @@ export class GoogleCalendarFullSyncService { endTime = Date.now(); this.logger.log( - `google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId}: updating participants in ${ + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: deleting calendar channel event associations in ${ endTime - startTime }ms.`, ); }); - if (calendarChannel.isContactAutoCreationEnabled) { - const contactsToCreate = participantsToSave; + startTime = Date.now(); + + await this.calendarEventCleanerService.cleanWorkspaceCalendarEvents( + workspaceId, + ); + + endTime = Date.now(); + this.logger.log( + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: cleaning calendar events in ${ + endTime - startTime + }ms.`, + ); + + if (calendarChannel.isContactAutoCreationEnabled) { this.eventEmitter.emit(`createContacts`, { workspaceId, connectedAccountHandle: connectedAccount.handle, - contactsToCreate, + contactsToCreate: participantsToSave, }); } } catch (error) { this.logger.error( - `Error during google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId}: ${error.message}`, + `Error during google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: ${error.message}`, ); } } else { this.logger.log( - `google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`, + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`, ); } if (!nextSyncToken) { throw new Error( - `No next sync token found for connected account ${connectedAccountId} in workspace ${workspaceId} during full-sync`, + `No next sync token found for connected account ${connectedAccountId} in workspace ${workspaceId} during sync`, ); } @@ -338,29 +397,15 @@ export class GoogleCalendarFullSyncService { endTime = Date.now(); this.logger.log( - `google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId}: updating sync cursor in ${ + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: updating sync cursor in ${ endTime - startTime }ms.`, ); this.logger.log( - `google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId} ${ - nextPageToken ? `and ${nextPageToken} pageToken` : '' - } done.`, + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId} ${ + syncToken ? `and ${syncToken} syncToken ` : '' + }done.`, ); - - if (nextPageToken) { - await this.messageQueueService.add( - GoogleCalendarFullSyncService.name, - { - workspaceId, - connectedAccountId, - nextPageToken, - }, - { - retryLimit: 2, - }, - ); - } } } diff --git a/packages/twenty-server/src/modules/calendar/utils/format-google-calendar-event.util.ts b/packages/twenty-server/src/modules/calendar/utils/format-google-calendar-event.util.ts index 0d96e7819f76..6636569261a2 100644 --- a/packages/twenty-server/src/modules/calendar/utils/format-google-calendar-event.util.ts +++ b/packages/twenty-server/src/modules/calendar/utils/format-google-calendar-event.util.ts @@ -1,13 +1,15 @@ -import { calendar_v3 } from 'googleapis'; +import { calendar_v3 as calendarV3 } from 'googleapis'; import { v4 } from 'uuid'; import { CalendarEventParticipantResponseStatus } from 'src/modules/calendar/standard-objects/calendar-event-participant.object-metadata'; import { CalendarEventWithParticipants } from 'src/modules/calendar/types/calendar-event'; export const formatGoogleCalendarEvent = ( - event: calendar_v3.Schema$Event, + event: calendarV3.Schema$Event, + iCalUIDCalendarEventIdMap: Map, ): CalendarEventWithParticipants => { - const id = v4(); + const id = + (event.iCalUID && iCalUIDCalendarEventIdMap.get(event.iCalUID)) ?? v4(); const formatResponseStatus = (status: string | null | undefined) => { switch (status) { diff --git a/packages/twenty-server/src/modules/calendar/utils/google-calendar-search-filter.util.ts b/packages/twenty-server/src/modules/calendar/utils/google-calendar-search-filter.util.ts index c9c90417a228..2563c0d05841 100644 --- a/packages/twenty-server/src/modules/calendar/utils/google-calendar-search-filter.util.ts +++ b/packages/twenty-server/src/modules/calendar/utils/google-calendar-search-filter.util.ts @@ -1,8 +1,8 @@ export const googleCalendarSearchFilterExcludeEmails = ( emails: string[], -): string => { +): string | undefined => { if (emails.length === 0) { - return ''; + return undefined; } return `email=-${emails.join(', -')}`; diff --git a/packages/twenty-server/src/modules/messaging/commands/messaging-command.module.ts b/packages/twenty-server/src/modules/messaging/commands/messaging-command.module.ts index 0bfa32f1fc17..48e3c6636402 100644 --- a/packages/twenty-server/src/modules/messaging/commands/messaging-command.module.ts +++ b/packages/twenty-server/src/modules/messaging/commands/messaging-command.module.ts @@ -2,9 +2,8 @@ import { Module } from '@nestjs/common'; import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; -import { GmailFetchMessagesFromCacheCronCommand } from 'src/modules/messaging/commands/crons/gmail-fetch-messages-from-cache.cron.command'; -import { GmailPartialSyncCronCommand } from 'src/modules/messaging/commands/crons/gmail-partial-sync.cron.command'; - +import { GmailFetchMessagesFromCacheCronCommand } from 'src/modules/messaging/jobs/crons/gmail-fetch-messages-from-cache.cron.command'; +import { GmailPartialSyncCronCommand } from 'src/modules/messaging/jobs/crons/gmail-partial-sync.cron.command'; @Module({ imports: [ ObjectMetadataRepositoryModule.forFeature([ConnectedAccountObjectMetadata]), diff --git a/packages/twenty-server/src/modules/messaging/jobs/crons/fetch-all-messages-from-cache.cron.job.ts b/packages/twenty-server/src/modules/messaging/jobs/crons/fetch-all-messages-from-cache.cron.job.ts index 16ec17e18637..8ed5ea6107e4 100644 --- a/packages/twenty-server/src/modules/messaging/jobs/crons/fetch-all-messages-from-cache.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/jobs/crons/fetch-all-messages-from-cache.cron.job.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, In } from 'typeorm'; @@ -16,8 +16,6 @@ import { GmailFetchMessageContentFromCacheService } from 'src/modules/messaging/ export class FetchAllMessagesFromCacheCronJob implements MessageQueueJob { - private readonly logger = new Logger(FetchAllMessagesFromCacheCronJob.name); - constructor( @InjectRepository(Workspace, 'core') private readonly workspaceRepository: Repository, diff --git a/packages/twenty-server/src/modules/messaging/commands/crons/gmail-fetch-messages-from-cache.cron.command.ts b/packages/twenty-server/src/modules/messaging/jobs/crons/gmail-fetch-messages-from-cache.cron.command.ts similarity index 100% rename from packages/twenty-server/src/modules/messaging/commands/crons/gmail-fetch-messages-from-cache.cron.command.ts rename to packages/twenty-server/src/modules/messaging/jobs/crons/gmail-fetch-messages-from-cache.cron.command.ts diff --git a/packages/twenty-server/src/modules/messaging/commands/crons/gmail-partial-sync.cron.command.ts b/packages/twenty-server/src/modules/messaging/jobs/crons/gmail-partial-sync.cron.command.ts similarity index 92% rename from packages/twenty-server/src/modules/messaging/commands/crons/gmail-partial-sync.cron.command.ts rename to packages/twenty-server/src/modules/messaging/jobs/crons/gmail-partial-sync.cron.command.ts index df11717a2fd3..2cfe7be30777 100644 --- a/packages/twenty-server/src/modules/messaging/commands/crons/gmail-partial-sync.cron.command.ts +++ b/packages/twenty-server/src/modules/messaging/jobs/crons/gmail-partial-sync.cron.command.ts @@ -4,8 +4,8 @@ import { Command, CommandRunner } from 'nest-commander'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { fetchAllWorkspacesMessagesCronPattern } from 'src/modules/messaging/commands/crons/patterns/fetch-all-workspaces-messages.cron.pattern'; import { GmailPartialSyncCronJob } from 'src/modules/messaging/jobs/crons/gmail-partial-sync.cron.job'; +import { fetchAllWorkspacesMessagesCronPattern } from 'src/modules/messaging/jobs/crons/patterns/fetch-all-workspaces-messages.cron.pattern'; @Command({ name: 'cron:messaging:gmail-partial-sync', diff --git a/packages/twenty-server/src/modules/messaging/commands/crons/patterns/fetch-all-workspaces-messages.cron.pattern.ts b/packages/twenty-server/src/modules/messaging/jobs/crons/patterns/fetch-all-workspaces-messages.cron.pattern.ts similarity index 100% rename from packages/twenty-server/src/modules/messaging/commands/crons/patterns/fetch-all-workspaces-messages.cron.pattern.ts rename to packages/twenty-server/src/modules/messaging/jobs/crons/patterns/fetch-all-workspaces-messages.cron.pattern.ts