From fc854e14fdf2456c331a99a0aeb2a80d12afafa9 Mon Sep 17 00:00:00 2001 From: nael Date: Tue, 8 Oct 2024 22:48:11 +0200 Subject: [PATCH] :bug: Fixed some lines --- .../unification/ingest-data.service.ts | 4 +- .../api/src/@core/utils/types/interface.ts | 2 +- .../src/filestorage/file/file.controller.ts | 16 ++++ .../api/src/filestorage/file/file.module.ts | 2 + .../filestorage/file/services/file.service.ts | 31 ++++--- .../file/services/googledrive/index.ts | 87 +++++++++++-------- 6 files changed, 95 insertions(+), 47 deletions(-) diff --git a/packages/api/src/@core/@core-services/unification/ingest-data.service.ts b/packages/api/src/@core/@core-services/unification/ingest-data.service.ts index 24cf8179d..353f5718c 100644 --- a/packages/api/src/@core/@core-services/unification/ingest-data.service.ts +++ b/packages/api/src/@core/@core-services/unification/ingest-data.service.ts @@ -211,7 +211,7 @@ export class IngestDataService { ); // insert the files in our s3 bucket so we can process them for our RAG - if (vertical === 'filestorage' && commonObject === 'file') { + /*if (vertical === 'filestorage' && commonObject === 'file') { try { const filesInfo: FileInfo[] = data .filter((file: FileStorageFile) => file.mime_type !== null) @@ -241,7 +241,7 @@ export class IngestDataService { // Optionally, you could create an event to log this error // await this.prisma.events.create({...}); } - } + }*/ const event = await this.prisma.events.create({ data: { diff --git a/packages/api/src/@core/utils/types/interface.ts b/packages/api/src/@core/utils/types/interface.ts index f04be63fc..e03406eae 100644 --- a/packages/api/src/@core/utils/types/interface.ts +++ b/packages/api/src/@core/utils/types/interface.ts @@ -77,7 +77,7 @@ export type SyncParam = { }; export interface IBaseObjectService { sync(data: SyncParam): Promise>; - ingestData( + ingestData?( sourceData: any[], connectionId: string, customFieldMappings?: { diff --git a/packages/api/src/filestorage/file/file.controller.ts b/packages/api/src/filestorage/file/file.controller.ts index a79fbb1dd..8b4320a89 100644 --- a/packages/api/src/filestorage/file/file.controller.ts +++ b/packages/api/src/filestorage/file/file.controller.ts @@ -46,6 +46,22 @@ export class FileController { this.logger.setContext(FileController.name); } + /*@Get('count') + async getNumberFiles( + @Headers('x-connection-token') connection_token: string, + @Query() query: QueryDto, + ) { + try { + const { linkedUserId, remoteSource, connectionId, projectId } = + await this.connectionUtils.getConnectionMetadataFromConnectionToken( + connection_token, + ); + return this.fileService.getCountFiles(connectionId); + } catch (error) { + throw new Error(error); + } + }*/ + @ApiOperation({ operationId: 'listFilestorageFile', summary: 'List Files', diff --git a/packages/api/src/filestorage/file/file.module.ts b/packages/api/src/filestorage/file/file.module.ts index b46829cee..278fbd810 100644 --- a/packages/api/src/filestorage/file/file.module.ts +++ b/packages/api/src/filestorage/file/file.module.ts @@ -16,6 +16,7 @@ import { ServiceRegistry } from './services/registry.service'; import { SharepointService } from './services/sharepoint'; import { SharepointFileMapper } from './services/sharepoint/mappers'; import { SyncService } from './sync/sync.service'; +import { GoogleDriveQueueProcessor } from './services/googledrive/processor'; @Module({ controllers: [FileController], @@ -39,6 +40,7 @@ import { SyncService } from './sync/sync.service'; DropboxService, DropboxFileMapper, GoogleDriveService, + GoogleDriveQueueProcessor, ], exports: [SyncService, ServiceRegistry], }) diff --git a/packages/api/src/filestorage/file/services/file.service.ts b/packages/api/src/filestorage/file/services/file.service.ts index f0f88a3d3..5102fde26 100644 --- a/packages/api/src/filestorage/file/services/file.service.ts +++ b/packages/api/src/filestorage/file/services/file.service.ts @@ -1,20 +1,19 @@ -import { Injectable } from '@nestjs/common'; -import { PrismaService } from '@@core/@core-services/prisma/prisma.service'; import { LoggerService } from '@@core/@core-services/logger/logger.service'; -import { v4 as uuidv4 } from 'uuid'; -import { ApiResponse } from '@@core/utils/types'; +import { PrismaService } from '@@core/@core-services/prisma/prisma.service'; +import { CoreUnification } from '@@core/@core-services/unification/core-unification.service'; +import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; import { WebhookService } from '@@core/@core-services/webhooks/panora-webhooks/webhook.service'; +import { FieldMappingService } from '@@core/field-mapping/field-mapping.service'; +import { ApiResponse } from '@@core/utils/types'; +import { OriginalFileOutput } from '@@core/utils/types/original/original.file-storage'; +import { FileStorageObject } from '@filestorage/@lib/@types'; +import { Injectable } from '@nestjs/common'; +import { v4 as uuidv4 } from 'uuid'; import { UnifiedFilestorageFileInput, UnifiedFilestorageFileOutput, } from '../types/model.unified'; -import { FieldMappingService } from '@@core/field-mapping/field-mapping.service'; import { ServiceRegistry } from './registry.service'; -import { CoreSyncRegistry } from '@@core/@core-services/registries/core-sync.registry'; -import { FileStorageObject } from '@filestorage/@lib/@types'; -import { OriginalFileOutput } from '@@core/utils/types/original/original.file-storage'; -import { CoreUnification } from '@@core/@core-services/unification/core-unification.service'; -import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; @Injectable() export class FileService { @@ -441,4 +440,16 @@ export class FileService { throw error; } } + /*async getCountFiles(connection_id: string): Promise { + try { + const fileCount = await this.prisma.fs_files.count({ + where: { + id_connection: connection_id, + }, + }); + return fileCount; + } catch (error) { + throw error; + } + }*/ } diff --git a/packages/api/src/filestorage/file/services/googledrive/index.ts b/packages/api/src/filestorage/file/services/googledrive/index.ts index a5af0e204..8c19fa217 100644 --- a/packages/api/src/filestorage/file/services/googledrive/index.ts +++ b/packages/api/src/filestorage/file/services/googledrive/index.ts @@ -80,6 +80,7 @@ export class GoogleDriveService implements IFileService { : 'trashed = false'; let pageToken: string | undefined; + let count = 0; do { const response = await this.rateLimitedRequest(() => drive.files.list({ @@ -90,20 +91,22 @@ export class GoogleDriveService implements IFileService { }), ); + count++; + await this.bullQueueService .getThirdPartyDataIngestionQueue() .add('fs_file_googledrive', { ...data, - pageToken: response.data.nextPageToken, + pageToken: (response as any).data.nextPageToken, query, connectionId: connection.id_connection, custom_field_mappings, ingestParams, }); - pageToken = response.data.nextPageToken; + pageToken = (response as any).data.nextPageToken; } while (pageToken); - + console.log(`it has been called ${count} times`) return { data: [], message: 'Google Drive sync completed', @@ -135,41 +138,57 @@ export class GoogleDriveService implements IFileService { access_token: this.cryptoService.decrypt(connection.access_token), }); const drive = google.drive({ version: 'v3', auth }); - - const response = await this.rateLimitedRequest(() => - drive.files.list({ - q: query, - fields: - 'files(id, name, mimeType, modifiedTime, size, parents, webViewLink)', - pageSize: BATCH_SIZE, - pageToken: pageToken, - orderBy: 'modifiedTime', - }), - ); - - const files: GoogleDriveFileOutput[] = response.data.files.map((file) => ({ - id: file.id!, - name: file.name!, - mimeType: file.mimeType!, - modifiedTime: file.modifiedTime!, - size: file.size!, - parents: file.parents, - webViewLink: file.webViewLink, - })); - - await this.ingestData( - files, - connectionId, - custom_field_mappings, - ingestParams, - ); + try { + const response = await this.rateLimitedRequest(() => + drive.files.list({ + q: query, + fields: + 'files(id, name, mimeType, modifiedTime, size, parents, webViewLink)', + pageSize: BATCH_SIZE, + pageToken: pageToken, + orderBy: 'modifiedTime', + }), + ); + + const files: GoogleDriveFileOutput[] = (response as any).data.files.map((file) => ({ + id: file.id!, + name: file.name!, + mimeType: file.mimeType!, + modifiedTime: file.modifiedTime!, + size: file.size!, + parents: file.parents, + webViewLink: file.webViewLink, + })); + + await this.ingestData( + files, + connectionId, + custom_field_mappings, + ingestParams, + ); + } catch (error) { + this.logger.error('Error in processBatch:', error); + if (error.message === 'Invalid Value') { + this.logger.error('This may be due to an expired or invalid access token.', error); + } + throw error; + } } private async rateLimitedRequest(request: () => Promise): Promise { - return new Promise((resolve) => { + return new Promise((resolve, reject) => { setTimeout(async () => { - const result = await request(); - resolve(result); + try { + const result = await request(); + resolve(result); + } catch (error) { + this.logger.error('Error in rateLimitedRequest:', error); + if (error.response) { + this.logger.error('Response data:', error.response.data); + this.logger.error('Response status:', error.response.status); + } + reject(error); + } }, 1000 / API_RATE_LIMIT); }); }