
🚑 Scaling fetching data from gdrive #710

Merged: 3 commits, Oct 8, 2024
3 changes: 3 additions & 0 deletions packages/api/src/@core/@core-services/queues/queue.module.ts
@@ -21,6 +21,9 @@ import { Queues } from './types';
{
name: Queues.RAG_DOCUMENT_PROCESSING,
},
{
name: Queues.THIRD_PARTY_DATA_INGESTION,
},
Comment on lines +24 to +26 (Contributor, 🧹 Nitpick, assertive):

LGTM! Consider adding a comment for the new queue.

The addition of the THIRD_PARTY_DATA_INGESTION queue is correct and aligns with the PR objective of scaling data fetching from Google Drive. The implementation is consistent with the existing code structure.

Consider adding a brief comment above this queue configuration to explain its purpose, similar to:

// Queue for handling third-party data ingestion tasks, such as fetching data from Google Drive
{
  name: Queues.THIRD_PARTY_DATA_INGESTION,
},

This would improve code readability and make it easier for other developers to understand the purpose of this queue.

),
],
providers: [BullQueueService],
@@ -14,6 +14,8 @@ export class BullQueueService {
public readonly syncJobsQueue: Queue,
@InjectQueue(Queues.FAILED_PASSTHROUGH_REQUESTS_HANDLER)
public readonly failedPassthroughRequestsQueue: Queue,
@InjectQueue(Queues.THIRD_PARTY_DATA_INGESTION)
public readonly thirdPartyDataIngestionQueue: Queue,
Comment on lines +17 to +18 (Contributor, 🧹 Nitpick, assertive):

LGTM! Consider adding a comment for the new queue.

The new queue injection for thirdPartyDataIngestionQueue is well-implemented and follows the existing patterns in the class. It's correctly declared as public readonly and uses the Queues.THIRD_PARTY_DATA_INGESTION constant.

Consider adding a brief comment explaining the purpose of this new queue, similar to:

/**
 * Queue for handling third-party data ingestion tasks
 */
@InjectQueue(Queues.THIRD_PARTY_DATA_INGESTION)
public readonly thirdPartyDataIngestionQueue: Queue,

This would improve code documentation and make it easier for other developers to understand the queue's purpose at a glance.

@InjectQueue(Queues.RAG_DOCUMENT_PROCESSING)
private ragDocumentQueue: Queue,
) {}
@@ -35,6 +37,9 @@ export class BullQueueService {
getRagDocumentQueue() {
return this.ragDocumentQueue;
}
getThirdPartyDataIngestionQueue() {
return this.thirdPartyDataIngestionQueue;
}

async removeRepeatableJob(jobName: string) {
const jobs = await this.syncJobsQueue.getRepeatableJobs();
1 change: 1 addition & 0 deletions packages/api/src/@core/@core-services/queues/types.ts
@@ -4,4 +4,5 @@ export enum Queues {
SYNC_JOBS_WORKER = 'SYNC_JOBS_WORKER', // Queue which syncs data from remote 3rd parties
FAILED_PASSTHROUGH_REQUESTS_HANDLER = 'FAILED_PASSTHROUGH_REQUESTS_HANDLER', // Queue which handles failed passthrough request due to rate limit and retries it with backOff
RAG_DOCUMENT_PROCESSING = 'RAG_DOCUMENT_PROCESSING',
THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION',
Contributor comment (🧹 Nitpick, assertive):

LGTM! Consider adding a descriptive comment.

The addition of THIRD_PARTY_DATA_INGESTION to the Queues enum is appropriate and aligns with the PR's objective of scaling data fetching from Google Drive. The naming convention is consistent with other entries.

Consider adding a descriptive comment for this new queue type, similar to the other entries. For example:

-  THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION',
+  THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION', // Queue for ingesting data from third-party sources like Google Drive
}
@@ -31,7 +31,7 @@
private ragService: RagService,
) {}

async syncForLinkedUser<T, U, V extends IBaseObjectService>(

⚠️ Check warning on line 34 in packages/api/src/@core/@core-services/unification/ingest-data.service.ts (GitHub Actions / Build and Test (18.x)): 'T' is defined but never used
integrationId: string,
linkedUserId: string,
vertical: string,
@@ -85,10 +85,16 @@
.filter((p) => p.shouldPassToService)
.map((p) => p.param);

const ingestParams = params
.filter((p) => p.shouldPassToIngest)
.reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {});
Comment on lines +88 to +90 (Contributor, ⚠️ Potential issue):

Consider optimizing the ingestParams creation for better performance.

The current implementation using reduce with spread operator on the accumulator can lead to performance issues for large arrays, potentially causing O(n^2) time complexity.

Consider refactoring to use Object.fromEntries for better performance:

const ingestParams = Object.fromEntries(
  params
    .filter((p) => p.shouldPassToIngest)
    .map((p) => [p.paramName, p.param])
);

This approach avoids the spread operator in the accumulator, resulting in O(n) time complexity.

🪛 Biome: [error] 90-90: Avoid the use of spread (...) syntax on accumulators; it causes a time complexity of O(n^2) (lint/performance/noAccumulatingSpread).


// Construct the syncParam object dynamically
const syncParam: SyncParam = {
linkedUserId,
custom_properties: remoteProperties,
custom_field_mappings: customFieldMappings,
ingestParams: ingestParams,
};

serviceParams.forEach((param, index) => {
@@ -124,11 +130,7 @@
return;
}

const sourceObject: U[] = resp.data;

const ingestParams = params
.filter((p) => p.shouldPassToIngest)
.reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {});
/*const sourceObject: U[] = resp.data;

await this.ingestData<T, U>(
sourceObject,
Expand All @@ -138,7 +140,7 @@
commonObject,
customFieldMappings,
ingestParams,
);
);*/
} catch (syncError) {
this.logger.error(
`Error syncing ${integrationId} ${commonObject}: ${syncError.message}`,
3 changes: 3 additions & 0 deletions packages/api/src/@core/utils/helpers.ts
@@ -0,0 +1,3 @@
export function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
Comment on lines +1 to +3 (Contributor, 🧹 Nitpick, assertive):

LGTM! Consider adding a brief JSDoc comment.

The sleep function is well-implemented and follows good practices. It provides a useful utility for introducing delays in asynchronous operations, which could be beneficial for rate limiting or managing API requests when fetching data from Google Drive.

Consider adding a brief JSDoc comment to improve documentation:

+/**
+ * Pauses execution for the specified number of milliseconds.
+ * @param ms The number of milliseconds to sleep.
+ * @returns A Promise that resolves after the specified delay.
+ */
export function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
14 changes: 14 additions & 0 deletions packages/api/src/@core/utils/types/interface.ts
@@ -68,8 +68,22 @@ export interface IBaseSync {

export type SyncParam = {
linkedUserId: string;
custom_field_mappings?: {
slug: string;
remote_id: string;
}[];
ingestParams: { [key: string]: any };
Comment on lines +71 to +75 (Contributor, 🧹 Nitpick, assertive):

LGTM with a minor suggestion

The additions to the SyncParam type enhance its flexibility for data integration and ingestion scenarios. This aligns well with the PR's objective of scaling data fetching from Google Drive.

Consider making ingestParams optional to accommodate cases where no additional parameters are needed:

ingestParams?: { [key: string]: any };

This change would maintain flexibility while not requiring empty objects to be passed when no extra parameters are necessary.

[key: string]: any;
};
export interface IBaseObjectService {
sync(data: SyncParam): Promise<ApiResponse<any>>;
ingestData(
sourceData: any[],
connectionId: string,
customFieldMappings?: {
slug: string;
remote_id: string;
}[],
extraParams?: { [key: string]: any },
): Promise<any[]>;
Contributor comment (🧹 Nitpick, assertive):

Approved with suggestions for improvement

The addition of the ingestData method to the IBaseObjectService interface is a good step towards scaling data fetching capabilities. It provides a flexible way to ingest data with custom mappings and extra parameters.

Consider the following improvements:

  1. Enhance type safety by specifying a more concrete return type instead of any[]. For example:
ingestData<T = any>(
  sourceData: T[],
  connectionId: string,
  customFieldMappings?: { slug: string; remote_id: string }[],
  extraParams?: { [key: string]: any }
): Promise<T[]>;
  1. For consistency, consider renaming customFieldMappings to custom_field_mappings to match the naming in the SyncParam type.

  2. If possible, define a type for the extraParams object to provide better documentation and type checking for the expected additional parameters.

These changes would improve type safety and maintain consistency across the codebase.

}
217 changes: 197 additions & 20 deletions packages/api/src/filestorage/file/services/googledrive/index.ts
@@ -1,32 +1,180 @@
import { EncryptionService } from '@@core/@core-services/encryption/encryption.service';
import { LoggerService } from '@@core/@core-services/logger/logger.service';
import { PrismaService } from '@@core/@core-services/prisma/prisma.service';
import { ApiResponse } from '@@core/utils/types';
import { BullQueueService } from '@@core/@core-services/queues/shared.service';
import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service';
import { SyncParam } from '@@core/utils/types/interface';
import { FileStorageObject } from '@filestorage/@lib/@types';
import { IFileService } from '@filestorage/file/types';
import { UnifiedFilestorageFileOutput } from '@filestorage/file/types/model.unified';
import { Injectable } from '@nestjs/common';
import axios from 'axios';
import { OAuth2Client } from 'google-auth-library';
import { google } from 'googleapis';
import { ServiceRegistry } from '../registry.service';
import { GoogleDriveFileOutput } from './types';

const BATCH_SIZE = 1000; // Number of files to process in each batch
const API_RATE_LIMIT = 10; // Requests per second

@Injectable()
export class GoogleDriveService implements IFileService {
constructor(
private prisma: PrismaService,
private logger: LoggerService,
private cryptoService: EncryptionService,
private registry: ServiceRegistry,
private ingestService: IngestDataService,
private bullQueueService: BullQueueService,
) {
this.logger.setContext(
FileStorageObject.file.toUpperCase() + ':' + GoogleDriveService.name,
Contributor comment (🛠️ Refactor suggestion):

Use template literals instead of string concatenation

The current string concatenation can be replaced with a template literal for improved readability and consistency.

Apply this diff to update the code:

- this.logger.setContext(
-   FileStorageObject.file.toUpperCase() + ':' + GoogleDriveService.name,
- );
+ this.logger.setContext(
+   `${FileStorageObject.file.toUpperCase()}:${GoogleDriveService.name}`,
+ );
🪛 Biome: [error] 31-31: Template literals are preferred over string concatenation (lint/style/useTemplate).

);
this.registry.registerService('googledrive', this);
}

async sync(data: SyncParam): Promise<ApiResponse<GoogleDriveFileOutput[]>> {
async ingestData(
sourceData: GoogleDriveFileOutput[],
connectionId: string,
customFieldMappings?: {
slug: string;
remote_id: string;
}[],
extraParams?: { [key: string]: any },
): Promise<UnifiedFilestorageFileOutput[]> {
return this.ingestService.ingestData<
UnifiedFilestorageFileOutput,
GoogleDriveFileOutput
>(
sourceData,
'googledrive',
connectionId,
'filestorage',
'file',
customFieldMappings,
extraParams,
);
}

async sync(data: SyncParam) {
const { linkedUserId, custom_field_mappings, ingestParams } = data;
const connection = await this.prisma.connections.findFirst({
where: {
id_linked_user: linkedUserId,
provider_slug: 'googledrive',
vertical: 'filestorage',
},
});

if (!connection) return;

const auth = new OAuth2Client();
auth.setCredentials({
access_token: this.cryptoService.decrypt(connection.access_token),
});
const drive = google.drive({ version: 'v3', auth });

const lastSyncTime = await this.getLastSyncTime(connection.id_connection);
const query = lastSyncTime
? `trashed = false and modifiedTime > '${lastSyncTime.toISOString()}'`
: 'trashed = false';

let pageToken: string | undefined;
do {
const response = await this.rateLimitedRequest(() =>
drive.files.list({
q: query,
fields: 'nextPageToken',
pageSize: BATCH_SIZE,
pageToken: pageToken,
}),
);

await this.bullQueueService
.getThirdPartyDataIngestionQueue()
.add('fs_file_googledrive', {
...data,
pageToken: response.data.nextPageToken,
query,
connectionId: connection.id_connection,
custom_field_mappings,
ingestParams,
});

pageToken = response.data.nextPageToken;
} while (pageToken);

return {
data: [],
message: 'Google Drive sync completed',
statusCode: 200,
};
}

async processBatch(job: any) {
const {
linkedUserId,
query,
pageToken,
connectionId,
custom_field_mappings,
ingestParams,
} = job.data;
Comment on lines +117 to +125 (Contributor, 🛠️ Refactor suggestion):

Define a specific type for the job parameter in processBatch

Using any for the job parameter reduces type safety and may introduce runtime errors. Defining a specific type enhances maintainability and readability.

Consider defining an interface for the job data:

interface GoogleDriveJobData {
  linkedUserId: string;
  query: string;
  pageToken?: string;
  connectionId: string;
  custom_field_mappings?: { slug: string; remote_id: string }[];
  ingestParams?: { [key: string]: any };
}

Then update the method signature:

- async processBatch(job: any) {
+ async processBatch(job: { data: GoogleDriveJobData }) {

const connection = await this.prisma.connections.findFirst({
where: {
id_linked_user: linkedUserId,
provider_slug: 'googledrive',
vertical: 'filestorage',
},
});

if (!connection) return;

const auth = new OAuth2Client();
auth.setCredentials({
access_token: this.cryptoService.decrypt(connection.access_token),
});
const drive = google.drive({ version: 'v3', auth });

const response = await this.rateLimitedRequest(() =>
drive.files.list({
q: query,
fields:
'files(id, name, mimeType, modifiedTime, size, parents, webViewLink)',
pageSize: BATCH_SIZE,
pageToken: pageToken,
orderBy: 'modifiedTime',
}),
);

const files: GoogleDriveFileOutput[] = response.data.files.map((file) => ({
id: file.id!,
name: file.name!,
mimeType: file.mimeType!,
modifiedTime: file.modifiedTime!,
size: file.size!,
Contributor comment (⚠️ Potential issue):

Avoid using non-null assertions

Using the non-null assertion operator ! can lead to runtime errors if any of the properties are null or undefined. It's safer to handle possible undefined values appropriately.

Consider handling the potential undefined values or ensuring that these properties are always defined before usage. You can use optional chaining or provide default values.

Apply this diff to remove the non-null assertions and handle potential undefined values:

- id: file.id!,
+ id: file.id ?? '',
- name: file.name!,
+ name: file.name ?? '',
- mimeType: file.mimeType!,
+ mimeType: file.mimeType ?? '',
- modifiedTime: file.modifiedTime!,
+ modifiedTime: file.modifiedTime ?? '',
- size: file.size!,
+ size: file.size ?? '0',
🪛 Biome: [error] 151-155: Forbidden non-null assertion (lint/style/noNonNullAssertion).
parents: file.parents,
webViewLink: file.webViewLink,
}));

await this.ingestData(
files,
connectionId,
custom_field_mappings,
ingestParams,
);
}

private async rateLimitedRequest<T>(request: () => Promise<T>): Promise<T> {
return new Promise((resolve) => {
setTimeout(async () => {
const result = await request();
resolve(result);
}, 1000 / API_RATE_LIMIT);
});
}
Contributor comment (⚠️ Potential issue):

Handle errors in rateLimitedRequest to prevent unhandled rejections

If the request() throws an error, the promise may neither resolve nor reject, leading to unhandled promise rejections. It's important to handle exceptions within the promise.

Wrap the request() call in a try-catch block and reject the promise on error.

Apply this diff to handle errors appropriately:

 private async rateLimitedRequest<T>(request: () => Promise<T>): Promise<T> {
   return new Promise((resolve, reject) => {
     setTimeout(async () => {
+      try {
         const result = await request();
         resolve(result);
+      } catch (error) {
+        reject(error);
+      }
     }, 1000 / API_RATE_LIMIT);
   });
 }
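Beyond the missing try/catch, a per-call `setTimeout` delay does not actually serialize concurrent callers: if many requests arrive at once, they all fire after the same fixed delay. A minimal sketch of a limiter that chains calls on a shared promise so requests are spaced by the interval (a hypothetical helper, not part of this PR; the class and method names are illustrative):

```typescript
// Sketch: serializes requests so each starts at least `intervalMs` after
// the previous one completes, instead of merely delaying each call
// independently. A failed request rejects its caller's promise but does
// not break the chain for later requests.
class SerializedRateLimiter {
  private chain: Promise<unknown> = Promise.resolve();

  constructor(private readonly intervalMs: number) {}

  schedule<T>(request: () => Promise<T>): Promise<T> {
    const result = this.chain
      .then(() => new Promise<void>((r) => setTimeout(r, this.intervalMs)))
      .then(request);
    // Swallow the error here only to keep the chain alive; callers of
    // `schedule` still observe the rejection via `result`.
    this.chain = result.catch(() => undefined);
    return result;
  }
}
```

Assuming an instance like `new SerializedRateLimiter(1000 / API_RATE_LIMIT)`, swapping `rateLimitedRequest(fn)` for `limiter.schedule(fn)` would leave the calling code unchanged while actually enforcing the request spacing.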


/*async sync(data: SyncParam): Promise<ApiResponse<GoogleDriveFileOutput[]>> {
try {
const { linkedUserId, id_folder } = data;

@@ -46,34 +194,63 @@ export class GoogleDriveService implements IFileService {
});
const drive = google.drive({ version: 'v3', auth });

const response = await drive.files.list({
q: 'trashed = false',
fields:
'files(id, name, mimeType, modifiedTime, size, parents, webViewLink)',
pageSize: 1000, // Adjust as needed
});

const files: GoogleDriveFileOutput[] = response.data.files.map(
(file) => ({
id: file.id!,
name: file.name!,
mimeType: file.mimeType!,
modifiedTime: file.modifiedTime!,
size: file.size!,
parents: file.parents,
webViewLink: file.webViewLink,
}),
const lastSyncTime = await this.getLastSyncTime(connection.id_connection);
console.log(
'last updated time for google drive file is ' +
JSON.stringify(lastSyncTime),
);
let pageToken: string | undefined;
let allFiles: GoogleDriveFileOutput[] = [];

const query = lastSyncTime
? `trashed = false and modifiedTime > '${lastSyncTime.toISOString()}'`
: 'trashed = false';

do {
const response = await drive.files.list({
q: query,
fields:
'nextPageToken, files(id, name, mimeType, modifiedTime, size, parents, webViewLink)',
pageSize: 1000,
pageToken: pageToken,
orderBy: 'modifiedTime',
});

const files: GoogleDriveFileOutput[] = response.data.files.map(
(file) => ({
id: file.id!,
name: file.name!,
mimeType: file.mimeType!,
modifiedTime: file.modifiedTime!,
size: file.size!,
parents: file.parents,
webViewLink: file.webViewLink,
}),
);
allFiles = allFiles.concat(files);
pageToken = response.data.nextPageToken;
if (pageToken) {
await sleep(100); // Wait 100ms between requests to avoid hitting rate limits
}
} while (pageToken);
this.logger.log(`Synced googledrive files !`);

return {
data: files,
data: allFiles,
message: 'Google Drive files retrieved',
statusCode: 200,
};
} catch (error) {
throw error;
}
}*/

private async getLastSyncTime(connectionId: string): Promise<Date | null> {
const lastSync = await this.prisma.fs_files.findFirst({
where: { id_connection: connectionId },
orderBy: { modified_at: 'desc' },
});
return lastSync ? lastSync.modified_at : null;
Comment on lines +267 to +272 (Contributor, 🛠️ Refactor suggestion):

Optimize getLastSyncTime query performance

Retrieving the latest modified time without proper indexing can lead to performance issues on large datasets.

Ensure that there's an index on id_connection and modified_at fields in the fs_files table to optimize the query.

Consider updating the database schema to include the necessary indexes.
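As a sketch of what that could look like in the Prisma schema (assuming the model is named `fs_files` with these column names; adjust to the actual schema):

```prisma
model fs_files {
  // ...existing fields...
  id_connection String
  modified_at   DateTime

  // Composite index so "latest modified_at for a connection" can be
  // answered by an index scan instead of a full sort of the table.
  @@index([id_connection, modified_at(sort: Desc)])
}
```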

}

async downloadFile(fileId: string, connection: any): Promise<Buffer> {
@@ -0,0 +1,23 @@
import { Process, Processor } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Job } from 'bull';
import { Queues } from '@@core/@core-services/queues/types';
import { GoogleDriveService } from '.';
Comment on lines +1 to +5 (Contributor, 🧹 Nitpick, assertive):

LGTM! Consider using named imports for clarity.

The imports are appropriate for a NestJS application using Bull for queue processing. However, for better clarity and maintainability, consider using named imports for the Queues type.

Apply this change to improve import clarity:

-import { Queues } from '@@core/@core-services/queues/types';
+import { Queues } from '@@core/@core-services/queues/types';

Committable suggestion was skipped due to low confidence.


@Injectable()
@Processor(Queues.THIRD_PARTY_DATA_INGESTION)
export class GoogleDriveQueueProcessor {
constructor(private readonly googleDriveService: GoogleDriveService) {}

@Process('fs_file_googledrive')
async handleGoogleDriveSync(job: Job) {
try {
await this.googleDriveService.processBatch(job);
} catch (error) {
console.error(
`Failed to process Google Drive sync job: ${error.message}`,
);
throw error;
}
}
Comment on lines +12 to +22 (Contributor, 🧹 Nitpick, assertive):

LGTM! Consider enhancing error logging for better debugging.

The handleGoogleDriveSync method is well-implemented:

  • The @Process('fs_file_googledrive') decorator correctly specifies the job type.
  • The method is properly declared as async and takes a Job parameter.
  • Error handling is implemented with a try-catch block.

However, there's room for improvement in error logging:

Consider enhancing the error logging to include more context:

 } catch (error) {
   console.error(
-    `Failed to process Google Drive sync job: ${error.message}`,
+    `Failed to process Google Drive sync job: ${error.message}`,
+    `Job ID: ${job.id}, Data: ${JSON.stringify(job.data)}`,
+    error.stack
   );
   throw error;
 }

This change will provide more context for debugging, including the job ID and data.


}
2 changes: 1 addition & 1 deletion packages/api/src/main.ts
@@ -95,7 +95,7 @@ async function bootstrap() {
extendedSpecs['x-speakeasy-name-override'];
addSpeakeasyGroup(document);

await generatePanoraParamsSpec(document);
// TODO: await generatePanoraParamsSpec(document);

useContainer(app.select(AppModule), { fallbackOnErrors: true });
