Skip to content

Commit

Permalink
✨ Webhook service
Browse files Browse the repository at this point in the history
  • Loading branch information
naelob committed Dec 14, 2023
1 parent b6fe775 commit 2332a89
Show file tree
Hide file tree
Showing 13 changed files with 601 additions and 69 deletions.
2 changes: 2 additions & 0 deletions packages/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"prisma:seed": "ts-node ./scripts/seed.webapp.ts"
},
"dependencies": {
"@nestjs/bull": "^10.0.1",
"@nestjs/common": "^10.0.0",
"@nestjs/config": "^3.1.1",
"@nestjs/core": "^10.0.0",
Expand All @@ -40,6 +41,7 @@
"@sentry/tracing": "^7.80.0",
"axios": "^1.5.1",
"bcrypt": "^5.1.1",
"bull": "^4.11.5",
"crypto": "^1.0.1",
"dotenv": "^16.3.1",
"install": "^0.13.0",
Expand Down
92 changes: 79 additions & 13 deletions packages/api/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -373,19 +373,20 @@ model invite_links {

/// This model or at least one of its fields has comments in the database, and requires an additional setup for migrations: Read more: https://pris.ly/d/database-comments
model events {
id_event String @id(map: "pk_jobs") @db.Uuid
status String
type String
direction String
timestamp DateTime @default(now()) @db.Timestamp(6)
method String
url String
provider String
id_linked_user String @db.Uuid
crm_companies crm_companies[]
crm_contacts crm_contacts[]
linked_users linked_users @relation(fields: [id_linked_user], references: [id_linked_user], onDelete: NoAction, onUpdate: NoAction, map: "fk_12")
jobs_status_history jobs_status_history[]
id_event String @id(map: "pk_jobs") @db.Uuid
status String
type String
direction String
timestamp DateTime @default(now()) @db.Timestamp(6)
method String
url String
provider String
id_linked_user String @db.Uuid
crm_companies crm_companies[]
crm_contacts crm_contacts[]
linked_users linked_users @relation(fields: [id_linked_user], references: [id_linked_user], onDelete: NoAction, onUpdate: NoAction, map: "fk_12")
jobs_status_history jobs_status_history[]
webhook_delivery_attempts webhook_delivery_attempts[]
@@index([id_linked_user], map: "fk_linkeduserid_projectid")
}
Expand All @@ -398,3 +399,68 @@ model remote_data {
data String?
created_at DateTime? @db.Timestamp(6)
}

/// This model or at least one of its fields has comments in the database, and requires an additional setup for migrations: Read more: https://pris.ly/d/database-comments
model webhook_endpoints {
id_webhook_endpoint String @id(map: "pk_webhook_endpoint") @db.Uuid
endpoint_description String?
url String
secret String
active Boolean
created_at DateTime @db.Timestamp(6)
id_project String @db.Uuid
scope String
last_update DateTime? @db.Timestamp(6)
id_webhook_scope String? @db.Uuid
webhook_delivery_attempts webhook_delivery_attempts[]
webhook_scope webhook_scope? @relation(fields: [id_webhook_scope], references: [id_webhook_scope], onDelete: NoAction, onUpdate: NoAction, map: "fk_38")
@@index([id_webhook_scope], map: "fk_we_webhook_scope_id")
}

model webhook_scope {
id_webhook_scope String @id(map: "pk_webhook_scope") @db.Uuid
last_update DateTime? @db.Timestamp(6)
crm_contact_created Boolean @map("crm.contact.created")
crm_contact_updated Boolean @map("crm.contact.updated")
crm_contact_removed Boolean @map("crm.contact.removed")
crm_contact_pulled Boolean @map("crm.contact.pulled")
connection_created Boolean @map("connection.created")
connection_deleted Boolean @map("connection.deleted")
webhook_endpoints webhook_endpoints[]
}

model webhooks_payloads {
id_webhooks_payload String @id(map: "pk_webhooks_payload") @db.Uuid
data Json @db.Json
webhook_delivery_attempts webhook_delivery_attempts[]
}

/// This model or at least one of its fields has comments in the database, and requires an additional setup for migrations: Read more: https://pris.ly/d/database-comments
model webhook_delivery_attempts {
id_webhook_delivery_attempt String @id(map: "pk_webhook_event") @db.Uuid
timestamp DateTime @db.Timestamp(6)
status String
next_retry DateTime? @db.Timestamp(6)
id_webhooks_payload String? @db.Uuid
id_webhook_endpoint String? @db.Uuid
id_event String? @db.Uuid
id_webhooks_reponse String? @db.Uuid
webhooks_payloads webhooks_payloads? @relation(fields: [id_webhooks_payload], references: [id_webhooks_payload], onDelete: NoAction, onUpdate: NoAction, map: "fk_38_1")
webhook_endpoints webhook_endpoints? @relation(fields: [id_webhook_endpoint], references: [id_webhook_endpoint], onDelete: NoAction, onUpdate: NoAction, map: "fk_38_2")
events events? @relation(fields: [id_event], references: [id_event], onDelete: NoAction, onUpdate: NoAction, map: "fk_39")
webhooks_reponses webhooks_reponses? @relation(fields: [id_webhooks_reponse], references: [id_webhooks_reponse], onDelete: NoAction, onUpdate: NoAction, map: "fk_40")
@@index([id_webhooks_payload], map: "fk_we_payload_webhookid")
@@index([id_webhook_endpoint], map: "fk_we_webhookendpointid")
@@index([id_event], map: "fk_webhook_delivery_attempt_eventid")
@@index([id_webhooks_reponse], map: "fk_webhook_delivery_attempt_webhook_responseid")
}

/// This model or at least one of its fields has comments in the database, and requires an additional setup for migrations: Read more: https://pris.ly/d/database-comments
model webhooks_reponses {
id_webhooks_reponse String @id(map: "pk_webhooks_reponse") @db.Uuid
http_response_data String
http_status_code String
webhook_delivery_attempts webhook_delivery_attempts[]
}
27 changes: 27 additions & 0 deletions packages/api/scripts/webhook.testing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { PrismaClient } from '@prisma/client';
import { v4 as uuidv4 } from 'uuid';

const prisma = new PrismaClient();

async function main() {
const webhook_endpoint = await prisma.webhook_endpoints.create({
data: {
id_webhook_endpoint: uuidv4(),
url: 'https://webhook.site/5018c5f3-e582-4f6f-a5ca-9e7389e951e2',
secret: '12345679',
active: true,
created_at: new Date(),
id_project: '801f9ede-c698-4e66-a7fc-48d19eebaa4f',
scope: 'crm.contact.synced',
},
});
}

main()
.catch((e) => {
console.error(e);
process.exit(1);
})
.finally(async () => {
await prisma.$disconnect();
});
3 changes: 3 additions & 0 deletions packages/api/src/@core/core.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { FieldMappingModule } from './field-mapping/field-mapping.module';
import { EventsModule } from './events/events.module';
import { MagicLinkModule } from './magic-link/magic-link.module';
import { PassthroughModule } from './passthrough/passthrough.module';
import { WebhookModule } from './webhook/webhook.module';

@Module({
imports: [
Expand All @@ -20,6 +21,7 @@ import { PassthroughModule } from './passthrough/passthrough.module';
EventsModule,
MagicLinkModule,
PassthroughModule,
WebhookModule,
],
exports: [
AuthModule,
Expand All @@ -31,6 +33,7 @@ import { PassthroughModule } from './passthrough/passthrough.module';
EventsModule,
MagicLinkModule,
PassthroughModule,
WebhookModule,
],
})
export class CoreModule {}
16 changes: 16 additions & 0 deletions packages/api/src/@core/webhook/webhook.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { WebhookService } from './webhook.service';
import { PrismaService } from '@@core/prisma/prisma.service';
import { LoggerService } from '@@core/logger/logger.service';
import { WebhookProcessor } from './webhook.processor';

@Module({
imports: [
BullModule.registerQueue({
name: 'webhookDelivery',
}),
],
providers: [WebhookService, PrismaService, LoggerService, WebhookProcessor],
})
export class WebhookModule {}
99 changes: 99 additions & 0 deletions packages/api/src/@core/webhook/webhook.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { LoggerService } from '@@core/logger/logger.service';
import { PrismaService } from '@@core/prisma/prisma.service';
import { OnQueueActive, Process, Processor } from '@nestjs/bull';
import axios from 'axios';
import { Job } from 'bull';
import { v4 as uuidv4 } from 'uuid';
import { WebhookService } from './webhook.service';

@Processor('webhookDelivery')
export class WebhookProcessor {
constructor(
private logger: LoggerService,
private prisma: PrismaService,
private webhookService: WebhookService,
) {
this.logger.setContext(WebhookProcessor.name);
}

@OnQueueActive()
onActive(job: Job) {
this.logger.log(`Processing job ${job.id} ...`);
}

@Process({ concurrency: 5 })
async processWebhooks(job: Job) {
const id_webhook_delivery = job.data.webhook_delivery_id;

this.logger.log(`Start delivering webhook id ${id_webhook_delivery}...`);

await this.prisma.webhook_delivery_attempts.update({
where: { id_webhook_delivery_attempt: id_webhook_delivery },
data: {
status: 'processed',
},
});

// Retrieve the webhook delivery attempt
const deliveryAttempt =
await this.prisma.webhook_delivery_attempts.findUnique({
where: { id_webhook_delivery_attempt: id_webhook_delivery },
include: {
webhook_endpoints: true,
webhooks_payloads: true,
},
});

// Check if the endpoint is active
if (deliveryAttempt.webhook_endpoints.active) {
try {
// Send the payload to the endpoint URL
const response = await axios.post(
deliveryAttempt.webhook_endpoints.url,
{
id_event: deliveryAttempt.id_event,
data: deliveryAttempt.webhooks_payloads.data,
},
);

// Populate the webhooks_responses table
await this.prisma.webhooks_reponses.create({
data: {
id_webhooks_reponse: uuidv4(),
http_response_data: response.data,
http_status_code: response.status.toString(),
},
});
await this.prisma.webhook_delivery_attempts.update({
where: { id_webhook_delivery_attempt: id_webhook_delivery },
data: {
status: 'success',
},
});

this.logger.log('Webhook delivered !');
} catch (error) {
// If the POST request fails, set a next retry time and reinsert the job in the queue
const nextRetry = new Date();
nextRetry.setSeconds(nextRetry.getSeconds() + 60); // Retry after 60 seconds

await this.prisma.webhook_delivery_attempts.update({
where: { id_webhook_delivery_attempt: id_webhook_delivery },
data: {
status: 'failed',
next_retry: nextRetry,
},
});

//re-insert the webhook in the queue
await this.webhookService.handleFailedWebhook(id_webhook_delivery);

this.logger.log(
'Webhook delivery failed. Job reinserted in the queue for retry.',
);
}
} else {
this.logger.log('Webhook endpoint is not active. Delivery skipped.');
}
}
}
77 changes: 77 additions & 0 deletions packages/api/src/@core/webhook/webhook.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
import { PrismaService } from '@@core/prisma/prisma.service';
import { v4 as uuidv4 } from 'uuid';
import { LoggerService } from '@@core/logger/logger.service';
import { handleServiceError } from '@@core/utils/errors';

@Injectable()
export class WebhookService {
constructor(
@InjectQueue('webhookDelivery') private queue: Queue,
private prisma: PrismaService,
private logger: LoggerService,
) {
this.logger.setContext(WebhookService.name);
}

async handleWebhook(
data: any,
eventType: string,
projectId: string,
eventId: string,
) {
try {
//this.logger.log('data any type is ' + data);
//just create an entry in webhook
//search if an endpoint webhook exists for such a projectId and such a scope
const webhook = await this.prisma.webhook_endpoints.findFirst({
where: {
id_project: projectId,
active: true,
scope: eventType, //todo
},
});
if (!webhook) return;

const w_payload = await this.prisma.webhooks_payloads.create({
data: {
id_webhooks_payload: uuidv4(),
data: JSON.stringify(data),
},
});

const w_delivery = await this.prisma.webhook_delivery_attempts.create({
data: {
id_webhook_delivery_attempt: uuidv4(),
id_event: eventId,
timestamp: new Date(),
id_webhook_endpoint: webhook.id_webhook_endpoint,
status: 'queued', // queued | processed | failed | success
id_webhooks_payload: w_payload.id_webhooks_payload,
},
});

// we send the delivery webhook to the queue so it can be processed by our dispatcher worker
const job = await this.queue.add({
webhook_delivery_id: w_delivery.id_webhook_delivery_attempt,
});
} catch (error) {
handleServiceError(error, this.logger);
}
}

async handleFailedWebhook(failed_id_delivery_webhook: string) {
try {
await this.queue.add(
{
webhook_delivery_id: failed_id_delivery_webhook,
},
{ delay: 60000 },
);
} catch (error) {
handleServiceError(error, this.logger);
}
}
}
9 changes: 9 additions & 0 deletions packages/api/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { SentryInterceptor, SentryModule } from '@ntegral/nestjs-sentry';
import { APP_INTERCEPTOR } from '@nestjs/core';
import { LoggerService } from '@@core/logger/logger.service';
import { CoreModule } from '@@core/core.module';
import { BullModule } from '@nestjs/bull';

@Module({
imports: [
Expand Down Expand Up @@ -47,6 +48,14 @@ import { CoreModule } from '@@core/core.module';
},
},
}),
BullModule.forRootAsync({
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
}),
],
controllers: [AppController],
providers: [
Expand Down
Loading

0 comments on commit 2332a89

Please sign in to comment.