diff --git a/packages/api/src/filestorage/file/services/googledrive/index.ts b/packages/api/src/filestorage/file/services/googledrive/index.ts index 8c19fa217..12f7de597 100644 --- a/packages/api/src/filestorage/file/services/googledrive/index.ts +++ b/packages/api/src/filestorage/file/services/googledrive/index.ts @@ -56,7 +56,7 @@ export class GoogleDriveService implements IFileService { ); } - async sync(data: SyncParam) { + async sync(data: SyncParam, pageToken?: string) { const { linkedUserId, custom_field_mappings, ingestParams } = data; const connection = await this.prisma.connections.findFirst({ where: { @@ -69,6 +69,7 @@ export class GoogleDriveService implements IFileService { if (!connection) return; const auth = new OAuth2Client(); + console.log("token is "+ JSON.stringify(this.cryptoService.decrypt(connection.access_token))); auth.setCredentials({ access_token: this.cryptoService.decrypt(connection.access_token), }); @@ -79,41 +80,58 @@ export class GoogleDriveService implements IFileService { ? `trashed = false and modifiedTime > '${lastSyncTime.toISOString()}'` : 'trashed = false'; - let pageToken: string | undefined; - let count = 0; - do { - const response = await this.rateLimitedRequest(() => - drive.files.list({ - q: query, - fields: 'nextPageToken', - pageSize: BATCH_SIZE, - pageToken: pageToken, - }), + // Fetch the current page of files + const response = await this.rateLimitedRequest(() => + drive.files.list({ + q: query, + fields: + 'nextPageToken, files(id, name, mimeType, modifiedTime, size, parents, webViewLink)', + pageSize: BATCH_SIZE, + pageToken: pageToken, + }), + ); + + const files: GoogleDriveFileOutput[] = (response as any).data.files.map((file) => ({ + id: file.id!, + name: file.name!, + mimeType: file.mimeType!, + modifiedTime: file.modifiedTime!, + size: file.size!, + parents: file.parents, + webViewLink: file.webViewLink, + })); + + // Process the files fetched in the current batch + if (files.length > 0) { + await this.ingestData( + files, + connection.id_connection, + custom_field_mappings, + ingestParams, ); + } - count++; + // Get the next pageToken + const nextPageToken = (response as any).data.nextPageToken; + if (nextPageToken) { + // Add the next pageToken to the queue await this.bullQueueService .getThirdPartyDataIngestionQueue() .add('fs_file_googledrive', { ...data, - pageToken: (response as any).data.nextPageToken, - query, + pageToken: nextPageToken, connectionId: connection.id_connection, - custom_field_mappings, - ingestParams, }); + } - pageToken = (response as any).data.nextPageToken; - } while (pageToken); - console.log(`it has been called ${count} times`) + console.log(`Processed a batch of ${files.length} files.`); return { data: [], - message: 'Google Drive sync completed', + message: 'Google Drive sync completed for this batch', statusCode: 200, }; } - async processBatch(job: any) { const { linkedUserId, @@ -123,56 +141,13 @@ export class GoogleDriveService implements IFileService { custom_field_mappings, ingestParams, } = job.data; - const connection = await this.prisma.connections.findFirst({ - where: { - id_linked_user: linkedUserId, - provider_slug: 'googledrive', - vertical: 'filestorage', - }, - }); - if (!connection) return; - - const auth = new OAuth2Client(); - auth.setCredentials({ - access_token: this.cryptoService.decrypt(connection.access_token), - }); - const drive = google.drive({ version: 'v3', auth }); - try { - const response = await this.rateLimitedRequest(() => - drive.files.list({ - q: query, - fields: - 'files(id, name, mimeType, modifiedTime, size, parents, webViewLink)', - pageSize: BATCH_SIZE, - pageToken: pageToken, - orderBy: 'modifiedTime', - }), - ); - - const files: GoogleDriveFileOutput[] = (response as any).data.files.map((file) => ({ - id: file.id!, - name: file.name!, - mimeType: file.mimeType!, - modifiedTime: file.modifiedTime!, - size: file.size!, - parents: file.parents, - webViewLink: file.webViewLink, - })); - - await this.ingestData( - files, - connectionId, - custom_field_mappings, - ingestParams, - ); - } catch (error) { - this.logger.error('Error in processBatch:', error); - if (error.message === 'Invalid Value') { - this.logger.error('This may be due to an expired or invalid access token.', error); - } - throw error; - } + // Call the sync method with the pageToken and other job data + await this.sync({ + linkedUserId, + custom_field_mappings, + ingestParams, + }, pageToken); } private async rateLimitedRequest(request: () => Promise): Promise { @@ -193,77 +168,6 @@ export class GoogleDriveService implements IFileService { }); } - /*async sync(data: SyncParam): Promise> { - try { - const { linkedUserId, id_folder } = data; - - const connection = await this.prisma.connections.findFirst({ - where: { - id_linked_user: linkedUserId, - provider_slug: 'googledrive', - vertical: 'filestorage', - }, - }); - - if (!connection) return; - - const auth = new OAuth2Client(); - auth.setCredentials({ - access_token: this.cryptoService.decrypt(connection.access_token), - }); - const drive = google.drive({ version: 'v3', auth }); - - const lastSyncTime = await this.getLastSyncTime(connection.id_connection); - console.log( - 'last updated time for google drive file is ' + - JSON.stringify(lastSyncTime), - ); - let pageToken: string | undefined; - let allFiles: GoogleDriveFileOutput[] = []; - - const query = lastSyncTime - ? `trashed = false and modifiedTime > '${lastSyncTime.toISOString()}'` - : 'trashed = false'; - - do { - const response = await drive.files.list({ - q: query, - fields: - 'nextPageToken, files(id, name, mimeType, modifiedTime, size, parents, webViewLink)', - pageSize: 1000, - pageToken: pageToken, - orderBy: 'modifiedTime', - }); - - const files: GoogleDriveFileOutput[] = response.data.files.map( - (file) => ({ - id: file.id!, - name: file.name!, - mimeType: file.mimeType!, - modifiedTime: file.modifiedTime!, - size: file.size!, - parents: file.parents, - webViewLink: file.webViewLink, - }), - ); - allFiles = allFiles.concat(files); - pageToken = response.data.nextPageToken; - if (pageToken) { - await sleep(100); // Wait 100ms between requests to avoid hitting rate limits - } - } while (pageToken); - this.logger.log(`Synced googledrive files !`); - - return { - data: allFiles, - message: 'Google Drive files retrieved', - statusCode: 200, - }; - } catch (error) { - throw error; - } - }*/ - private async getLastSyncTime(connectionId: string): Promise { const lastSync = await this.prisma.fs_files.findFirst({ where: { id_connection: connectionId }, diff --git a/packages/api/src/filestorage/file/services/googledrive/processor.ts b/packages/api/src/filestorage/file/services/googledrive/processor.ts index 2ae382e97..69c68e036 100644 --- a/packages/api/src/filestorage/file/services/googledrive/processor.ts +++ b/packages/api/src/filestorage/file/services/googledrive/processor.ts @@ -9,7 +9,7 @@ import { GoogleDriveService } from '.'; export class GoogleDriveQueueProcessor { constructor(private readonly googleDriveService: GoogleDriveService) {} - @Process('fs_file_googledrive') + @Process({ name: 'fs_file_googledrive', concurrency: 1 }) async handleGoogleDriveSync(job: Job) { try { await this.googleDriveService.processBatch(job);