Skip to content

Commit

Permalink
🚑 Updated sync frequency logic
Browse files Browse the repository at this point in the history
  • Loading branch information
naelob committed Sep 14, 2024
1 parent c387394 commit 761d285
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,40 +97,71 @@ export class IngestDataService {
});

let resp: ApiResponse<U[]>;
if (wh_real_time_trigger && wh_real_time_trigger.data.remote_id) {
switch (wh_real_time_trigger.action) {
case 'DELETE':
await this.syncRegistry
.getService(vertical, commonObject)
.removeInDb(
connection.id_connection,
wh_real_time_trigger.data.remote_id,
);
default:
syncParam.webhook_remote_identifier =
wh_real_time_trigger.data.remote_id;
resp = await service.sync(syncParam);
break;
try {
if (wh_real_time_trigger && wh_real_time_trigger.data.remote_id) {
switch (wh_real_time_trigger.action) {
case 'DELETE':
await this.syncRegistry
.getService(vertical, commonObject)
.removeInDb(
connection.id_connection,
wh_real_time_trigger.data.remote_id,
);
default:
syncParam.webhook_remote_identifier =
wh_real_time_trigger.data.remote_id;
resp = await service.sync(syncParam);
break;
}
} else {
resp = await service.sync(syncParam);
}
} else {
resp = await service.sync(syncParam);
}

const sourceObject: U[] = resp.data;
if (!resp || !resp.data) {
this.logger.warn(
`Sync operation for ${integrationId} ${commonObject} returned no data`,
);
return;
}

const ingestParams = params
.filter((p) => p.shouldPassToIngest)
.reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {});
const sourceObject: U[] = resp.data;

await this.ingestData<T, U>(
sourceObject,
integrationId,
connection.id_connection,
vertical,
commonObject,
customFieldMappings,
ingestParams,
);
const ingestParams = params
.filter((p) => p.shouldPassToIngest)
.reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {});

await this.ingestData<T, U>(
sourceObject,
integrationId,
connection.id_connection,
vertical,
commonObject,
customFieldMappings,
ingestParams,
);
} catch (syncError) {
this.logger.error(
`Error syncing ${integrationId} ${commonObject}: ${syncError.message}`,
syncError,
);
// Optionally, you could create an event to log this error
/*await this.prisma.events.create({
data: {
id_connection: connection.id_connection,
id_project: connection.id_project,
id_event: uuidv4(),
status: 'error',
type: `${vertical}.${commonObject}.sync_failed`,
method: 'SYNC',
url: '/sync',
provider: integrationId,
direction: '0',
timestamp: new Date(),
id_linked_user: linkedUserId,
error: syncError.message,
},
});*/
}
} catch (error) {
throw error;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/@core/sync/sync.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class SyncProcessor {
`Error processing job ${job.id} for ${vertical} ${commonObject} (Project: ${projectId})`,
error.stack,
);
throw error; // Re-throw the error to mark the job as failed
return { status: 'failed', error: error.message };
}
}

