From 234dbee68e85fadbdf95c4d07937b1cd8fed642f Mon Sep 17 00:00:00 2001 From: amit <1mitccc@gmail.com> Date: Wed, 18 Dec 2024 22:48:52 +0530 Subject: [PATCH] refactor: google drive: better handling of unresoved folders --- .../folder/services/googledrive/index.ts | 520 +++++++++--------- 1 file changed, 270 insertions(+), 250 deletions(-) diff --git a/packages/api/src/filestorage/folder/services/googledrive/index.ts b/packages/api/src/filestorage/folder/services/googledrive/index.ts index a2d419b11..3932aee7b 100644 --- a/packages/api/src/filestorage/folder/services/googledrive/index.ts +++ b/packages/api/src/filestorage/folder/services/googledrive/index.ts @@ -25,6 +25,9 @@ interface GoogleDriveListResponse { const GOOGLE_DRIVE_QUOTA_DELAY = 100; // ms between requests const MAX_RETRIES = 3; const INITIAL_BACKOFF = 1000; // 1 second +const RATE_LIMIT_DELAY = 100; // ms between requests to avoid quota issues +const MAX_API_RETRIES = 3; +const BASE_BACKOFF_MS = 1000; @Injectable() export class GoogleDriveFolderService implements IFolderService { @@ -100,24 +103,81 @@ export class GoogleDriveFolderService implements IFolderService { } } - async recursiveGetGoogleDriveFolders( + async sync(data: SyncParam): Promise> { + try { + const { linkedUserId } = data; + + const connection = await this.prisma.connections.findFirst({ + where: { + id_linked_user: linkedUserId, + provider_slug: 'googledrive', + vertical: 'filestorage', + }, + }); + + if (!connection) { + return { + data: [], + message: 'Connection not found', + statusCode: 404, + }; + } + + const auth = new OAuth2Client(); + auth.setCredentials({ + access_token: this.cryptoService.decrypt(connection.access_token), + }); + + const lastSyncTime = await this.getLastSyncTime(connection.id_connection); + + const folders = lastSyncTime + ? await this.getFoldersIncremental( + auth, + connection.id_connection, + lastSyncTime, + ) + : await this.recursiveGetGoogleDriveFolders( + auth, + connection.id_connection, + ); + + console.log(`Got ${folders.length} folders`); + + await this.ingestPermissionsForFolders(folders, connection.id_connection); + this.logger.log(`Synced ${folders.length} Google Drive folders!`); + + return { + data: folders, + message: 'Google Drive folders retrieved', + statusCode: 200, + }; + } catch (error) { + this.logger.error('Error syncing Google Drive folders', error); + console.log(error); + throw error; + } + } + + private async recursiveGetGoogleDriveFolders( auth: OAuth2Client, connectionId: string, ): Promise { const drive = google.drive({ version: 'v3', auth }); - const rootDriveId = await drive.files - .get({ - fileId: 'root', - fields: 'id', - }) - .then((res) => res.data.id); + const rootDriveId = await this.executeWithRetry(() => + drive.files + .get({ + fileId: 'root', + fields: 'id', + }) + .then((res) => res.data.id), + ); // Helper function to fetch folders for a specific parent ID or root - async function fetchFoldersForParent( + const fetchFoldersForParent = async ( parentId: string | null = null, driveId: string, - ): Promise { + ): Promise => { const folders: GoogleDriveFolderOutput[] = []; let pageToken: string | null = null; @@ -128,57 +188,23 @@ export class GoogleDriveFolderService implements IFolderService { : `${baseQuery} and '${driveId}' in parents`; }; - const executeWithRetry = async ( - pageToken: string | null, - retryCount = 0, - ): Promise => { - try { - await new Promise((resolve) => - setTimeout(resolve, GOOGLE_DRIVE_QUOTA_DELAY), - ); - - const resp = await drive.files.list({ - q: buildQuery(parentId, driveId), - fields: - 'nextPageToken, files(id, name, parents, createdTime, modifiedTime, driveId, webViewLink, permissions)', - pageToken, - includeItemsFromAllDrives: true, - supportsAllDrives: true, - orderBy: 'modifiedTime', - ...(driveId !== 'root' && { - driveId, - corpora: 'drive', - }), - }); - - return resp as unknown as GoogleDriveListResponse; - } catch (error) { - if (!isGoogleApiError(error)) { - throw error; - } - - const { code, message } = error; - - if (retryCount >= MAX_RETRIES) { - throw new Error( - `Failed to fetch Google Drive folders after ${MAX_RETRIES} retries. Last error: ${message}`, - ); - } - - // Handle rate limiting and quota errors - if (code === 429 || message.includes('quota')) { - const backoffTime = INITIAL_BACKOFF * Math.pow(2, retryCount); - await new Promise((resolve) => setTimeout(resolve, backoffTime)); - return executeWithRetry(pageToken, retryCount + 1); - } - - throw error; - } - }; - try { do { - const response = await executeWithRetry(pageToken); + const response = (await this.executeWithRetry(() => + drive.files.list({ + q: buildQuery(parentId, driveId), + fields: + 'nextPageToken, files(id, name, parents, createdTime, modifiedTime, driveId, webViewLink, permissions)', + pageToken, + includeItemsFromAllDrives: true, + supportsAllDrives: true, + orderBy: 'modifiedTime', + ...(driveId !== 'root' && { + driveId, + corpora: 'drive', + }), + }), + )) as unknown as GoogleDriveListResponse; if (response.data.files?.length) { folders.push(...response.data.files); @@ -198,7 +224,7 @@ export class GoogleDriveFolderService implements IFolderService { `Error fetching Google Drive folders: ${error.message}`, ); } - } + }; // Recursive function to populate folders level by level async function populateFolders( @@ -233,6 +259,7 @@ export class GoogleDriveFolderService implements IFolderService { ); } + // main logic try { const driveIds = await this.fetchDriveIds(auth); const googleDriveFolders: GoogleDriveFolderOutput[] = []; @@ -367,150 +394,187 @@ export class GoogleDriveFolderService implements IFolderService { connectionId: string, lastSyncTime: Date, ): Promise { - const MAX_RETRIES = 5; - const INITIAL_BACKOFF = 1000; // 1 second - try { const drive = google.drive({ version: 'v3', auth }); const driveIds = await this.fetchDriveIds(auth); - const modifiedFolders = await this.getModifiedFoldersWithRetry( - drive, - lastSyncTime, - MAX_RETRIES, - INITIAL_BACKOFF, + const modifiedFolders = await this.executeWithRetry(() => + this.getModifiedFolders(drive, lastSyncTime), ); + const unresolvedFolders = await this.getUnresolvedFolders(connectionId); - const folderIdToInternalIdMap = new Map(); - const foldersToSync: GoogleDriveFolderOutput[] = []; // output - let remainingFolders = modifiedFolders.concat(unresolvedFolders); + return await this.processFoldersWithParents( + modifiedFolders.concat(unresolvedFolders), + connectionId, + driveIds, + drive, + ); + } catch (error) { + this.logger.error('Error in incremental folder sync', error); + throw error; + } + } - // Create a cache for parent lookups to minimize DB queries - const parentLookupCache = new Map(); + private async processFoldersWithParents( + folders: GoogleDriveFolderOutput[], + connectionId: string, + driveIds: string[], + drive: any, + ): Promise { + const folderIdToInternalIdMap = new Map(); + const foldersToSync: GoogleDriveFolderOutput[] = []; + let remainingFolders = folders; + const parentLookupCache = new Map(); + + while (remainingFolders.length > 0) { + const foldersStillPending = []; + + for (const folder of remainingFolders) { + const parentId = folder.parents?.[0] || 'root'; + const internalParentId = await this.resolveParentId( + parentId, + folderIdToInternalIdMap, + driveIds, + connectionId, + parentLookupCache, + ); - async function getParentFromDb(parentId: string): Promise { - if (parentLookupCache.has(parentId)) { - return parentLookupCache.get(parentId); + if (internalParentId) { + const folder_internal_id = uuidv4(); + foldersToSync.push( + this.createFolderWithInternalIds( + folder, + internalParentId, + folder_internal_id, + ), + ); + folderIdToInternalIdMap.set(folder.id, folder_internal_id); + } else { + foldersStillPending.push(folder); } + } - const parent = await this.prisma.fs_folders.findFirst({ - where: { - remote_id: parentId, - id_connection: connectionId, - }, - select: { id_fs_folder: true }, - }); - - const result = parent?.id_fs_folder || null; - parentLookupCache.set(parentId, result); - return result; + if (this.isStuckInLoop(foldersStillPending, remainingFolders)) { + const remote_folders = new Map( + foldersToSync.map((folder) => [folder.id, folder]), + ); + await this.handleUnresolvedFolders( + foldersStillPending, + foldersToSync, + remote_folders, + parentLookupCache, + driveIds, + connectionId, + drive, + ); + break; } - while (remainingFolders.length > 0) { - const foldersStillPending: GoogleDriveFolderOutput[] = []; - - for (const folder of remainingFolders) { - const parentId = folder.parents?.[0] || 'root'; - let internalParentId: string | null = null; - - // Check in memory maps first - if (folderIdToInternalIdMap.has(parentId)) { - internalParentId = folderIdToInternalIdMap.get(parentId)!; - } else if (driveIds.includes(parentId) || parentId === 'root') { - internalParentId = 'root'; - } else { - // Only query DB if necessary - internalParentId = await getParentFromDb.call(this, parentId); - } + remainingFolders = foldersStillPending; + } - if (internalParentId) { - // Parent found - create internal ID and add to sync list - const folder_internal_id = uuidv4(); - foldersToSync.push({ - ...folder, - internal_parent_folder_id: - internalParentId === 'root' ? null : internalParentId, - internal_id: folder_internal_id, - }); - folderIdToInternalIdMap.set(folder.id, folder_internal_id); - } else { - // Parent not found - try again in next iteration - foldersStillPending.push(folder); - } - } + return foldersToSync; + } - // Check if we're stuck in a loop (no folders processed in this iteration) - if (foldersStillPending.length === remainingFolders.length) { - this.logger.warn( - `Marking ${foldersStillPending.length} unresolved folders as 'unresolved'. Will try again in next sync.`, - ); + private createFolderWithInternalIds( + folder: GoogleDriveFolderOutput, + internalParentId: string, + internalId: string, + ): GoogleDriveFolderOutput { + return { + ...folder, + internal_parent_folder_id: + internalParentId === 'root' ? null : internalParentId, + internal_id: internalId, + }; + } - const unresolvedFolders = foldersStillPending.map((folder) => ({ - ...folder, - internal_parent_folder_id: 'unresolved', - })); - foldersToSync.push(...unresolvedFolders); - } + private isStuckInLoop( + pending: GoogleDriveFolderOutput[], + remaining: GoogleDriveFolderOutput[], + ): boolean { + return pending.length === remaining.length; + } - remainingFolders = foldersStillPending; + private async handleUnresolvedFolders( + pending: GoogleDriveFolderOutput[], + output: GoogleDriveFolderOutput[], + remote_folders: Map, + parentLookupCache: Map, + driveIds: string[], + connectionId: string, + drive: any, + ): Promise { + this.logger.warn( + `Found ${pending.length} unresolved folders. Resolving them...`, + ); + async function getIntenalParentRecursive( + folder: GoogleDriveFolderOutput, + ): Promise { + const remote_parent_id = folder.parents?.[0] || 'root'; + const internal_parent_id = await this.resolveParentId( + remote_parent_id, + parentLookupCache, + driveIds, + connectionId, + parentLookupCache, + ); + if (internal_parent_id) { + return internal_parent_id; } - return foldersToSync; - } catch (error) { - this.logger.error('Error in incremental folder sync', error); - throw error; + return getIntenalParentRecursive( + remote_folders.get(remote_parent_id) || + (await this.executeWithRetry(() => + drive.files.get({ + fileId: remote_parent_id, + fields: 'parents', + }), + )), + ); } + + await Promise.all( + pending.map(async (folder) => { + const internal_parent_id = await getIntenalParentRecursive(folder); + output.push({ + ...folder, + internal_parent_folder_id: internal_parent_id, + }); + }), + ); } - /** - * Fetches modified folders with retry logic for handling rate limits. - */ - private async getModifiedFoldersWithRetry( - drive: any, - lastSyncTime: Date, - maxRetries: number, - initialBackoff: number, - ): Promise { - let attempt = 0; - let backoff = initialBackoff; + private async resolveParentId( + parentId: string, + idMap: Map, + driveIds: string[], + connectionId: string, + cache: Map, + ): Promise { + if (idMap.has(parentId)) { + return idMap.get(parentId)!; + } - while (attempt <= maxRetries) { - try { - return await this.getModifiedFolders(drive, lastSyncTime); - } catch (error) { - if ( - isGoogleApiError(error) && - (error.code === 429 || error.message.includes('quota')) - ) { - if (attempt === maxRetries) { - this.logger.error( - 'Max retries reached for fetching modified folders.', - error.message, - ); - throw new Error( - 'Failed to fetch modified folders due to rate limits.', - ); - } - this.logger.warn( - `Rate limit encountered. Retrying attempt ${ - attempt + 1 - } after ${backoff}ms.`, - error.message, - ); - await this.delay(backoff); - backoff *= 2; // Exponential backoff - attempt += 1; - } else { - this.logger.error( - 'Unexpected error while fetching modified folders.', - error, - ); - throw error; - } - } + if (driveIds.includes(parentId) || parentId === 'root') { + return 'root'; } - return []; + if (cache.has(parentId)) { + return cache.get(parentId); + } + + const parent = await this.prisma.fs_folders.findFirst({ + where: { + remote_id: parentId, + id_connection: connectionId, + }, + select: { id_fs_folder: true }, + }); + + const result = parent?.id_fs_folder || null; + cache.set(parentId, result); + return result; } /** @@ -550,7 +614,7 @@ export class GoogleDriveFolderService implements IFolderService { return folders; } - async getUnresolvedFolders( + private async getUnresolvedFolders( connectionId: string, ): Promise { // Get unresolved folders @@ -591,61 +655,6 @@ export class GoogleDriveFolderService implements IFolderService { .filter((folder): folder is GoogleDriveFolderOutput => folder !== null); } - async sync(data: SyncParam): Promise> { - try { - const { linkedUserId } = data; - - const connection = await this.prisma.connections.findFirst({ - where: { - id_linked_user: linkedUserId, - provider_slug: 'googledrive', - vertical: 'filestorage', - }, - }); - - if (!connection) { - return { - data: [], - message: 'Connection not found', - statusCode: 404, - }; - } - - const auth = new OAuth2Client(); - auth.setCredentials({ - access_token: this.cryptoService.decrypt(connection.access_token), - }); - - const lastSyncTime = await this.getLastSyncTime(connection.id_connection); - - const folders = lastSyncTime - ? await this.getFoldersIncremental( - auth, - connection.id_connection, - lastSyncTime, - ) - : await this.recursiveGetGoogleDriveFolders( - auth, - connection.id_connection, - ); - - console.log(`Got ${folders.length} folders`); - - await this.ingestPermissionsForFolders(folders, connection.id_connection); - this.logger.log(`Synced ${folders.length} Google Drive folders!`); - - return { - data: folders, - message: 'Google Drive folders retrieved', - statusCode: 200, - }; - } catch (error) { - this.logger.error('Error syncing Google Drive folders', error); - console.log(error); - throw error; - } - } - private async getLastSyncTime(connectionId: string): Promise { const lastSync = await this.prisma.fs_folders.findFirst({ where: { id_connection: connectionId }, @@ -654,29 +663,40 @@ export class GoogleDriveFolderService implements IFolderService { return lastSync ? lastSync.remote_modified_at : null; } - /** - * Type guard for Google API errors - */ - private isGoogleApiError( - error: unknown, - ): error is { code: number; message: string } { - return ( - typeof error === 'object' && - error !== null && - 'code' in error && - 'message' in error - ); + private async executeWithRetry( + operation: () => Promise, + retryCount = 0, + ): Promise { + try { + await this.delay(RATE_LIMIT_DELAY); + return await operation(); + } catch (error) { + if (!isRateLimitError(error)) { + throw error; + } + + if (retryCount >= MAX_API_RETRIES) { + throw new Error( + `Failed after ${MAX_API_RETRIES} retries. Last error: ${error.message}`, + ); + } + + const backoffTime = BASE_BACKOFF_MS * Math.pow(2, retryCount); + await this.delay(backoffTime); + return this.executeWithRetry(operation, retryCount + 1); + } } } -// Type guard for Google API errors -function isGoogleApiError( +function isRateLimitError( error: unknown, ): error is { code: number; message: string } { return ( typeof error === 'object' && error !== null && 'code' in error && - 'message' in error + 'message' in error && + (error.code === 429 || + (typeof error.message === 'string' && error.message.includes('quota'))) ); }