fix: google drive: old files in new folders going undetected
amuwal committed Dec 17, 2024
1 parent 7ed61a1 commit aa41b83
Showing 1 changed file with 170 additions and 49 deletions.

packages/api/src/filestorage/file/services/googledrive/index.ts
@@ -18,6 +18,8 @@ import { GoogledrivePermissionOutput } from '@filestorage/permission/services/go

const BATCH_SIZE = 1000; // Number of files to process in each batch
const API_RATE_LIMIT = 10; // Requests per second
const MAX_RETRIES = 3; // Max retry attempts for rate-limited requests
const INITIAL_BACKOFF = 1000; // 1 second

@Injectable()
export class GoogleDriveService implements IFileService {
@@ -128,7 +130,11 @@ export class GoogleDriveService implements IFileService {
);
}

  async sync(
    data: SyncParam,
    pageTokenFiles?: string,
    pageTokenFolders?: string,
  ) {
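    // pageTokenFiles and pageTokenFolders paginate two independent listings
    // (regular files vs. recently changed folders); either may run out first.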
const { linkedUserId, custom_field_mappings, ingestParams } = data;
const connection = await this.prisma.connections.findFirst({
where: {
@@ -155,41 +161,29 @@

let query =
"mimeType!='application/vnd.google-apps.folder' and trashed = false";
    if (!pageTokenFiles && !pageTokenFolders) {
const lastSyncTime = await this.getLastSyncTime(connection.id_connection);
if (lastSyncTime) {
console.log(`Last sync time is ${lastSyncTime.toISOString()}`);
query += ` and modifiedTime >= '${lastSyncTime.toISOString()}'`;
}
}
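    // Fetch the changed files plus the contents of recently changed folders.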
    const { filesToSync, nextPageTokenFiles, nextPageTokenFolders } =
      await this.getFilesToSync(drive, query, pageTokenFiles, pageTokenFolders);

    const files: GoogleDriveFileOutput[] = filesToSync.map((file) => ({
      id: file.id!,
      name: file.name!,
      mimeType: file.mimeType!,
      createdTime: file.createdTime!,
      modifiedTime: file.modifiedTime!,
      size: file.size!,
      permissions: file.permissions,
      parents: file.parents,
      webViewLink: file.webViewLink,
      driveId: file.driveId || rootDriveId,
    }));

// Process the files fetched in the current batch
if (files.length > 0) {
@@ -201,16 +195,14 @@
);
}

    if (nextPageTokenFiles || nextPageTokenFolders) {
      // Add the next page tokens to the queue
      await this.bullQueueService
        .getThirdPartyDataIngestionQueue()
        .add('fs_file_googledrive', {
          ...data,
          pageTokenFiles: nextPageTokenFiles,
          pageTokenFolders: nextPageTokenFolders,
connectionId: connection.id_connection,
});
}
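    // Any remaining pages are picked up asynchronously by processBatch via the
    // third-party data ingestion queue, rather than by looping here.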
@@ -222,11 +214,99 @@
statusCode: 200,
};
}

async getFilesToSync(
drive: any,
query: string,
pageTokenFiles?: string,
pageTokenFolders?: string,
) {
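    // On the first call neither page token is set, so both the files and the
    // folders listings are fetched; later calls only continue the listing(s)
    // that still have a page token.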
const need_to_fetch_files =
pageTokenFiles || (!pageTokenFolders && !pageTokenFiles);

const need_to_fetch_folders =
pageTokenFolders || (!pageTokenFiles && !pageTokenFolders);

interface DriveResponse {
data: {
files: GoogleDriveFileOutput[];
nextPageToken?: string;
};
}

const filesResponse = need_to_fetch_files
? await this.rateLimitedRequest<DriveResponse>(() =>
drive.files.list({
q: query,
fields:
'nextPageToken, files(id, name, mimeType, createdTime, modifiedTime, size, parents, webViewLink, driveId, permissions)',
pageSize: BATCH_SIZE,
pageToken: pageTokenFiles,
includeItemsFromAllDrives: true,
supportsAllDrives: true,
orderBy: 'modifiedTime',
}),
)
: null;

const folderQuery = query.replace(
"mimeType!='application/vnd.google-apps.folder'",
"mimeType='application/vnd.google-apps.folder'",
);

const foldersResponse = need_to_fetch_folders
? await this.rateLimitedRequest<DriveResponse>(() =>
drive.files.list({
q: folderQuery,
fields:
'nextPageToken, files(id, name, mimeType, createdTime, modifiedTime, size, parents, webViewLink, driveId, permissions)',
pageSize: BATCH_SIZE,
pageToken: pageTokenFolders,
includeItemsFromAllDrives: true,
supportsAllDrives: true,
orderBy: 'modifiedTime',
}),
)
: null;

const filesFromNewFolders = foldersResponse
? await Promise.all(
foldersResponse.data.files.map((folder) =>
this.rateLimitedRequest<DriveResponse>(() =>
drive.files.list({
q: `'${folder.id}' in parents`,
fields:
'nextPageToken, files(id, name, mimeType, createdTime, modifiedTime, size, parents, webViewLink, driveId, permissions)',
pageSize: BATCH_SIZE,
}),
),
),
)
: null;
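    // Listing the children of recently created or modified folders is what
    // catches older files moved into a new folder: moving a file generally does
    // not update its modifiedTime, so the incremental files query alone can
    // miss it (the bug this commit fixes).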

// Remove duplicate files based on id
const filesToSync = Array.from(
new Map(
[
...(filesResponse?.data.files || []),
...(filesFromNewFolders?.flatMap((folder) => folder.data.files) ||
[]),
].map((file) => [file.id, file]),
).values(),
);

const nextPageTokenFiles = filesResponse?.data.nextPageToken;
const nextPageTokenFolders = foldersResponse?.data.nextPageToken;

return { filesToSync, nextPageTokenFiles, nextPageTokenFolders };
}

async processBatch(job: any) {
const {
linkedUserId,
query,
      pageTokenFiles,
      pageTokenFolders,
connectionId,
custom_field_mappings,
ingestParams,
@@ -239,26 +319,56 @@
custom_field_mappings,
ingestParams,
},
      pageTokenFiles,
      pageTokenFolders,
);
}

private async rateLimitedRequest<T>(request: () => Promise<T>): Promise<T> {
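    // Paces each call by 1000 / API_RATE_LIMIT ms, retries 429/"quota" errors
    // with exponential backoff (INITIAL_BACKOFF, doubling, up to MAX_RETRIES
    // attempts), and rethrows any other error immediately.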
    let attempt = 0;
    let backoff = INITIAL_BACKOFF;

    while (attempt <= MAX_RETRIES) {
      try {
        // Add base delay between requests
        await new Promise((resolve) =>
          setTimeout(resolve, 1000 / API_RATE_LIMIT),
        );
        return await request();
      } catch (error) {
        if (
          isGoogleApiError(error) &&
          (error.code === 429 || error.message.includes('quota'))
        ) {
          if (attempt === MAX_RETRIES) {
            this.logger.error(
              'Max retries reached for Google API request.',
              error.message,
            );
            throw new Error('Failed to complete request due to rate limits.');
          }

          this.logger.warn(
            `Rate limit encountered. Retrying attempt ${
              attempt + 1
            } after ${backoff}ms.`,
          );
          await new Promise((resolve) => setTimeout(resolve, backoff));
          backoff *= 2; // Exponential backoff
          attempt += 1;
          continue;
        }

        this.logger.error('Error in rateLimitedRequest:', error);
        if (error.response) {
          this.logger.error('Response data:', error.response.data);
          this.logger.error('Response status:', error.response.status);
        }
        throw error;
      }
    }

    throw new Error('Failed to complete request due to rate limits.');
  }

private async getLastSyncTime(connectionId: string): Promise<Date | null> {
Expand Down Expand Up @@ -292,3 +402,14 @@ export class GoogleDriveService implements IFileService {
}
}
}

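// Type guard that narrows unknown errors to the { code, message } shape exposed
// by Google API client errors, so the retry logic can inspect them safely.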
function isGoogleApiError(
error: unknown,
): error is { code: number; message: string } {
return (
typeof error === 'object' &&
error !== null &&
'code' in error &&
'message' in error
);
}
