diff --git a/packages/api/src/filestorage/file/services/googledrive/index.ts b/packages/api/src/filestorage/file/services/googledrive/index.ts index 348138fa4..925dd79cb 100644 --- a/packages/api/src/filestorage/file/services/googledrive/index.ts +++ b/packages/api/src/filestorage/file/services/googledrive/index.ts @@ -18,6 +18,8 @@ import { GoogledrivePermissionOutput } from '@filestorage/permission/services/go const BATCH_SIZE = 1000; // Number of files to process in each batch const API_RATE_LIMIT = 10; // Requests per second +const MAX_RETRIES = 3; +const INITIAL_BACKOFF = 1000; // 1 second @Injectable() export class GoogleDriveService implements IFileService { @@ -128,7 +130,11 @@ export class GoogleDriveService implements IFileService { ); } - async sync(data: SyncParam, pageToken?: string) { + async sync( + data: SyncParam, + pageTokenFiles?: string, + pageTokenFolders?: string, + ) { const { linkedUserId, custom_field_mappings, ingestParams } = data; const connection = await this.prisma.connections.findFirst({ where: { @@ -155,41 +161,29 @@ export class GoogleDriveService implements IFileService { let query = "mimeType!='application/vnd.google-apps.folder' and trashed = false"; - if (!pageToken) { + if (!pageTokenFiles && !pageTokenFolders) { const lastSyncTime = await this.getLastSyncTime(connection.id_connection); if (lastSyncTime) { console.log(`Last sync time is ${lastSyncTime.toISOString()}`); query += ` and modifiedTime >= '${lastSyncTime.toISOString()}'`; } } - // Fetch the current page of files - const response = await this.rateLimitedRequest(() => - drive.files.list({ - q: query, - fields: - 'nextPageToken, files(id, name, mimeType, createdTime, modifiedTime, size, parents, webViewLink, driveId, permissions)', - pageSize: BATCH_SIZE, - pageToken: pageToken, - includeItemsFromAllDrives: true, - supportsAllDrives: true, - orderBy: 'modifiedTime', - }), - ); - const files: GoogleDriveFileOutput[] = (response as any).data.files.map( - (file) => ({ - id: file.id!, - name: file.name!, - mimeType: file.mimeType!, - createdTime: file.createdTime!, - modifiedTime: file.modifiedTime!, - size: file.size!, - permissions: file.permissions, - parents: file.parents, - webViewLink: file.webViewLink, - driveId: file.driveId || rootDriveId, - }), - ); + const { filesToSync, nextPageTokenFiles, nextPageTokenFolders } = + await this.getFilesToSync(drive, query, pageTokenFiles, pageTokenFolders); + + const files: GoogleDriveFileOutput[] = filesToSync.map((file) => ({ + id: file.id!, + name: file.name!, + mimeType: file.mimeType!, + createdTime: file.createdTime!, + modifiedTime: file.modifiedTime!, + size: file.size!, + permissions: file.permissions, + parents: file.parents, + webViewLink: file.webViewLink, + driveId: file.driveId || rootDriveId, + })); // Process the files fetched in the current batch if (files.length > 0) { @@ -201,16 +195,14 @@ export class GoogleDriveService implements IFileService { ); } - // Get the next pageToken - const nextPageToken = (response as any).data.nextPageToken; - - if (nextPageToken) { + if (nextPageTokenFiles || nextPageTokenFolders) { // Add the next pageToken to the queue await this.bullQueueService .getThirdPartyDataIngestionQueue() .add('fs_file_googledrive', { ...data, - pageToken: nextPageToken, + pageTokenFiles: nextPageTokenFiles, + pageTokenFolders: nextPageTokenFolders, connectionId: connection.id_connection, }); } @@ -222,11 +214,99 @@ export class GoogleDriveService implements IFileService { statusCode: 200, }; } + + async getFilesToSync( + drive: any, + query: string, + pageTokenFiles?: string, + pageTokenFolders?: string, + ) { + const need_to_fetch_files = + pageTokenFiles || (!pageTokenFolders && !pageTokenFiles); + + const need_to_fetch_folders = + pageTokenFolders || (!pageTokenFiles && !pageTokenFolders); + + interface DriveResponse { + data: { + files: GoogleDriveFileOutput[]; + nextPageToken?: string; + }; + } + + const filesResponse = need_to_fetch_files + ? await this.rateLimitedRequest(() => + drive.files.list({ + q: query, + fields: + 'nextPageToken, files(id, name, mimeType, createdTime, modifiedTime, size, parents, webViewLink, driveId, permissions)', + pageSize: BATCH_SIZE, + pageToken: pageTokenFiles, + includeItemsFromAllDrives: true, + supportsAllDrives: true, + orderBy: 'modifiedTime', + }), + ) + : null; + + const folderQuery = query.replace( + "mimeType!='application/vnd.google-apps.folder'", + "mimeType='application/vnd.google-apps.folder'", + ); + + const foldersResponse = need_to_fetch_folders + ? await this.rateLimitedRequest(() => + drive.files.list({ + q: folderQuery, + fields: + 'nextPageToken, files(id, name, mimeType, createdTime, modifiedTime, size, parents, webViewLink, driveId, permissions)', + pageSize: BATCH_SIZE, + pageToken: pageTokenFolders, + includeItemsFromAllDrives: true, + supportsAllDrives: true, + orderBy: 'modifiedTime', + }), + ) + : null; + + const filesFromNewFolders = foldersResponse + ? await Promise.all( + foldersResponse.data.files.map((folder) => + this.rateLimitedRequest(() => + drive.files.list({ + q: `'${folder.id}' in parents`, + fields: + 'nextPageToken, files(id, name, mimeType, createdTime, modifiedTime, size, parents, webViewLink, driveId, permissions)', + pageSize: BATCH_SIZE, + }), + ), + ), + ) + : null; + + // Remove duplicate files based on id + const filesToSync = Array.from( + new Map( + [ + ...(filesResponse?.data.files || []), + ...(filesFromNewFolders?.flatMap((folder) => folder.data.files) || + []), + ].map((file) => [file.id, file]), + ).values(), + ); + + const nextPageTokenFiles = filesResponse?.data.nextPageToken; + const nextPageTokenFolders = foldersResponse?.data.nextPageToken; + + return { filesToSync, nextPageTokenFiles, nextPageTokenFolders }; + } + async processBatch(job: any) { const { linkedUserId, query, - pageToken, + pageTokenFiles, + pageTokenFolders, connectionId, custom_field_mappings, ingestParams, @@ -239,26 +319,56 @@ export class GoogleDriveService implements IFileService { custom_field_mappings, ingestParams, }, - pageToken, + pageTokenFiles, + pageTokenFolders, ); } private async rateLimitedRequest(request: () => Promise): Promise { - return new Promise((resolve, reject) => { - setTimeout(async () => { - try { - const result = await request(); - resolve(result); - } catch (error) { - this.logger.error('Error in rateLimitedRequest:', error); - if (error.response) { - this.logger.error('Response data:', error.response.data); - this.logger.error('Response status:', error.response.status); + let attempt = 0; + let backoff = INITIAL_BACKOFF; + + while (attempt <= MAX_RETRIES) { + try { + // Add base delay between requests + await new Promise((resolve) => + setTimeout(resolve, 1000 / API_RATE_LIMIT), + ); + return await request(); + } catch (error) { + if ( + isGoogleApiError(error) && + (error.code === 429 || error.message.includes('quota')) + ) { + if (attempt === MAX_RETRIES) { + this.logger.error( + 'Max retries reached for Google API request.', + error.message, + ); + throw new Error('Failed to complete request due to rate limits.'); } - reject(error); + + this.logger.warn( + `Rate limit encountered. Retrying attempt ${ + attempt + 1 + } after ${backoff}ms.`, + ); + await new Promise((resolve) => setTimeout(resolve, backoff)); + backoff *= 2; // Exponential backoff + attempt += 1; + continue; } - }, 1000 / API_RATE_LIMIT); - }); + + this.logger.error('Error in rateLimitedRequest:', error); + if (error.response) { + this.logger.error('Response data:', error.response.data); + this.logger.error('Response status:', error.response.status); + } + throw error; + } + } + + throw new Error('Failed to complete request due to rate limits.'); } private async getLastSyncTime(connectionId: string): Promise { @@ -292,3 +402,14 @@ export class GoogleDriveService implements IFileService { } } } + +function isGoogleApiError( + error: unknown, +): error is { code: number; message: string } { + return ( + typeof error === 'object' && + error !== null && + 'code' in error && + 'message' in error + ); +}