From 3fa218d0aebbed558fafc1c2c8b8795e06dbb941 Mon Sep 17 00:00:00 2001 From: amit <1mitccc@gmail.com> Date: Fri, 20 Dec 2024 05:41:08 +0530 Subject: [PATCH] feat: onedrive: incremental sync for files --- .../api/src/filestorage/file/file.module.ts | 2 + .../file/services/onedrive/index.ts | 237 +++++++++++++++--- .../file/services/onedrive/processor.ts | 21 ++ .../folder/services/onedrive/index.ts | 5 + 4 files changed, 229 insertions(+), 36 deletions(-) create mode 100644 packages/api/src/filestorage/file/services/onedrive/processor.ts diff --git a/packages/api/src/filestorage/file/file.module.ts b/packages/api/src/filestorage/file/file.module.ts index cba9fdc26..0c3c8c6f9 100644 --- a/packages/api/src/filestorage/file/file.module.ts +++ b/packages/api/src/filestorage/file/file.module.ts @@ -18,6 +18,7 @@ import { SharepointFileMapper } from './services/sharepoint/mappers'; import { SyncService } from './sync/sync.service'; import { GoogleDriveQueueProcessor } from './services/googledrive/processor'; import { FolderModule } from '../folder/folder.module'; +import { OnedriveQueueProcessor } from './services/onedrive/processor'; @Module({ imports: [FolderModule], @@ -43,6 +44,7 @@ import { FolderModule } from '../folder/folder.module'; DropboxFileMapper, GoogleDriveService, GoogleDriveQueueProcessor, + OnedriveQueueProcessor, ], exports: [SyncService, ServiceRegistry], }) diff --git a/packages/api/src/filestorage/file/services/onedrive/index.ts b/packages/api/src/filestorage/file/services/onedrive/index.ts index b97117466..4b26c7876 100644 --- a/packages/api/src/filestorage/file/services/onedrive/index.ts +++ b/packages/api/src/filestorage/file/services/onedrive/index.ts @@ -10,6 +10,10 @@ import axios, { AxiosRequestConfig, AxiosResponse } from 'axios'; import { ServiceRegistry } from '../registry.service'; import { OnedriveFileOutput } from './types'; import { OnedriveService as OnedriveFolderService } from '@filestorage/folder/services/onedrive'; +import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; +import { Connection } from '@@core/connections/@utils/types'; +import { UnifiedFilestorageFileOutput } from '@filestorage/file/types/model.unified'; +import { BullQueueService } from '@@core/@core-services/queues/shared.service'; @Injectable() export class OnedriveService implements IFileService { private readonly MAX_RETRIES: number = 6; @@ -21,7 +25,9 @@ export class OnedriveService implements IFileService { private logger: LoggerService, private cryptoService: EncryptionService, private registry: ServiceRegistry, + private ingestService: IngestDataService, private onedriveFolderService: OnedriveFolderService, + private bullQueueService: BullQueueService, ) { this.logger.setContext( `${FileStorageObject.file.toUpperCase()}:${OnedriveService.name}`, @@ -31,9 +37,13 @@ export class OnedriveService implements IFileService { // todo: add addFile method - async sync(data: SyncParam): Promise> { + async sync( + data: SyncParam, + deltaLink?: string, + ): Promise> { try { - const { linkedUserId, id_folder } = data; + const { linkedUserId, custom_field_mappings, ingestParams, id_folder } = + data; const connection = await this.prisma.connections.findFirst({ where: { @@ -43,57 +53,103 @@ export class OnedriveService implements IFileService { }, }); - const foldersToSync = ['root']; + // if id_folder is provided, sync only the files in the specified folder if (id_folder) { const folder = await this.prisma.fs_folders.findUnique({ where: { id_fs_folder: id_folder as string, }, - }); - if (folder && folder.remote_id !== 'root') { - foldersToSync.push(folder.remote_id); - } - } else { - const folders = await this.prisma.fs_folders.findMany({ - where: { - id_connection: connection.id_connection, - remote_was_deleted: false, - }, select: { remote_id: true, }, }); - foldersToSync.push(...folders.map((folder) => folder.remote_id)); + if (folder) { + const files = await this.syncFolder(connection, folder.remote_id); + return { + data: files, + message: 'OneDrive files retrieved from specified folder', + statusCode: 200, + }; + } } - const allFiles: OnedriveFileOutput[] = []; - - // Process folders in batches - for (let i = 0; i < foldersToSync.length; i += this.BATCH_SIZE) { - const batch = foldersToSync.slice(i, i + this.BATCH_SIZE); + // if deltaLink is provided + if (deltaLink) { + this.logger.log( + `Syncing OneDrive files from deltaLink: ${deltaLink}`, + 'onedrive files sync', + ); - // Synchronize all folders in the current batch concurrently - const filePromises = batch.map(async (folderId) => { - const files = await this.syncFolder(connection, folderId); - return files; - }); + let files: OnedriveFileOutput[] = []; + let nextDeltaLink: string | null = null; + try { + const { files: batchFiles, nextDeltaLink: batchNextDeltaLink } = + await this.getFilesToSync(connection, deltaLink, 10); + + files = batchFiles; + nextDeltaLink = batchNextDeltaLink; + } catch (error: any) { + if (error.response?.status === 410) { + // Delta token expired, start fresh sync + const newDeltaLink = `${connection.account_url}/v1.0/me/drive/root/delta?$top=1000`; + return this.sync(data, newDeltaLink); + } + await this.bullQueueService + .getSyncJobsQueue() + .add('fs_file_onedrive', { + ...data, + deltaLink: deltaLink, + connectionId: connection.id_connection, + }); + + this.logger.error( + `Got 410 error while syncing OneDrive files. Added sync from /delta endpoint to queue to retry.`, + error, + ); + } - // Await all promises for the current batch - const batchFiles = await Promise.all(filePromises); - const flatBatchFiles = batchFiles.flat(); - allFiles.push(...flatBatchFiles); + if (files.length > 0) { + const ingestedFiles = await this.ingestFiles( + files, + connection, + custom_field_mappings, + ingestParams, + ); + this.logger.log( + `Ingested ${ingestedFiles.length} files from OneDrive.`, + 'onedrive files ingestion', + ); + } - this.logger.log( - `Batch ${i / this.BATCH_SIZE + 1} completed: got ${ - flatBatchFiles.length - } files.`, - ); + // more files to sync + if (nextDeltaLink) { + await this.bullQueueService + .getThirdPartyDataIngestionQueue() + .add('fs_file_onedrive', { + ...data, + deltaLink: nextDeltaLink, + connectionId: connection.id_connection, + }); + } else { + this.logger.log( + `No more files to sync from OneDrive.`, + 'onedrive files sync', + ); + } + } else { + const lastSyncTime = await this.getLastSyncTime(connection); + const deltaLink = lastSyncTime + ? `${ + connection.account_url + }/v1.0/me/drive/root/delta?$top=1000&token=${lastSyncTime.toISOString()}` + : `${connection.account_url}/v1.0/me/drive/root/delta?$top=1000`; // if no last sync time, get all files + + await this.sync(data, deltaLink); } - this.logger.log(`Synced ${allFiles.length} files for onedrive.`); return { - data: allFiles, - message: "OneDrive's files retrieved from root and specified folder", + data: [], + message: 'OneDrive files retrieved', statusCode: 200, }; } catch (error) { @@ -105,6 +161,115 @@ export class OnedriveService implements IFileService { throw error; } } + + private async getFilesToSync( + connection: Connection, + deltaLink: string, + batchSize: number, // number of times to call the API + ) { + const files: OnedriveFileOutput[] = []; + let nextDeltaLink: string | null = deltaLink; + + for (let i = 0; i < batchSize; i++) { + const resp = await this.makeRequestWithRetry({ + timeout: 30000, + method: 'get', + url: deltaLink, + headers: { + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, + }, + }); + + const batchFiles = resp.data.value?.filter((elem: any) => !elem.folder); + files.push(...batchFiles); + nextDeltaLink = resp.data['@odata.nextLink']; + + if (!resp.data.value?.length) { + nextDeltaLink = null; + break; + } + } + + return { files, nextDeltaLink }; + } + + async processBatch(job: any) { + const { + linkedUserId, + deltaLink, + connectionId, + custom_field_mappings, + ingestParams, + } = job.data; + + // Call the sync method with the pageToken and other job data + await this.sync( + { + linkedUserId, + custom_field_mappings, + ingestParams, + }, + deltaLink, + ); + } + + private async ingestFiles( + files: OnedriveFileOutput[], + connection: Connection, + customFieldMappings?: { + slug: string; + remote_id: string; + }[], + extraParams?: { [key: string]: any }, + ) { + // Sort files by lastModifiedDateTime in descending order (newest first) + const sortedFiles = [...files].sort((a, b) => { + const dateA = new Date(a.lastModifiedDateTime).getTime(); + const dateB = new Date(b.lastModifiedDateTime).getTime(); + return dateB - dateA; + }); + + // Deduplicate files by remote_id, keeping only the first occurrence (which will be the latest version) + const uniqueFiles = sortedFiles.reduce((acc, file) => { + if (!acc.has(file.id)) { + acc.set(file.id, file); + } + return acc; + }, new Map()); + + this.logger.log( + `Deduplicating ${files.length} delta files to ${uniqueFiles.size} unique files`, + 'onedrive files ingestion', + ); + + return this.ingestService.ingestData< + UnifiedFilestorageFileOutput, + OnedriveFileOutput + >( + Array.from(uniqueFiles.values()), + 'onedrive', + connection.id_connection, + 'filestorage', + 'file', + customFieldMappings, + extraParams, + ); + } + + private async getLastSyncTime(connection: any) { + const lastSyncTime = await this.prisma.fs_files.findFirst({ + where: { + id_connection: connection.id_connection, + }, + orderBy: { + remote_modified_at: 'desc', + }, + }); + return lastSyncTime?.remote_modified_at; + } + private async syncFolder( connection: any, folderId: string, diff --git a/packages/api/src/filestorage/file/services/onedrive/processor.ts b/packages/api/src/filestorage/file/services/onedrive/processor.ts new file mode 100644 index 000000000..b15316a75 --- /dev/null +++ b/packages/api/src/filestorage/file/services/onedrive/processor.ts @@ -0,0 +1,21 @@ +import { Process, Processor } from '@nestjs/bull'; +import { Injectable } from '@nestjs/common'; +import { Job } from 'bull'; +import { Queues } from '@@core/@core-services/queues/types'; +import { OnedriveService } from '.'; + +@Injectable() +@Processor(Queues.THIRD_PARTY_DATA_INGESTION) +export class OnedriveQueueProcessor { + constructor(private readonly onedriveService: OnedriveService) {} + + @Process({ name: 'fs_file_onedrive', concurrency: 1 }) + async handleOneDriveSync(job: Job): Promise { + try { + await this.onedriveService.processBatch(job); + } catch (error) { + console.error(`Failed to process OneDrive sync job: ${error.message}`); + throw error; + } + } +} diff --git a/packages/api/src/filestorage/folder/services/onedrive/index.ts b/packages/api/src/filestorage/folder/services/onedrive/index.ts index 8ca9a2545..c029f3ecf 100644 --- a/packages/api/src/filestorage/folder/services/onedrive/index.ts +++ b/packages/api/src/filestorage/folder/services/onedrive/index.ts @@ -784,6 +784,11 @@ export class OnedriveService implements IFolderService { continue; } + // handle 410 gone errors + if (error.response?.status === 410 && config.url.includes('delta')) { + // todo: handle 410 gone errors + } + throw error; } }