-
Notifications
You must be signed in to change notification settings - Fork 195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🚑 Scaling fetching datta from gdrive #710
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,8 @@ export class BullQueueService { | |
public readonly syncJobsQueue: Queue, | ||
@InjectQueue(Queues.FAILED_PASSTHROUGH_REQUESTS_HANDLER) | ||
public readonly failedPassthroughRequestsQueue: Queue, | ||
@InjectQueue(Queues.THIRD_PARTY_DATA_INGESTION) | ||
public readonly thirdPartyDataIngestionQueue: Queue, | ||
Comment on lines
+17
to
+18
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) LGTM! Consider adding a comment for the new queue. The new queue injection for Consider adding a brief comment explaining the purpose of this new queue, similar to: /**
* Queue for handling third-party data ingestion tasks
*/
@InjectQueue(Queues.THIRD_PARTY_DATA_INGESTION)
public readonly thirdPartyDataIngestionQueue: Queue, This would improve code documentation and make it easier for other developers to understand the queue's purpose at a glance. |
||
@InjectQueue(Queues.RAG_DOCUMENT_PROCESSING) | ||
private ragDocumentQueue: Queue, | ||
) {} | ||
|
@@ -35,6 +37,9 @@ export class BullQueueService { | |
getRagDocumentQueue() { | ||
return this.ragDocumentQueue; | ||
} | ||
getThirdPartyDataIngestionQueue() { | ||
return this.thirdPartyDataIngestionQueue; | ||
} | ||
|
||
async removeRepeatableJob(jobName: string) { | ||
const jobs = await this.syncJobsQueue.getRepeatableJobs(); | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4,4 +4,5 @@ export enum Queues { | |||||
SYNC_JOBS_WORKER = 'SYNC_JOBS_WORKER', // Queue which syncs data from remote 3rd parties | ||||||
FAILED_PASSTHROUGH_REQUESTS_HANDLER = 'FAILED_PASSTHROUGH_REQUESTS_HANDLER', // Queue which handles failed passthrough request due to rate limit and retries it with backOff | ||||||
RAG_DOCUMENT_PROCESSING = 'RAG_DOCUMENT_PROCESSING', | ||||||
THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION', | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) LGTM! Consider adding a descriptive comment. The addition of Consider adding a descriptive comment for this new queue type, similar to the other entries. For example: - THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION',
+ THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION', // Queue for ingesting data from third-party sources like Google Drive 📝 Committable suggestion
Suggested change
|
||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,7 +31,7 @@ | |
private ragService: RagService, | ||
) {} | ||
|
||
async syncForLinkedUser<T, U, V extends IBaseObjectService>( | ||
integrationId: string, | ||
linkedUserId: string, | ||
vertical: string, | ||
|
@@ -85,10 +85,16 @@ | |
.filter((p) => p.shouldPassToService) | ||
.map((p) => p.param); | ||
|
||
const ingestParams = params | ||
.filter((p) => p.shouldPassToIngest) | ||
.reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {}); | ||
Comment on lines
+88
to
+90
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider optimizing the The current implementation using Consider refactoring to use const ingestParams = Object.fromEntries(
params
.filter((p) => p.shouldPassToIngest)
.map((p) => [p.paramName, p.param])
); This approach avoids the spread operator in the accumulator, resulting in O(n) time complexity. 🧰 Tools🪛 Biome
|
||
|
||
// Construct the syncParam object dynamically | ||
const syncParam: SyncParam = { | ||
linkedUserId, | ||
custom_properties: remoteProperties, | ||
custom_field_mappings: customFieldMappings, | ||
ingestParams: ingestParams, | ||
}; | ||
|
||
serviceParams.forEach((param, index) => { | ||
|
@@ -124,11 +130,7 @@ | |
return; | ||
} | ||
|
||
const sourceObject: U[] = resp.data; | ||
|
||
const ingestParams = params | ||
.filter((p) => p.shouldPassToIngest) | ||
.reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {}); | ||
/*const sourceObject: U[] = resp.data; | ||
|
||
await this.ingestData<T, U>( | ||
sourceObject, | ||
|
@@ -138,7 +140,7 @@ | |
commonObject, | ||
customFieldMappings, | ||
ingestParams, | ||
); | ||
);*/ | ||
} catch (syncError) { | ||
this.logger.error( | ||
`Error syncing ${integrationId} ${commonObject}: ${syncError.message}`, | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,3 @@ | ||||||||||||||||||||||||
export function sleep(ms: number): Promise<void> { | ||||||||||||||||||||||||
return new Promise((resolve) => setTimeout(resolve, ms)); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
Comment on lines
+1
to
+3
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) LGTM! Consider adding a brief JSDoc comment. The Consider adding a brief JSDoc comment to improve documentation: +/**
+ * Pauses execution for the specified number of milliseconds.
+ * @param ms The number of milliseconds to sleep.
+ * @returns A Promise that resolves after the specified delay.
+ */
export function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
} 📝 Committable suggestion
Suggested change
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,8 +68,22 @@ export interface IBaseSync { | |
|
||
export type SyncParam = { | ||
linkedUserId: string; | ||
custom_field_mappings?: { | ||
slug: string; | ||
remote_id: string; | ||
}[]; | ||
ingestParams: { [key: string]: any }; | ||
Comment on lines
+71
to
+75
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) LGTM with a minor suggestion The additions to the Consider making ingestParams?: { [key: string]: any }; This change would maintain flexibility while not requiring empty objects to be passed when no extra parameters are necessary. |
||
[key: string]: any; | ||
}; | ||
export interface IBaseObjectService { | ||
sync(data: SyncParam): Promise<ApiResponse<any>>; | ||
ingestData( | ||
sourceData: any[], | ||
connectionId: string, | ||
customFieldMappings?: { | ||
slug: string; | ||
remote_id: string; | ||
}[], | ||
extraParams?: { [key: string]: any }, | ||
): Promise<any[]>; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) Approved with suggestions for improvement The addition of the Consider the following improvements:
ingestData<T = any>(
sourceData: T[],
connectionId: string,
customFieldMappings?: { slug: string; remote_id: string }[],
extraParams?: { [key: string]: any }
): Promise<T[]>;
These changes would improve type safety and maintain consistency across the codebase. |
||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,32 +1,180 @@ | ||||||||||||||||||||||||||||||||||||||||||
import { EncryptionService } from '@@core/@core-services/encryption/encryption.service'; | ||||||||||||||||||||||||||||||||||||||||||
import { LoggerService } from '@@core/@core-services/logger/logger.service'; | ||||||||||||||||||||||||||||||||||||||||||
import { PrismaService } from '@@core/@core-services/prisma/prisma.service'; | ||||||||||||||||||||||||||||||||||||||||||
import { ApiResponse } from '@@core/utils/types'; | ||||||||||||||||||||||||||||||||||||||||||
import { BullQueueService } from '@@core/@core-services/queues/shared.service'; | ||||||||||||||||||||||||||||||||||||||||||
import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; | ||||||||||||||||||||||||||||||||||||||||||
import { SyncParam } from '@@core/utils/types/interface'; | ||||||||||||||||||||||||||||||||||||||||||
import { FileStorageObject } from '@filestorage/@lib/@types'; | ||||||||||||||||||||||||||||||||||||||||||
import { IFileService } from '@filestorage/file/types'; | ||||||||||||||||||||||||||||||||||||||||||
import { UnifiedFilestorageFileOutput } from '@filestorage/file/types/model.unified'; | ||||||||||||||||||||||||||||||||||||||||||
import { Injectable } from '@nestjs/common'; | ||||||||||||||||||||||||||||||||||||||||||
import axios from 'axios'; | ||||||||||||||||||||||||||||||||||||||||||
import { OAuth2Client } from 'google-auth-library'; | ||||||||||||||||||||||||||||||||||||||||||
import { google } from 'googleapis'; | ||||||||||||||||||||||||||||||||||||||||||
import { ServiceRegistry } from '../registry.service'; | ||||||||||||||||||||||||||||||||||||||||||
import { GoogleDriveFileOutput } from './types'; | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
const BATCH_SIZE = 1000; // Number of files to process in each batch | ||||||||||||||||||||||||||||||||||||||||||
const API_RATE_LIMIT = 10; // Requests per second | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
@Injectable() | ||||||||||||||||||||||||||||||||||||||||||
export class GoogleDriveService implements IFileService { | ||||||||||||||||||||||||||||||||||||||||||
constructor( | ||||||||||||||||||||||||||||||||||||||||||
private prisma: PrismaService, | ||||||||||||||||||||||||||||||||||||||||||
private logger: LoggerService, | ||||||||||||||||||||||||||||||||||||||||||
private cryptoService: EncryptionService, | ||||||||||||||||||||||||||||||||||||||||||
private registry: ServiceRegistry, | ||||||||||||||||||||||||||||||||||||||||||
private ingestService: IngestDataService, | ||||||||||||||||||||||||||||||||||||||||||
private bullQueueService: BullQueueService, | ||||||||||||||||||||||||||||||||||||||||||
) { | ||||||||||||||||||||||||||||||||||||||||||
this.logger.setContext( | ||||||||||||||||||||||||||||||||||||||||||
FileStorageObject.file.toUpperCase() + ':' + GoogleDriveService.name, | ||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Use template literals instead of string concatenation The current string concatenation can be replaced with a template literal for improved readability and consistency. Apply this diff to update the code: - this.logger.setContext(
- FileStorageObject.file.toUpperCase() + ':' + GoogleDriveService.name,
- );
+ this.logger.setContext(
+ `${FileStorageObject.file.toUpperCase()}:${GoogleDriveService.name}`,
+ ); 📝 Committable suggestion
Suggested change
🧰 Tools🪛 Biome
|
||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||
this.registry.registerService('googledrive', this); | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
async sync(data: SyncParam): Promise<ApiResponse<GoogleDriveFileOutput[]>> { | ||||||||||||||||||||||||||||||||||||||||||
async ingestData( | ||||||||||||||||||||||||||||||||||||||||||
sourceData: GoogleDriveFileOutput[], | ||||||||||||||||||||||||||||||||||||||||||
connectionId: string, | ||||||||||||||||||||||||||||||||||||||||||
customFieldMappings?: { | ||||||||||||||||||||||||||||||||||||||||||
slug: string; | ||||||||||||||||||||||||||||||||||||||||||
remote_id: string; | ||||||||||||||||||||||||||||||||||||||||||
}[], | ||||||||||||||||||||||||||||||||||||||||||
extraParams?: { [key: string]: any }, | ||||||||||||||||||||||||||||||||||||||||||
): Promise<UnifiedFilestorageFileOutput[]> { | ||||||||||||||||||||||||||||||||||||||||||
return this.ingestService.ingestData< | ||||||||||||||||||||||||||||||||||||||||||
UnifiedFilestorageFileOutput, | ||||||||||||||||||||||||||||||||||||||||||
GoogleDriveFileOutput | ||||||||||||||||||||||||||||||||||||||||||
>( | ||||||||||||||||||||||||||||||||||||||||||
sourceData, | ||||||||||||||||||||||||||||||||||||||||||
'googledrive', | ||||||||||||||||||||||||||||||||||||||||||
connectionId, | ||||||||||||||||||||||||||||||||||||||||||
'filestorage', | ||||||||||||||||||||||||||||||||||||||||||
'file', | ||||||||||||||||||||||||||||||||||||||||||
customFieldMappings, | ||||||||||||||||||||||||||||||||||||||||||
extraParams, | ||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
async sync(data: SyncParam) { | ||||||||||||||||||||||||||||||||||||||||||
const { linkedUserId, custom_field_mappings, ingestParams } = data; | ||||||||||||||||||||||||||||||||||||||||||
const connection = await this.prisma.connections.findFirst({ | ||||||||||||||||||||||||||||||||||||||||||
where: { | ||||||||||||||||||||||||||||||||||||||||||
id_linked_user: linkedUserId, | ||||||||||||||||||||||||||||||||||||||||||
provider_slug: 'googledrive', | ||||||||||||||||||||||||||||||||||||||||||
vertical: 'filestorage', | ||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
if (!connection) return; | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
const auth = new OAuth2Client(); | ||||||||||||||||||||||||||||||||||||||||||
auth.setCredentials({ | ||||||||||||||||||||||||||||||||||||||||||
access_token: this.cryptoService.decrypt(connection.access_token), | ||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||
const drive = google.drive({ version: 'v3', auth }); | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
const lastSyncTime = await this.getLastSyncTime(connection.id_connection); | ||||||||||||||||||||||||||||||||||||||||||
const query = lastSyncTime | ||||||||||||||||||||||||||||||||||||||||||
? `trashed = false and modifiedTime > '${lastSyncTime.toISOString()}'` | ||||||||||||||||||||||||||||||||||||||||||
: 'trashed = false'; | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
let pageToken: string | undefined; | ||||||||||||||||||||||||||||||||||||||||||
do { | ||||||||||||||||||||||||||||||||||||||||||
const response = await this.rateLimitedRequest(() => | ||||||||||||||||||||||||||||||||||||||||||
drive.files.list({ | ||||||||||||||||||||||||||||||||||||||||||
q: query, | ||||||||||||||||||||||||||||||||||||||||||
fields: 'nextPageToken', | ||||||||||||||||||||||||||||||||||||||||||
pageSize: BATCH_SIZE, | ||||||||||||||||||||||||||||||||||||||||||
pageToken: pageToken, | ||||||||||||||||||||||||||||||||||||||||||
}), | ||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
await this.bullQueueService | ||||||||||||||||||||||||||||||||||||||||||
.getThirdPartyDataIngestionQueue() | ||||||||||||||||||||||||||||||||||||||||||
.add('fs_file_googledrive', { | ||||||||||||||||||||||||||||||||||||||||||
...data, | ||||||||||||||||||||||||||||||||||||||||||
pageToken: response.data.nextPageToken, | ||||||||||||||||||||||||||||||||||||||||||
query, | ||||||||||||||||||||||||||||||||||||||||||
connectionId: connection.id_connection, | ||||||||||||||||||||||||||||||||||||||||||
custom_field_mappings, | ||||||||||||||||||||||||||||||||||||||||||
ingestParams, | ||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
pageToken = response.data.nextPageToken; | ||||||||||||||||||||||||||||||||||||||||||
} while (pageToken); | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
return { | ||||||||||||||||||||||||||||||||||||||||||
data: [], | ||||||||||||||||||||||||||||||||||||||||||
message: 'Google Drive sync completed', | ||||||||||||||||||||||||||||||||||||||||||
statusCode: 200, | ||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
async processBatch(job: any) { | ||||||||||||||||||||||||||||||||||||||||||
const { | ||||||||||||||||||||||||||||||||||||||||||
linkedUserId, | ||||||||||||||||||||||||||||||||||||||||||
query, | ||||||||||||||||||||||||||||||||||||||||||
pageToken, | ||||||||||||||||||||||||||||||||||||||||||
connectionId, | ||||||||||||||||||||||||||||||||||||||||||
custom_field_mappings, | ||||||||||||||||||||||||||||||||||||||||||
ingestParams, | ||||||||||||||||||||||||||||||||||||||||||
} = job.data; | ||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+117
to
+125
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Define a specific type for the Using Consider defining an interface for the job data: interface GoogleDriveJobData {
linkedUserId: string;
query: string;
pageToken?: string;
connectionId: string;
custom_field_mappings?: { slug: string; remote_id: string }[];
ingestParams?: { [key: string]: any };
} Then update the method signature: - async processBatch(job: any) {
+ async processBatch(job: { data: GoogleDriveJobData }) { |
||||||||||||||||||||||||||||||||||||||||||
const connection = await this.prisma.connections.findFirst({ | ||||||||||||||||||||||||||||||||||||||||||
where: { | ||||||||||||||||||||||||||||||||||||||||||
id_linked_user: linkedUserId, | ||||||||||||||||||||||||||||||||||||||||||
provider_slug: 'googledrive', | ||||||||||||||||||||||||||||||||||||||||||
vertical: 'filestorage', | ||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
if (!connection) return; | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
const auth = new OAuth2Client(); | ||||||||||||||||||||||||||||||||||||||||||
auth.setCredentials({ | ||||||||||||||||||||||||||||||||||||||||||
access_token: this.cryptoService.decrypt(connection.access_token), | ||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||
const drive = google.drive({ version: 'v3', auth }); | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
const response = await this.rateLimitedRequest(() => | ||||||||||||||||||||||||||||||||||||||||||
drive.files.list({ | ||||||||||||||||||||||||||||||||||||||||||
q: query, | ||||||||||||||||||||||||||||||||||||||||||
fields: | ||||||||||||||||||||||||||||||||||||||||||
'files(id, name, mimeType, modifiedTime, size, parents, webViewLink)', | ||||||||||||||||||||||||||||||||||||||||||
pageSize: BATCH_SIZE, | ||||||||||||||||||||||||||||||||||||||||||
pageToken: pageToken, | ||||||||||||||||||||||||||||||||||||||||||
orderBy: 'modifiedTime', | ||||||||||||||||||||||||||||||||||||||||||
}), | ||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
const files: GoogleDriveFileOutput[] = response.data.files.map((file) => ({ | ||||||||||||||||||||||||||||||||||||||||||
id: file.id!, | ||||||||||||||||||||||||||||||||||||||||||
name: file.name!, | ||||||||||||||||||||||||||||||||||||||||||
mimeType: file.mimeType!, | ||||||||||||||||||||||||||||||||||||||||||
modifiedTime: file.modifiedTime!, | ||||||||||||||||||||||||||||||||||||||||||
size: file.size!, | ||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid using non-null assertions Using the non-null assertion operator Consider handling the potential Apply this diff to remove the non-null assertions and handle potential - id: file.id!,
+ id: file.id ?? '',
- name: file.name!,
+ name: file.name ?? '',
- mimeType: file.mimeType!,
+ mimeType: file.mimeType ?? '',
- modifiedTime: file.modifiedTime!,
+ modifiedTime: file.modifiedTime ?? '',
- size: file.size!,
+ size: file.size ?? '0', 📝 Committable suggestion
Suggested change
🧰 Tools🪛 Biome
|
||||||||||||||||||||||||||||||||||||||||||
parents: file.parents, | ||||||||||||||||||||||||||||||||||||||||||
webViewLink: file.webViewLink, | ||||||||||||||||||||||||||||||||||||||||||
})); | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
await this.ingestData( | ||||||||||||||||||||||||||||||||||||||||||
files, | ||||||||||||||||||||||||||||||||||||||||||
connectionId, | ||||||||||||||||||||||||||||||||||||||||||
custom_field_mappings, | ||||||||||||||||||||||||||||||||||||||||||
ingestParams, | ||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
private async rateLimitedRequest<T>(request: () => Promise<T>): Promise<T> { | ||||||||||||||||||||||||||||||||||||||||||
return new Promise((resolve) => { | ||||||||||||||||||||||||||||||||||||||||||
setTimeout(async () => { | ||||||||||||||||||||||||||||||||||||||||||
const result = await request(); | ||||||||||||||||||||||||||||||||||||||||||
resolve(result); | ||||||||||||||||||||||||||||||||||||||||||
}, 1000 / API_RATE_LIMIT); | ||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handle errors in If the Wrap the Apply this diff to handle errors appropriately: private async rateLimitedRequest<T>(request: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
setTimeout(async () => {
+ try {
const result = await request();
resolve(result);
+ } catch (error) {
+ reject(error);
+ }
}, 1000 / API_RATE_LIMIT);
});
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
/*async sync(data: SyncParam): Promise<ApiResponse<GoogleDriveFileOutput[]>> { | ||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||
const { linkedUserId, id_folder } = data; | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
|
@@ -46,34 +194,63 @@ export class GoogleDriveService implements IFileService { | |||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||
const drive = google.drive({ version: 'v3', auth }); | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
const response = await drive.files.list({ | ||||||||||||||||||||||||||||||||||||||||||
q: 'trashed = false', | ||||||||||||||||||||||||||||||||||||||||||
fields: | ||||||||||||||||||||||||||||||||||||||||||
'files(id, name, mimeType, modifiedTime, size, parents, webViewLink)', | ||||||||||||||||||||||||||||||||||||||||||
pageSize: 1000, // Adjust as needed | ||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
const files: GoogleDriveFileOutput[] = response.data.files.map( | ||||||||||||||||||||||||||||||||||||||||||
(file) => ({ | ||||||||||||||||||||||||||||||||||||||||||
id: file.id!, | ||||||||||||||||||||||||||||||||||||||||||
name: file.name!, | ||||||||||||||||||||||||||||||||||||||||||
mimeType: file.mimeType!, | ||||||||||||||||||||||||||||||||||||||||||
modifiedTime: file.modifiedTime!, | ||||||||||||||||||||||||||||||||||||||||||
size: file.size!, | ||||||||||||||||||||||||||||||||||||||||||
parents: file.parents, | ||||||||||||||||||||||||||||||||||||||||||
webViewLink: file.webViewLink, | ||||||||||||||||||||||||||||||||||||||||||
}), | ||||||||||||||||||||||||||||||||||||||||||
const lastSyncTime = await this.getLastSyncTime(connection.id_connection); | ||||||||||||||||||||||||||||||||||||||||||
console.log( | ||||||||||||||||||||||||||||||||||||||||||
'last updated time for google drive file is ' + | ||||||||||||||||||||||||||||||||||||||||||
JSON.stringify(lastSyncTime), | ||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||
let pageToken: string | undefined; | ||||||||||||||||||||||||||||||||||||||||||
let allFiles: GoogleDriveFileOutput[] = []; | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
const query = lastSyncTime | ||||||||||||||||||||||||||||||||||||||||||
? `trashed = false and modifiedTime > '${lastSyncTime.toISOString()}'` | ||||||||||||||||||||||||||||||||||||||||||
: 'trashed = false'; | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
do { | ||||||||||||||||||||||||||||||||||||||||||
const response = await drive.files.list({ | ||||||||||||||||||||||||||||||||||||||||||
q: query, | ||||||||||||||||||||||||||||||||||||||||||
fields: | ||||||||||||||||||||||||||||||||||||||||||
'nextPageToken, files(id, name, mimeType, modifiedTime, size, parents, webViewLink)', | ||||||||||||||||||||||||||||||||||||||||||
pageSize: 1000, | ||||||||||||||||||||||||||||||||||||||||||
pageToken: pageToken, | ||||||||||||||||||||||||||||||||||||||||||
orderBy: 'modifiedTime', | ||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
const files: GoogleDriveFileOutput[] = response.data.files.map( | ||||||||||||||||||||||||||||||||||||||||||
(file) => ({ | ||||||||||||||||||||||||||||||||||||||||||
id: file.id!, | ||||||||||||||||||||||||||||||||||||||||||
name: file.name!, | ||||||||||||||||||||||||||||||||||||||||||
mimeType: file.mimeType!, | ||||||||||||||||||||||||||||||||||||||||||
modifiedTime: file.modifiedTime!, | ||||||||||||||||||||||||||||||||||||||||||
size: file.size!, | ||||||||||||||||||||||||||||||||||||||||||
parents: file.parents, | ||||||||||||||||||||||||||||||||||||||||||
webViewLink: file.webViewLink, | ||||||||||||||||||||||||||||||||||||||||||
}), | ||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||
allFiles = allFiles.concat(files); | ||||||||||||||||||||||||||||||||||||||||||
pageToken = response.data.nextPageToken; | ||||||||||||||||||||||||||||||||||||||||||
if (pageToken) { | ||||||||||||||||||||||||||||||||||||||||||
await sleep(100); // Wait 100ms between requests to avoid hitting rate limits | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
} while (pageToken); | ||||||||||||||||||||||||||||||||||||||||||
this.logger.log(`Synced googledrive files !`); | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
return { | ||||||||||||||||||||||||||||||||||||||||||
data: files, | ||||||||||||||||||||||||||||||||||||||||||
data: allFiles, | ||||||||||||||||||||||||||||||||||||||||||
message: 'Google Drive files retrieved', | ||||||||||||||||||||||||||||||||||||||||||
statusCode: 200, | ||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||
} catch (error) { | ||||||||||||||||||||||||||||||||||||||||||
throw error; | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
}*/ | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
private async getLastSyncTime(connectionId: string): Promise<Date | null> { | ||||||||||||||||||||||||||||||||||||||||||
const lastSync = await this.prisma.fs_files.findFirst({ | ||||||||||||||||||||||||||||||||||||||||||
where: { id_connection: connectionId }, | ||||||||||||||||||||||||||||||||||||||||||
orderBy: { modified_at: 'desc' }, | ||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||
return lastSync ? lastSync.modified_at : null; | ||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+267
to
+272
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Optimize Retrieving the latest modified time without proper indexing can lead to performance issues on large datasets. Ensure that there's an index on Consider updating the database schema to include the necessary indexes. |
||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
async downloadFile(fileId: string, connection: any): Promise<Buffer> { | ||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,23 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||
import { Process, Processor } from '@nestjs/bull'; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import { Injectable } from '@nestjs/common'; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import { Job } from 'bull'; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import { Queues } from '@@core/@core-services/queues/types'; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import { GoogleDriveService } from '.'; | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+1
to
+5
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) LGTM! Consider using named imports for clarity. The imports are appropriate for a NestJS application using Bull for queue processing. However, for better clarity and maintainability, consider using named imports for the Apply this change to improve import clarity: -import { Queues } from '@@core/@core-services/queues/types';
+import { Queues } from '@@core/@core-services/queues/types';
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
@Injectable() | ||||||||||||||||||||||||||||||||||||||||||||||||||
@Processor(Queues.THIRD_PARTY_DATA_INGESTION) | ||||||||||||||||||||||||||||||||||||||||||||||||||
export class GoogleDriveQueueProcessor { | ||||||||||||||||||||||||||||||||||||||||||||||||||
constructor(private readonly googleDriveService: GoogleDriveService) {} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
@Process('fs_file_googledrive') | ||||||||||||||||||||||||||||||||||||||||||||||||||
async handleGoogleDriveSync(job: Job) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||
await this.googleDriveService.processBatch(job); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
console.error( | ||||||||||||||||||||||||||||||||||||||||||||||||||
`Failed to process Google Drive sync job: ${error.message}`, | ||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||
throw error; | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+12
to
+22
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) LGTM! Consider enhancing error logging for better debugging. The
However, there's room for improvement in error logging: Consider enhancing the error logging to include more context: } catch (error) {
console.error(
- `Failed to process Google Drive sync job: ${error.message}`,
+ `Failed to process Google Drive sync job: ${error.message}`,
+ `Job ID: ${job.id}, Data: ${JSON.stringify(job.data)}`,
+ error.stack
);
throw error;
} This change will provide more context for debugging, including the job ID and data. 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick (assertive)
LGTM! Consider adding a comment for the new queue.
The addition of the THIRD_PARTY_DATA_INGESTION queue is correct and aligns with the PR objective of scaling data fetching from Google Drive. The implementation is consistent with the existing code structure.
Consider adding a brief comment above this queue configuration to explain its purpose, similar to:
This would improve code readability and make it easier for other developers to understand the purpose of this queue.