-
Notifications
You must be signed in to change notification settings - Fork 195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🚑 Scaling fetching data from gdrive #710
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,8 @@ export class BullQueueService { | |
public readonly syncJobsQueue: Queue, | ||
@InjectQueue(Queues.FAILED_PASSTHROUGH_REQUESTS_HANDLER) | ||
public readonly failedPassthroughRequestsQueue: Queue, | ||
@InjectQueue(Queues.THIRD_PARTY_DATA_INGESTION) | ||
public readonly thirdPartyDataIngestionQueue: Queue, | ||
Comment on lines
+17
to
+18
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) LGTM! Consider adding a comment for the new queue. The new queue injection for THIRD_PARTY_DATA_INGESTION follows the existing pattern and is implemented correctly. Consider adding a brief comment explaining the purpose of this new queue, similar to: /**
* Queue for handling third-party data ingestion tasks
*/
@InjectQueue(Queues.THIRD_PARTY_DATA_INGESTION)
public readonly thirdPartyDataIngestionQueue: Queue, This would improve code documentation and make it easier for other developers to understand the queue's purpose at a glance. |
||
@InjectQueue(Queues.RAG_DOCUMENT_PROCESSING) | ||
private ragDocumentQueue: Queue, | ||
) {} | ||
|
@@ -35,6 +37,9 @@ export class BullQueueService { | |
getRagDocumentQueue() { | ||
return this.ragDocumentQueue; | ||
} | ||
getThirdPartyDataIngestionQueue() { | ||
return this.thirdPartyDataIngestionQueue; | ||
} | ||
|
||
async removeRepeatableJob(jobName: string) { | ||
const jobs = await this.syncJobsQueue.getRepeatableJobs(); | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4,4 +4,5 @@ export enum Queues { | |||||
SYNC_JOBS_WORKER = 'SYNC_JOBS_WORKER', // Queue which syncs data from remote 3rd parties | ||||||
FAILED_PASSTHROUGH_REQUESTS_HANDLER = 'FAILED_PASSTHROUGH_REQUESTS_HANDLER', // Queue which handles failed passthrough request due to rate limit and retries it with backOff | ||||||
RAG_DOCUMENT_PROCESSING = 'RAG_DOCUMENT_PROCESSING', | ||||||
THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION', | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) LGTM! Consider adding a descriptive comment. The addition of THIRD_PARTY_DATA_INGESTION to the Queues enum is consistent with the existing entries. Consider adding a descriptive comment for this new queue type, similar to the other entries. For example: - THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION',
+ THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION', // Queue for ingesting data from third-party sources like Google Drive 📝 Committable suggestion
Suggested change
|
||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ | |
import { PrismaService } from '../prisma/prisma.service'; | ||
import { | ||
ApiResponse, | ||
getFileExtensionFromMimeType, | ||
TargetObject, | ||
} from '@@core/utils/types'; | ||
import { UnifySourceType } from '@@core/utils/types/unify.output'; | ||
|
@@ -15,8 +15,8 @@ | |
import { FieldMappingService } from '@@core/field-mapping/field-mapping.service'; | ||
import { LoggerService } from '../logger/logger.service'; | ||
import { RagService } from '@@core/rag/rag.service'; | ||
import { FileInfo } from '@@core/rag/types'; | ||
import { fs_files as FileStorageFile } from '@prisma/client'; | ||
|
||
@Injectable() | ||
export class IngestDataService { | ||
|
@@ -31,7 +31,7 @@ | |
private ragService: RagService, | ||
) {} | ||
|
||
async syncForLinkedUser<T, U, V extends IBaseObjectService>( | ||
integrationId: string, | ||
linkedUserId: string, | ||
vertical: string, | ||
|
@@ -85,10 +85,16 @@ | |
.filter((p) => p.shouldPassToService) | ||
.map((p) => p.param); | ||
|
||
const ingestParams = params | ||
.filter((p) => p.shouldPassToIngest) | ||
.reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {}); | ||
Comment on lines
+88
to
+90
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Consider optimizing the ingestParams construction. The current implementation using reduce with an object spread in the accumulator has O(n²) time complexity. Consider refactoring to use Object.fromEntries: const ingestParams = Object.fromEntries(
params
.filter((p) => p.shouldPassToIngest)
.map((p) => [p.paramName, p.param])
); This approach avoids the spread operator in the accumulator, resulting in O(n) time complexity. 🧰 Tools🪛 Biome
|
||
|
||
// Construct the syncParam object dynamically | ||
const syncParam: SyncParam = { | ||
linkedUserId, | ||
custom_properties: remoteProperties, | ||
custom_field_mappings: customFieldMappings, | ||
ingestParams: ingestParams, | ||
}; | ||
|
||
serviceParams.forEach((param, index) => { | ||
|
@@ -124,11 +130,7 @@ | |
return; | ||
} | ||
|
||
const sourceObject: U[] = resp.data; | ||
|
||
const ingestParams = params | ||
.filter((p) => p.shouldPassToIngest) | ||
.reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {}); | ||
/*const sourceObject: U[] = resp.data; | ||
|
||
await this.ingestData<T, U>( | ||
sourceObject, | ||
|
@@ -138,7 +140,7 @@ | |
commonObject, | ||
customFieldMappings, | ||
ingestParams, | ||
); | ||
);*/ | ||
} catch (syncError) { | ||
this.logger.error( | ||
`Error syncing ${integrationId} ${commonObject}: ${syncError.message}`, | ||
|
@@ -209,7 +211,7 @@ | |
); | ||
|
||
// insert the files in our s3 bucket so we can process them for our RAG | ||
if (vertical === 'filestorage' && commonObject === 'file') { | ||
/*if (vertical === 'filestorage' && commonObject === 'file') { | ||
try { | ||
const filesInfo: FileInfo[] = data | ||
.filter((file: FileStorageFile) => file.mime_type !== null) | ||
|
@@ -239,7 +241,7 @@ | |
// Optionally, you could create an event to log this error | ||
// await this.prisma.events.create({...}); | ||
} | ||
} | ||
}*/ | ||
|
||
const event = await this.prisma.events.create({ | ||
data: { | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,3 @@ | ||||||||||||||||||||||||
export function sleep(ms: number): Promise<void> { | ||||||||||||||||||||||||
return new Promise((resolve) => setTimeout(resolve, ms)); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
Comment on lines
+1
to
+3
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) LGTM! Consider adding a brief JSDoc comment. The sleep utility function is simple and correct. Consider adding a brief JSDoc comment to improve documentation: +/**
+ * Pauses execution for the specified number of milliseconds.
+ * @param ms The number of milliseconds to sleep.
+ * @returns A Promise that resolves after the specified delay.
+ */
export function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
} 📝 Committable suggestion
Suggested change
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,8 +68,22 @@ export interface IBaseSync { | |
|
||
export type SyncParam = { | ||
linkedUserId: string; | ||
custom_field_mappings?: { | ||
slug: string; | ||
remote_id: string; | ||
}[]; | ||
ingestParams: { [key: string]: any }; | ||
Comment on lines
+71
to
+75
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) LGTM with a minor suggestion. The additions to the SyncParam type look good. Consider making ingestParams optional: ingestParams?: { [key: string]: any }; This change would maintain flexibility while not requiring empty objects to be passed when no extra parameters are necessary. |
||
[key: string]: any; | ||
}; | ||
export interface IBaseObjectService { | ||
sync(data: SyncParam): Promise<ApiResponse<any>>; | ||
ingestData?( | ||
sourceData: any[], | ||
connectionId: string, | ||
customFieldMappings?: { | ||
slug: string; | ||
remote_id: string; | ||
}[], | ||
extraParams?: { [key: string]: any }, | ||
): Promise<any[]>; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick (assertive)
LGTM! Consider adding a comment for the new queue.
The addition of the THIRD_PARTY_DATA_INGESTION queue is correct and aligns with the PR objective of scaling data fetching from Google Drive. The implementation is consistent with the existing code structure.
Consider adding a brief comment above this queue configuration to explain its purpose, similar to:
This would improve code readability and make it easier for other developers to understand the purpose of this queue.