Skip to content

Commit

Permalink
4710 implement google calendar incremental sync (twentyhq#4822)
Browse files Browse the repository at this point in the history
  • Loading branch information
bosiraphael authored Apr 10, 2024
1 parent a986995 commit 0a6dc75
Show file tree
Hide file tree
Showing 20 changed files with 333 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import { SaveOrUpdateConnectedAccountInput } from 'src/engine/core-modules/auth/
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
GoogleCalendarFullSyncJob,
GoogleCalendarFullSyncJobData,
} from 'src/modules/calendar/jobs/google-calendar-full-sync.job';
GoogleCalendarSyncJob,
GoogleCalendarSyncJobData,
} from 'src/modules/calendar/jobs/google-calendar-sync.job';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import {
FeatureFlagEntity,
Expand Down Expand Up @@ -135,10 +135,7 @@ export class GoogleAPIsService {
this.environmentService.get('CALENDAR_PROVIDER_GOOGLE_ENABLED') &&
IsCalendarEnabled
) {
await this.enqueueGoogleCalendarFullSyncJob(
workspaceId,
connectedAccountId,
);
await this.enqueueGoogleCalendarSyncJob(workspaceId, connectedAccountId);
}

return;
Expand Down Expand Up @@ -170,10 +167,7 @@ export class GoogleAPIsService {
}

if (this.environmentService.get('CALENDAR_PROVIDER_GOOGLE_ENABLED')) {
await this.enqueueGoogleCalendarFullSyncJob(
workspaceId,
connectedAccountId,
);
await this.enqueueGoogleCalendarSyncJob(workspaceId, connectedAccountId);
}

return;
Expand All @@ -192,12 +186,12 @@ export class GoogleAPIsService {
);
}