Expand Down
228 changes: 131 additions & 97 deletions packages/api/src/@core/sync/sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,115 +22,146 @@ export class CoreSyncService {

@Cron(CronExpression.EVERY_30_SECONDS)
async checkAndKickstartSync(user_id?: string) {
const users = user_id
? [
await this.prisma.users.findUnique({
where: {
id_user: user_id,
},
}),
]
: await this.prisma.users.findMany();
if (users && users.length > 0) {
for (const user of users) {
const projects = await this.prisma.projects.findMany({
where: {
id_user: user.id_user,
},
});
for (const project of projects) {
const projectSyncConfig =
await this.prisma.projects_pull_frequency.findFirst({
try {
const users = user_id
? [
await this.prisma.users.findUnique({
where: {
id_user: user_id,
},
}),
]
: await this.prisma.users.findMany();
if (users && users.length > 0) {
for (const user of users) {
try {
const projects = await this.prisma.projects.findMany({
where: {
id_project: project.id_project,
id_user: user.id_user,
},
});

if (projectSyncConfig) {
const syncIntervals = {
crm: projectSyncConfig.crm,
ats: projectSyncConfig.ats,
hris: projectSyncConfig.hris,
accounting: projectSyncConfig.accounting,
filestorage: projectSyncConfig.filestorage,
ecommerce: projectSyncConfig.ecommerce,
ticketing: projectSyncConfig.ticketing,
};

for (const [vertical, interval] of Object.entries(syncIntervals)) {
const now = new Date();
const lastSyncEvent = await this.prisma.events.findFirst({
where: {
id_project: project.id_project,
type: `${vertical}.batchSyncStart`,
},
orderBy: {
timestamp: 'desc',
},
});

const lastSyncTime = lastSyncEvent
? lastSyncEvent.timestamp
: new Date(0);

const secondsSinceLastSync =
Number(now.getTime() - lastSyncTime.getTime()) / 1000;

if (interval && secondsSinceLastSync >= interval) {
await this.prisma.events.create({
data: {
id_project: project.id_project,
id_event: uuidv4(),
status: 'success',
type: `${vertical}.batchSyncStart`,
method: 'GET',
url: '',
provider: '',
direction: '0',
timestamp: new Date(),
},
});
const commonObjects = getCommonObjectsForVertical(vertical);
for (const commonObject of commonObjects) {
const service = this.registry.getService(
vertical,
commonObject,
);
if (service) {
try {
const cronExpression = this.convertIntervalToCron(
Number(interval),
);

await this.bullQueueService.queueSyncJob(
`${vertical}-sync-${commonObject}s`,
{
projectId: project.id_project,
vertical,
commonObject,
for (const project of projects) {
try {
const projectSyncConfig =
await this.prisma.projects_pull_frequency.findFirst({
where: {
id_project: project.id_project,
},
});

if (projectSyncConfig) {
const syncIntervals = {
crm: projectSyncConfig.crm,
ats: projectSyncConfig.ats,
hris: projectSyncConfig.hris,
accounting: projectSyncConfig.accounting,
filestorage: projectSyncConfig.filestorage,
ecommerce: projectSyncConfig.ecommerce,
ticketing: projectSyncConfig.ticketing,
};

for (const [vertical, interval] of Object.entries(
syncIntervals,
)) {
const now = new Date();
const lastSyncEvent = await this.prisma.events.findFirst({
where: {
id_project: project.id_project,
type: `${vertical}.batchSyncStart`,
},
orderBy: {
timestamp: 'desc',
},
});

const lastSyncTime = lastSyncEvent
? lastSyncEvent.timestamp
: new Date(0);

const secondsSinceLastSync =
Number(now.getTime() - lastSyncTime.getTime()) / 1000;

if (interval && secondsSinceLastSync >= interval) {
await this.prisma.events.create({
data: {
id_project: project.id_project,
id_event: uuidv4(),
status: 'success',
type: `${vertical}.batchSyncStart`,
method: 'GET',
url: '',
provider: '',
direction: '0',
timestamp: new Date(),
},
cronExpression,
);
this.logger.log(
`Synced ${vertical}.${commonObject} for project ${project.id_project}`,
);
} catch (error) {
this.logger.error(
`Error syncing ${vertical}.${commonObject} for project ${project.id_project}: ${error.message}`,
error,
);
});
const commonObjects =
getCommonObjectsForVertical(vertical);
for (const commonObject of commonObjects) {
try {
const service = this.registry.getService(
vertical,
commonObject,
);
if (service) {
try {
const cronExpression = this.convertIntervalToCron(
Number(interval),
);

await this.bullQueueService.queueSyncJob(
`${vertical}-sync-${commonObject}s`,
{
projectId: project.id_project,
vertical,
commonObject,
},
cronExpression,
);
this.logger.log(
`Synced ${vertical}.${commonObject} for project ${project.id_project}`,
);
} catch (error) {
this.logger.error(
`Error syncing ${vertical}.${commonObject} for project ${project.id_project}: ${error.message}`,
error,
);
}
} else {
this.logger.warn(
`No service found for ${vertical}.${commonObject}`,
);
}
} catch (error) {
this.logger.error(
`Error processing ${vertical}.${commonObject} for project ${project.id_project}: ${error.message}`,
error,
);
}
}
}
} else {
this.logger.warn(
`No service found for ${vertical}.${commonObject}`,
);
}
}
} catch (projectError) {
this.logger.error(
`Error processing project: ${projectError.message}`,
projectError,
);
}
}
} catch (userError) {
this.logger.error(
`Error processing user: ${userError.message}`,
userError,
);
}
}
}
} catch (error) {
this.logger.error(
`Error in checkAndKickstartSync: ${error.message}`,
error,
);
}
}

Expand Down Expand Up @@ -182,8 +213,11 @@ export class CoreSyncService {
case ConnectorCategory.Ecommerce:
await this.handleEcommerceSync(provider, linkedUserId);
break;
default:
this.logger.warn(`Unsupported vertical: ${vertical}`);
}
} catch (error) {
this.logger.error(`Error in initialSync: ${error.message}`, error);
throw error;
}
}
Expand Down
Loading

0 comments on commit 761d285

Please sign in to comment.