diff --git a/packages/api/src/@core/@core-services/unification/ingest-data.service.ts b/packages/api/src/@core/@core-services/unification/ingest-data.service.ts index 0d062095b..ea52cc940 100644 --- a/packages/api/src/@core/@core-services/unification/ingest-data.service.ts +++ b/packages/api/src/@core/@core-services/unification/ingest-data.service.ts @@ -97,40 +97,71 @@ export class IngestDataService { }); let resp: ApiResponse; - if (wh_real_time_trigger && wh_real_time_trigger.data.remote_id) { - switch (wh_real_time_trigger.action) { - case 'DELETE': - await this.syncRegistry - .getService(vertical, commonObject) - .removeInDb( - connection.id_connection, - wh_real_time_trigger.data.remote_id, - ); - default: - syncParam.webhook_remote_identifier = - wh_real_time_trigger.data.remote_id; - resp = await service.sync(syncParam); - break; + try { + if (wh_real_time_trigger && wh_real_time_trigger.data.remote_id) { + switch (wh_real_time_trigger.action) { + case 'DELETE': + await this.syncRegistry + .getService(vertical, commonObject) + .removeInDb( + connection.id_connection, + wh_real_time_trigger.data.remote_id, + ); + default: + syncParam.webhook_remote_identifier = + wh_real_time_trigger.data.remote_id; + resp = await service.sync(syncParam); + break; + } + } else { + resp = await service.sync(syncParam); } - } else { - resp = await service.sync(syncParam); - } - const sourceObject: U[] = resp.data; + if (!resp || !resp.data) { + this.logger.warn( + `Sync operation for ${integrationId} ${commonObject} returned no data`, + ); + return; + } - const ingestParams = params - .filter((p) => p.shouldPassToIngest) - .reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {}); + const sourceObject: U[] = resp.data; - await this.ingestData( - sourceObject, - integrationId, - connection.id_connection, - vertical, - commonObject, - customFieldMappings, - ingestParams, - ); + const ingestParams = params + .filter((p) => p.shouldPassToIngest) + .reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {}); + + await this.ingestData( + sourceObject, + integrationId, + connection.id_connection, + vertical, + commonObject, + customFieldMappings, + ingestParams, + ); + } catch (syncError) { + this.logger.error( + `Error syncing ${integrationId} ${commonObject}: ${syncError.message}`, + syncError, + ); + // Optionally, you could create an event to log this error + /*await this.prisma.events.create({ + data: { + id_connection: connection.id_connection, + id_project: connection.id_project, + id_event: uuidv4(), + status: 'error', + type: `${vertical}.${commonObject}.sync_failed`, + method: 'SYNC', + url: '/sync', + provider: integrationId, + direction: '0', + timestamp: new Date(), + id_linked_user: linkedUserId, + error: syncError.message, + }, + });*/ + } } catch (error) { throw error; } diff --git a/packages/api/src/@core/sync/sync.processor.ts b/packages/api/src/@core/sync/sync.processor.ts index 5e8681a11..2fd906e61 100644 --- a/packages/api/src/@core/sync/sync.processor.ts +++ b/packages/api/src/@core/sync/sync.processor.ts @@ -35,7 +35,7 @@ export class SyncProcessor { `Error processing job ${job.id} for ${vertical} ${commonObject} (Project: ${projectId})`, error.stack, ); - throw error; // Re-throw the error to mark the job as failed + return { status: 'failed', error: error.message }; } } diff --git a/packages/api/src/@core/sync/sync.service.ts b/packages/api/src/@core/sync/sync.service.ts index ef3935d60..d0e9c2acb 100644 --- a/packages/api/src/@core/sync/sync.service.ts +++ b/packages/api/src/@core/sync/sync.service.ts @@ -22,115 +22,146 @@ export class CoreSyncService { @Cron(CronExpression.EVERY_30_SECONDS) async checkAndKickstartSync(user_id?: string) { - const users = user_id - ? [ - await this.prisma.users.findUnique({ - where: { - id_user: user_id, - }, - }), - ] - : await this.prisma.users.findMany(); - if (users && users.length > 0) { - for (const user of users) { - const projects = await this.prisma.projects.findMany({ - where: { - id_user: user.id_user, - }, - }); - for (const project of projects) { - const projectSyncConfig = - await this.prisma.projects_pull_frequency.findFirst({ + try { + const users = user_id + ? [ + await this.prisma.users.findUnique({ + where: { + id_user: user_id, + }, + }), + ] + : await this.prisma.users.findMany(); + if (users && users.length > 0) { + for (const user of users) { + try { + const projects = await this.prisma.projects.findMany({ where: { - id_project: project.id_project, + id_user: user.id_user, }, }); - - if (projectSyncConfig) { - const syncIntervals = { - crm: projectSyncConfig.crm, - ats: projectSyncConfig.ats, - hris: projectSyncConfig.hris, - accounting: projectSyncConfig.accounting, - filestorage: projectSyncConfig.filestorage, - ecommerce: projectSyncConfig.ecommerce, - ticketing: projectSyncConfig.ticketing, - }; - - for (const [vertical, interval] of Object.entries(syncIntervals)) { - const now = new Date(); - const lastSyncEvent = await this.prisma.events.findFirst({ - where: { - id_project: project.id_project, - type: `${vertical}.batchSyncStart`, - }, - orderBy: { - timestamp: 'desc', - }, - }); - - const lastSyncTime = lastSyncEvent - ? lastSyncEvent.timestamp - : new Date(0); - - const secondsSinceLastSync = - Number(now.getTime() - lastSyncTime.getTime()) / 1000; - - if (interval && secondsSinceLastSync >= interval) { - await this.prisma.events.create({ - data: { - id_project: project.id_project, - id_event: uuidv4(), - status: 'success', - type: `${vertical}.batchSyncStart`, - method: 'GET', - url: '', - provider: '', - direction: '0', - timestamp: new Date(), - }, - }); - const commonObjects = getCommonObjectsForVertical(vertical); - for (const commonObject of commonObjects) { - const service = this.registry.getService( - vertical, - commonObject, - ); - if (service) { - try { - const cronExpression = this.convertIntervalToCron( - Number(interval), - ); - - await this.bullQueueService.queueSyncJob( - `${vertical}-sync-${commonObject}s`, - { - projectId: project.id_project, - vertical, - commonObject, + for (const project of projects) { + try { + const projectSyncConfig = + await this.prisma.projects_pull_frequency.findFirst({ + where: { + id_project: project.id_project, + }, + }); + + if (projectSyncConfig) { + const syncIntervals = { + crm: projectSyncConfig.crm, + ats: projectSyncConfig.ats, + hris: projectSyncConfig.hris, + accounting: projectSyncConfig.accounting, + filestorage: projectSyncConfig.filestorage, + ecommerce: projectSyncConfig.ecommerce, + ticketing: projectSyncConfig.ticketing, + }; + + for (const [vertical, interval] of Object.entries( + syncIntervals, + )) { + const now = new Date(); + const lastSyncEvent = await this.prisma.events.findFirst({ + where: { + id_project: project.id_project, + type: `${vertical}.batchSyncStart`, + }, + orderBy: { + timestamp: 'desc', + }, + }); + + const lastSyncTime = lastSyncEvent + ? lastSyncEvent.timestamp + : new Date(0); + + const secondsSinceLastSync = + Number(now.getTime() - lastSyncTime.getTime()) / 1000; + + if (interval && secondsSinceLastSync >= interval) { + await this.prisma.events.create({ + data: { + id_project: project.id_project, + id_event: uuidv4(), + status: 'success', + type: `${vertical}.batchSyncStart`, + method: 'GET', + url: '', + provider: '', + direction: '0', + timestamp: new Date(), }, - cronExpression, - ); - this.logger.log( - `Synced ${vertical}.${commonObject} for project ${project.id_project}`, - ); - } catch (error) { - this.logger.error( - `Error syncing ${vertical}.${commonObject} for project ${project.id_project}: ${error.message}`, - error, - ); + }); + const commonObjects = + getCommonObjectsForVertical(vertical); + for (const commonObject of commonObjects) { + try { + const service = this.registry.getService( + vertical, + commonObject, + ); + if (service) { + try { + const cronExpression = this.convertIntervalToCron( + Number(interval), + ); + + await this.bullQueueService.queueSyncJob( + `${vertical}-sync-${commonObject}s`, + { + projectId: project.id_project, + vertical, + commonObject, + }, + cronExpression, + ); + this.logger.log( + `Synced ${vertical}.${commonObject} for project ${project.id_project}`, + ); + } catch (error) { + this.logger.error( + `Error syncing ${vertical}.${commonObject} for project ${project.id_project}: ${error.message}`, + error, + ); + } + } else { + this.logger.warn( + `No service found for ${vertical}.${commonObject}`, + ); + } + } catch (error) { + this.logger.error( + `Error processing ${vertical}.${commonObject} for project ${project.id_project}: ${error.message}`, + error, + ); + } + } } - } else { - this.logger.warn( - `No service found for ${vertical}.${commonObject}`, - ); } } + } catch (projectError) { + this.logger.error( + `Error processing project: ${projectError.message}`, + projectError, + ); } } + } catch (userError) { + this.logger.error( + `Error processing user: ${userError.message}`, + userError, + ); } } } + } catch (error) { + this.logger.error( + `Error in checkAndKickstartSync: ${error.message}`, + error, + ); } } @@ -182,8 +213,11 @@ export class CoreSyncService { case ConnectorCategory.Ecommerce: await this.handleEcommerceSync(provider, linkedUserId); break; + default: + this.logger.warn(`Unsupported vertical: ${vertical}`); } } catch (error) { + this.logger.error(`Error in initialSync: ${error.message}`, error); throw error; } } diff --git a/packages/api/src/ats/attachment/sync/sync.service.ts b/packages/api/src/ats/attachment/sync/sync.service.ts index 078932b4a..ec0f47d7a 100644 --- a/packages/api/src/ats/attachment/sync/sync.service.ts +++ b/packages/api/src/ats/attachment/sync/sync.service.ts @@ -1,23 +1,17 @@ -import { Injectable, OnModuleInit } from '@nestjs/common'; import { LoggerService } from '@@core/@core-services/logger/logger.service'; import { PrismaService } from '@@core/@core-services/prisma/prisma.service'; -import { Cron } from '@nestjs/schedule'; -import { v4 as uuidv4 } from 'uuid'; +import { BullQueueService } from '@@core/@core-services/queues/shared.service'; +import { CoreSyncRegistry } from '@@core/@core-services/registries/core-sync.registry'; +import { CoreUnification } from '@@core/@core-services/unification/core-unification.service'; +import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; +import { WebhookService } from '@@core/@core-services/webhooks/panora-webhooks/webhook.service'; import { FieldMappingService } from '@@core/field-mapping/field-mapping.service'; +import { IBaseSync } from '@@core/utils/types/interface'; +import { Injectable, OnModuleInit } from '@nestjs/common'; +import { ats_candidate_attachments as AtsAttachment } from '@prisma/client'; +import { v4 as uuidv4 } from 'uuid'; import { ServiceRegistry } from '../services/registry.service'; -import { WebhookService } from '@@core/@core-services/webhooks/panora-webhooks/webhook.service'; -import { CoreSyncRegistry } from '@@core/@core-services/registries/core-sync.registry'; -import { ApiResponse } from '@@core/utils/types'; -import { IAttachmentService } from '../types'; -import { OriginalAttachmentOutput } from '@@core/utils/types/original/original.ats'; import { UnifiedAtsAttachmentOutput } from '../types/model.unified'; -import { ats_candidate_attachments as AtsAttachment } from '@prisma/client'; -import { ATS_PROVIDERS } from '@panora/shared'; -import { AtsObject } from '@ats/@lib/@types'; -import { BullQueueService } from '@@core/@core-services/queues/shared.service'; -import { IBaseSync, SyncLinkedUserType } from '@@core/utils/types/interface'; -import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; -import { CoreUnification } from '@@core/@core-services/unification/core-unification.service'; @Injectable() export class SyncService implements OnModuleInit, IBaseSync { @@ -36,7 +30,10 @@ export class SyncService implements OnModuleInit, IBaseSync { this.registry.registerService('ats', 'attachment', this); } onModuleInit() { -// + return; + } + async kickstartSync(id_project?: string) { + return; } // it is synced within candidate sync diff --git a/packages/api/src/ats/candidate/sync/sync.service.ts b/packages/api/src/ats/candidate/sync/sync.service.ts index b708e070d..29b7d8692 100644 --- a/packages/api/src/ats/candidate/sync/sync.service.ts +++ b/packages/api/src/ats/candidate/sync/sync.service.ts @@ -40,7 +40,7 @@ export class SyncService implements OnModuleInit, IBaseSync { this.registry.registerService('ats', 'candidate', this); } onModuleInit() { -// + // } @Cron('0 */8 * * *') // every 8 hours diff --git a/packages/api/src/filestorage/permission/sync/sync.service.ts b/packages/api/src/filestorage/permission/sync/sync.service.ts index e68b01b6a..9f47b7bf6 100644 --- a/packages/api/src/filestorage/permission/sync/sync.service.ts +++ b/packages/api/src/filestorage/permission/sync/sync.service.ts @@ -34,6 +34,10 @@ export class SyncService implements OnModuleInit, IBaseSync { return; } + async kickstartSync(id_project?: string) { + return; + } + // permissions are synced within file, folders, users, groups async saveToDb( diff --git a/packages/api/src/filestorage/sharedlink/sync/sync.service.ts b/packages/api/src/filestorage/sharedlink/sync/sync.service.ts index a9283cf76..5beb7fb92 100644 --- a/packages/api/src/filestorage/sharedlink/sync/sync.service.ts +++ b/packages/api/src/filestorage/sharedlink/sync/sync.service.ts @@ -6,7 +6,7 @@ import { CoreUnification } from '@@core/@core-services/unification/core-unificat import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; import { WebhookService } from '@@core/@core-services/webhooks/panora-webhooks/webhook.service'; import { FieldMappingService } from '@@core/field-mapping/field-mapping.service'; -import { IBaseSync, SyncLinkedUserType } from '@@core/utils/types/interface'; +import { IBaseSync } from '@@core/utils/types/interface'; import { Injectable, OnModuleInit } from '@nestjs/common'; import { fs_shared_links as FileStorageSharedLink } from '@prisma/client'; import { v4 as uuidv4 } from 'uuid'; @@ -34,6 +34,10 @@ export class SyncService implements OnModuleInit, IBaseSync { return; } + async kickstartSync(id_project?: string) { + return; + } + async saveToDb( connection_id: string, linkedUserId: string, diff --git a/packages/api/src/filestorage/user/user.module.ts b/packages/api/src/filestorage/user/user.module.ts index 4c53c8dbf..e2cf40f4c 100644 --- a/packages/api/src/filestorage/user/user.module.ts +++ b/packages/api/src/filestorage/user/user.module.ts @@ -15,7 +15,7 @@ import { ServiceRegistry } from './services/registry.service'; import { UserService } from './services/user.service'; import { SyncService } from './sync/sync.service'; import { UserController } from './user.controller'; -import { BullQueueModule } from '@@core/@core-services/queues/queue.module'; + @Module({ controllers: [UserController], providers: [ diff --git a/packages/api/src/ticketing/attachment/sync/sync.service.ts b/packages/api/src/ticketing/attachment/sync/sync.service.ts index 2742d85a8..a48b66755 100644 --- a/packages/api/src/ticketing/attachment/sync/sync.service.ts +++ b/packages/api/src/ticketing/attachment/sync/sync.service.ts @@ -1,12 +1,12 @@ import { LoggerService } from '@@core/@core-services/logger/logger.service'; import { PrismaService } from '@@core/@core-services/prisma/prisma.service'; import { CoreSyncRegistry } from '@@core/@core-services/registries/core-sync.registry'; -import { IBaseSync, SyncLinkedUserType } from '@@core/utils/types/interface'; +import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; +import { IBaseSync } from '@@core/utils/types/interface'; import { Injectable, OnModuleInit } from '@nestjs/common'; import { tcg_attachments as TicketingAttachment } from '@prisma/client'; import { v4 as uuidv4 } from 'uuid'; import { UnifiedTicketingAttachmentOutput } from '../types/model.unified'; -import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; @Injectable() export class SyncService implements OnModuleInit, IBaseSync { @@ -26,6 +26,10 @@ export class SyncService implements OnModuleInit, IBaseSync { // we don't sync here as it is done within the Comment & Ticket Sync services // we only save to the db + async kickstartSync(id_project?: string) { + return; + } + async saveToDb( connection_id: string, linkedUserId: string,