async enqueueGoogleCalendarFullSyncJob(
async enqueueGoogleCalendarSyncJob(
workspaceId: string,
connectedAccountId: string,
) {
await this.calendarQueueService.add<GoogleCalendarFullSyncJobData>(
GoogleCalendarFullSyncJob.name,
await this.calendarQueueService.add<GoogleCalendarSyncJobData>(
GoogleCalendarSyncJob.name,
{
workspaceId,
connectedAccountId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ import { StripeModule } from 'src/engine/core-modules/billing/stripe/stripe.modu
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { GoogleCalendarFullSyncJob } from 'src/modules/calendar/jobs/google-calendar-full-sync.job';
import { GoogleCalendarSyncJob } from 'src/modules/calendar/jobs/google-calendar-sync.job';
import { CalendarEventCleanerModule } from 'src/modules/calendar/services/calendar-event-cleaner/calendar-event-cleaner.module';
import { RecordPositionBackfillJob } from 'src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job';
import { RecordPositionBackfillModule } from 'src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-module';
import { DeleteConnectedAccountAssociatedCalendarDataJob } from 'src/modules/calendar/jobs/delete-connected-account-associated-calendar-data.job';
import { GoogleCalendarFullSyncModule } from 'src/modules/calendar/services/google-calendar-full-sync.module';
import { GoogleCalendarSyncModule } from 'src/modules/calendar/services/google-calendar-sync.module';
import { GoogleAPIRefreshAccessTokenModule } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.module';
import { MessageParticipantModule } from 'src/modules/messaging/services/message-participant/message-participant.module';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
Expand All @@ -45,12 +44,14 @@ import { EventObjectMetadata } from 'src/modules/event/standard-objects/event.ob
import { HandleWorkspaceMemberDeletedJob } from 'src/engine/core-modules/workspace/handle-workspace-member-deleted.job';
import { GmailFullSynV2Module } from 'src/modules/messaging/services/gmail-full-sync-v2/gmail-full-sync.v2.module';
import { GmailFetchMessageContentFromCacheModule } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module';
import { FetchAllMessagesFromCacheCronJob } from 'src/modules/messaging/jobs/crons/fetch-all-messages-from-cache.cron.job';
import { GmailFullSyncV2Job } from 'src/modules/messaging/jobs/gmail-full-sync-v2.job';
import { GmailPartialSyncV2Job } from 'src/modules/messaging/jobs/gmail-partial-sync-v2.job';
import { GmailPartialSyncV2Module } from 'src/modules/messaging/services/gmail-partial-sync-v2/gmail-partial-sync-v2.module';
import { GoogleCalendarSyncCronJob } from 'src/modules/calendar/jobs/crons/google-calendar-sync.cron.job';
import { CalendarEventParticipantModule } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.module';
import { UnmatchParticipantJob } from 'src/modules/connected-account/jobs/unmatch-participant.job';
import { DeleteConnectedAccountAssociatedCalendarDataJob } from 'src/modules/calendar/jobs/delete-connected-account-associated-calendar-data.job';
import { FetchAllMessagesFromCacheCronJob } from 'src/modules/messaging/jobs/crons/fetch-all-messages-from-cache.cron.job';

@Module({
imports: [
Expand All @@ -60,7 +61,7 @@ import { UnmatchParticipantJob } from 'src/modules/connected-account/jobs/unmatc
DataSeedDemoWorkspaceModule,
EnvironmentModule,
HttpModule,
GoogleCalendarFullSyncModule,
GoogleCalendarSyncModule,
ObjectMetadataModule,
StripeModule,
ThreadCleanerModule,
Expand All @@ -86,8 +87,8 @@ import { UnmatchParticipantJob } from 'src/modules/connected-account/jobs/unmatc
],
providers: [
{
provide: GoogleCalendarFullSyncJob.name,
useClass: GoogleCalendarFullSyncJob,
provide: GoogleCalendarSyncJob.name,
useClass: GoogleCalendarSyncJob,
},
{
provide: CallWebhookJobsJob.name,
Expand Down Expand Up @@ -159,6 +160,10 @@ import { UnmatchParticipantJob } from 'src/modules/connected-account/jobs/unmatc
provide: GmailPartialSyncV2Job.name,
useClass: GmailPartialSyncV2Job,
},
{
provide: GoogleCalendarSyncCronJob.name,
useClass: GoogleCalendarSyncCronJob,
},
],
})
export class JobsModule {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,24 @@ import { MessageQueue } from 'src/engine/integrations/message-queue/message-queu
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import {
GoogleCalendarFullSyncJobData,
GoogleCalendarFullSyncJob,
} from 'src/modules/calendar/jobs/google-calendar-full-sync.job';
GoogleCalendarSyncJobData,
GoogleCalendarSyncJob,
} from 'src/modules/calendar/jobs/google-calendar-sync.job';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
import { CalendarChannelRepository } from 'src/modules/calendar/repositories/calendar-channel.repository';
import { CalendarChannelObjectMetadata } from 'src/modules/calendar/standard-objects/calendar-channel.object-metadata';

interface GoogleCalendarFullSyncOptions {
interface GoogleCalendarSyncOptions {
workspaceId: string;
}

@Command({
name: 'workspace:google-calendar-full-sync',
name: 'workspace:google-calendar-sync',
description:
'Start google calendar full-sync for all workspaceMembers in a workspace.',
'Start google calendar sync for all workspaceMembers in a workspace.',
})
export class GoogleCalendarFullSyncCommand extends CommandRunner {
export class GoogleCalendarSyncCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
Expand All @@ -37,7 +37,7 @@ export class GoogleCalendarFullSyncCommand extends CommandRunner {

async run(
_passedParam: string[],
options: GoogleCalendarFullSyncOptions,
options: GoogleCalendarSyncOptions,
): Promise<void> {
await this.fetchWorkspaceCalendars(options.workspaceId);

Expand Down Expand Up @@ -68,8 +68,8 @@ export class GoogleCalendarFullSyncCommand extends CommandRunner {
continue;
}

await this.messageQueueService.add<GoogleCalendarFullSyncJobData>(
GoogleCalendarFullSyncJob.name,
await this.messageQueueService.add<GoogleCalendarSyncJobData>(
GoogleCalendarSyncJob.name,
{
workspaceId,
connectedAccountId: connectedAccount.id,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Inject } from '@nestjs/common';

import { Command, CommandRunner } from 'nest-commander';

import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { GoogleCalendarSyncCronJob } from 'src/modules/calendar/jobs/crons/google-calendar-sync.cron.job';
import { googleCalendarSyncCronPattern } from 'src/modules/calendar/jobs/crons/pattern/google-calendar-sync.cron.pattern';

@Command({
name: 'cron:calendar:google-calendar-sync',
description: 'Starts a cron job to sync google calendar for all workspaces.',
})
export class StartGoogleCalendarSyncCronJobCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
}

async run(): Promise<void> {
await this.messageQueueService.addCron<undefined>(
GoogleCalendarSyncCronJob.name,
undefined,
{
repeat: { pattern: googleCalendarSyncCronPattern },
},
);
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { Module } from '@nestjs/common';

import { GoogleCalendarFullSyncCommand } from 'src/modules/calendar/commands/google-calendar-full-sync.command';
import { GoogleCalendarSyncCommand } from 'src/modules/calendar/commands/google-calendar-sync.command';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
import { StartGoogleCalendarSyncCronJobCommand } from 'src/modules/calendar/commands/start-google-calendar-sync.cron.command';

@Module({
imports: [
ObjectMetadataRepositoryModule.forFeature([ConnectedAccountObjectMetadata]),
],
providers: [GoogleCalendarFullSyncCommand],
providers: [GoogleCalendarSyncCommand, StartGoogleCalendarSyncCronJobCommand],
})
export class WorkspaceCalendarSyncCommandsModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';

import { Repository, In } from 'typeorm';

import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';

import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { CalendarChannelRepository } from 'src/modules/calendar/repositories/calendar-channel.repository';
import { CalendarChannelObjectMetadata } from 'src/modules/calendar/standard-objects/calendar-channel.object-metadata';
import { GoogleCalendarSyncService } from 'src/modules/calendar/services/google-calendar-sync.service';

@Injectable()
export class GoogleCalendarSyncCronJob implements MessageQueueJob<undefined> {
constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectRepository(DataSourceEntity, 'metadata')
private readonly dataSourceRepository: Repository<DataSourceEntity>,
@InjectObjectMetadataRepository(CalendarChannelObjectMetadata)
private readonly calendarChannelRepository: CalendarChannelRepository,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
private readonly googleCalendarSyncService: GoogleCalendarSyncService,
) {}

async handle(): Promise<void> {
const workspaceIds = (
await this.workspaceRepository.find({
where: {
subscriptionStatus: 'active',
},
select: ['id'],
})
).map((workspace) => workspace.id);

const workspacesWithFeatureFlagActive =
await this.featureFlagRepository.find({
where: {
workspaceId: In(workspaceIds),
key: FeatureFlagKeys.IsCalendarEnabled,
value: true,
},
});

const dataSources = await this.dataSourceRepository.find({
where: {
workspaceId: In(
workspacesWithFeatureFlagActive.map((w) => w.workspaceId),
),
},
});

const workspaceIdsWithDataSources = new Set(
dataSources.map((dataSource) => dataSource.workspaceId),
);

for (const workspaceId of workspaceIdsWithDataSources) {
await this.startWorkspaceGoogleCalendarSync(workspaceId);
}
}

private async startWorkspaceGoogleCalendarSync(
workspaceId: string,
): Promise<void> {
const calendarChannels =
await this.calendarChannelRepository.getAll(workspaceId);

for (const calendarChannel of calendarChannels) {
await this.googleCalendarSyncService.startGoogleCalendarSync(
workspaceId,
calendarChannel.connectedAccountId,
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const googleCalendarSyncCronPattern = '*/5 * * * *';
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,27 @@ import { Injectable, Logger } from '@nestjs/common';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';

import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { GoogleCalendarFullSyncService } from 'src/modules/calendar/services/google-calendar-full-sync.service';
import { GoogleCalendarSyncService } from 'src/modules/calendar/services/google-calendar-sync.service';

export type GoogleCalendarFullSyncJobData = {
export type GoogleCalendarSyncJobData = {
workspaceId: string;
connectedAccountId: string;
nextPageToken?: string;
};

@Injectable()
export class GoogleCalendarFullSyncJob
implements MessageQueueJob<GoogleCalendarFullSyncJobData>
export class GoogleCalendarSyncJob
implements MessageQueueJob<GoogleCalendarSyncJobData>
{
private readonly logger = new Logger(GoogleCalendarFullSyncJob.name);
private readonly logger = new Logger(GoogleCalendarSyncJob.name);

constructor(
private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService,
private readonly googleCalendarFullSyncService: GoogleCalendarFullSyncService,
private readonly googleCalendarSyncService: GoogleCalendarSyncService,
) {}

async handle(data: GoogleCalendarFullSyncJobData): Promise<void> {
async handle(data: GoogleCalendarSyncJobData): Promise<void> {
this.logger.log(
`google calendar full-sync for workspace ${
data.workspaceId
} and account ${data.connectedAccountId} ${
data.nextPageToken ? `and ${data.nextPageToken} pageToken` : ''
}`,
`google calendar sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`,
);
try {
await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
Expand All @@ -44,10 +39,9 @@ export class GoogleCalendarFullSyncJob
return;
}

await this.googleCalendarFullSyncService.startGoogleCalendarFullSync(
await this.googleCalendarSyncService.startGoogleCalendarSync(
data.workspaceId,
data.connectedAccountId,
data.nextPageToken,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,21 @@ export class CalendarChannelRepository {
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}

public async getAll(
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<CalendarChannelObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);

return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."calendarChannel"`,
[],
workspaceId,
transactionManager,
);
}

public async getByConnectedAccountId(
connectedAccountId: string,
workspaceId: string,
Expand Down
Loading

0 comments on commit 0a6dc75

Please sign in to comment.