Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Rate Limit #707

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/api/scripts/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ CREATE TABLE webhooks_payloads
CONSTRAINT PK_webhooks_payload PRIMARY KEY ( id_webhooks_payload )
);

-- ************************************** rate_limit_state
CREATE TABLE rate_limit_state (
id_rate_limit_state uuid NOT NULL,
id_connection NOT NULL,
last_request_timestamp TIMESTAMP NOT NULL,
request_count INTEGER NOT NULL
CONSTRAINT PK_rate_limit_state PRIMARY KEY ( id_rate_limit_state )
);

-- ************************************** webhook_endpoints
CREATE TABLE webhook_endpoints
Expand Down
3 changes: 3 additions & 0 deletions packages/api/src/@core/@core-services/queues/queue.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import { Queues } from './types';
{
name: Queues.RAG_DOCUMENT_PROCESSING,
},
{
name: Queues.RATE_LIMIT_FAILED_JOBS,
},
),
],
providers: [BullQueueService],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ export class BullQueueService {
public readonly failedPassthroughRequestsQueue: Queue,
@InjectQueue(Queues.RAG_DOCUMENT_PROCESSING)
private ragDocumentQueue: Queue,
@InjectQueue(Queues.RATE_LIMIT_FAILED_JOBS)
private rlFailedJobsQueue: Queue,
) {}

// getters
Expand All @@ -35,6 +37,9 @@ export class BullQueueService {
getRagDocumentQueue() {
return this.ragDocumentQueue;
}
getRlFailedJobsQueue() {
return this.rlFailedJobsQueue;
}

async removeRepeatableJob(jobName: string) {
const jobs = await this.syncJobsQueue.getRepeatableJobs();
Expand Down
1 change: 1 addition & 0 deletions packages/api/src/@core/@core-services/queues/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ export enum Queues {
SYNC_JOBS_WORKER = 'SYNC_JOBS_WORKER', // Queue which syncs data from remote 3rd parties
FAILED_PASSTHROUGH_REQUESTS_HANDLER = 'FAILED_PASSTHROUGH_REQUESTS_HANDLER', // Queue which handles failed passthrough request due to rate limit and retries it with backOff
RAG_DOCUMENT_PROCESSING = 'RAG_DOCUMENT_PROCESSING',
RATE_LIMIT_FAILED_JOBS = 'RATE_LIMIT_FAILED_JOBS',
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
import { Injectable } from '@nestjs/common';
import { CoreSyncRegistry } from '../registries/core-sync.registry';
import { CoreUnification } from './core-unification.service';
import { v4 as uuidv4 } from 'uuid';
import { PrismaService } from '../prisma/prisma.service';
import { ConnectionUtils } from '@@core/connections/@utils';
import { FieldMappingService } from '@@core/field-mapping/field-mapping.service';
import { RagService } from '@@core/rag/rag.service';
import { FileInfo } from '@@core/rag/types';
import { RateLimitError } from '@@core/rate-limit/error';
import {
ApiResponse,
getFileExtensionFromMimeType,
TargetObject,
} from '@@core/utils/types';
import { UnifySourceType } from '@@core/utils/types/unify.output';
import { WebhookService } from '../webhooks/panora-webhooks/webhook.service';
import { ConnectionUtils } from '@@core/connections/@utils';
import { IBaseObjectService, SyncParam } from '@@core/utils/types/interface';
import { FieldMappingService } from '@@core/field-mapping/field-mapping.service';
import { LoggerService } from '../logger/logger.service';
import { RagService } from '@@core/rag/rag.service';
import { FileInfo } from '@@core/rag/types';
import { UnifySourceType } from '@@core/utils/types/unify.output';
import { Injectable } from '@nestjs/common';
import { fs_files as FileStorageFile } from '@prisma/client';
import { v4 as uuidv4 } from 'uuid';
import { LoggerService } from '../logger/logger.service';
import { PrismaService } from '../prisma/prisma.service';
import { BullQueueService } from '../queues/shared.service';
import { CoreSyncRegistry } from '../registries/core-sync.registry';
import { WebhookService } from '../webhooks/panora-webhooks/webhook.service';
import { CoreUnification } from './core-unification.service';

@Injectable()
export class IngestDataService {
Expand All @@ -29,6 +31,7 @@ export class IngestDataService {
private logger: LoggerService,
private fieldMappingService: FieldMappingService,
private ragService: RagService,
private queues: BullQueueService,
) {}

async syncForLinkedUser<T, U, V extends IBaseObjectService>(
Expand Down Expand Up @@ -87,7 +90,7 @@ export class IngestDataService {

// Construct the syncParam object dynamically
const syncParam: SyncParam = {
linkedUserId,
connection,
custom_properties: remoteProperties,
};

Expand Down Expand Up @@ -144,6 +147,32 @@ export class IngestDataService {
`Error syncing ${integrationId} ${commonObject}: ${syncError.message}`,
syncError,
);
// handle the case where ratelimit is throwed
if (syncError instanceof RateLimitError) {
this.logger.warn(
`Rate limit exceeded for ${integrationId} ${commonObject}. Retry after ${syncError.retryAfter}ms.`,
);
// You might want to add some logic here to handle the rate limit,
// such as scheduling a retry or notifying the user.
const rlFailedJobsQueue = this.queues.getRlFailedJobsQueue();
await rlFailedJobsQueue.add(
'rate-limit-sync',
{
method: 'syncForLinkedUser',
args: [
integrationId,
linkedUserId,
vertical,
commonObject,
service,
params,
wh_real_time_trigger,
],
},
{ delay: syncError.retryAfter },
);
return;
}
// Optionally, you could create an event to log this error
/*await this.prisma.events.create({
data: {
Expand Down
5 changes: 4 additions & 1 deletion packages/api/src/@core/core.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import { OrganisationsModule } from './organisations/organisations.module';
import { PassthroughModule } from './passthrough/passthrough.module';
import { ProjectConnectorsModule } from './project-connectors/project-connectors.module';
import { ProjectsModule } from './projects/projects.module';
import { SyncModule } from './sync/sync.module';
import { RagModule } from './rag/rag.module';
import { RateLimitModule } from './rate-limit/rate-limit.module';
import { SyncModule } from './sync/sync.module';

@Module({
imports: [
Expand All @@ -36,6 +37,7 @@ import { RagModule } from './rag/rag.module';
SyncModule,
ProjectConnectorsModule,
BullQueueModule,
RateLimitModule,
RagModule,
],
exports: [
Expand All @@ -55,6 +57,7 @@ import { RagModule } from './rag/rag.module';
SyncModule,
ProjectConnectorsModule,
IngestDataService,
RateLimitModule,
BullQueueModule,
RagModule,
],
Expand Down
6 changes: 6 additions & 0 deletions packages/api/src/@core/rate-limit/error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export class RateLimitError extends Error {
constructor(message: string, public retryAfter: number) {
super(message);
this.name = 'RateLimitError';
}
}
33 changes: 33 additions & 0 deletions packages/api/src/@core/rate-limit/rate-limit.consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Queues } from '@@core/@core-services/queues/types';
import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';

@Processor(Queues.RATE_LIMIT_FAILED_JOBS)
export class RateLimitJobProcessor {
constructor(private ingestDataService: IngestDataService) {}

@Process('rate-limit-sync')
async processRateLimitedJob(job: Job<{ method: string; args: any[] }>) {
const { method, args } = job.data;
try {
if (method === 'syncForLinkedUser') {
await this.ingestDataService.syncForLinkedUser(
...(args as Parameters<
typeof this.ingestDataService.syncForLinkedUser
>),
);
}

// Fallback for other methods (if any)
/*const targetInstance = this.moduleRef.get(target, { strict: false });
if (targetInstance && typeof targetInstance[method] === 'function') {
await targetInstance[method](...args);
return;
}*/
} catch (error) {
console.error(`Error processing rate-limited job: ${error.message}`);
throw error;
}
}
}
33 changes: 33 additions & 0 deletions packages/api/src/@core/rate-limit/rate-limit.decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { RateLimitService } from './rate-limit.service';

export function RateLimit() {
return function (
target: any,
propertyKey: string,
descriptor: PropertyDescriptor,
) {
const originalMethod = descriptor.value;

descriptor.value = async function (...args: any[]) {
const rateLimitService: RateLimitService = (this as any).rateLimitService;
const { connection } = args[0];

if (!rateLimitService) {
console.error('RateLimitService not found in the class instance');
return originalMethod.apply(this, args);
}

try {
await rateLimitService.checkRateLimit(
connection.id_connection,
connection.provider_slug,
);
return await originalMethod.apply(this, args);
} catch (error) {
throw error;
}
};

return descriptor;
};
}
9 changes: 9 additions & 0 deletions packages/api/src/@core/rate-limit/rate-limit.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { RateLimitJobProcessor } from './rate-limit.consumer';
import { PrismaService } from '@@core/@core-services/prisma/prisma.service';
import { WebhookService } from '@@core/@core-services/webhooks/panora-webhooks/webhook.service';

@Module({
providers: [RateLimitJobProcessor, WebhookService, PrismaService],
})
export class RateLimitModule {}
137 changes: 137 additions & 0 deletions packages/api/src/@core/rate-limit/rate-limit.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import { PrismaService } from '@@core/@core-services/prisma/prisma.service';
import { Injectable } from '@nestjs/common';
import { RateLimitError } from './error';

interface RateLimitPolicy {
timeWindow: number;
maxRequests: number;
}

@Injectable()
export class RateLimitService {
constructor(private prisma: PrismaService) {}

async checkRateLimit(
connectionId: string,
providerSlug: string,
): Promise<boolean> {
const policies = await this.getRateLimitPolicies(providerSlug);

for (const policy of policies) {
const { timeWindow, maxRequests } = policy;
const windowStart = new Date(Date.now() - timeWindow * 1000);

const requestCount = await this.prisma.events.count({
where: {
id_connection: connectionId,
timestamp: { gte: windowStart },
type: { endsWith: '.pulled' },
},
});

if (requestCount >= maxRequests) {
const retryAfter = await this.getRetryAfter(connectionId);
throw new RateLimitError('Rate limit exceeded', retryAfter);
}
}

return true; // All checks passed
}

private async getRateLimitPolicies(
providerSlug: string,
): Promise<RateLimitPolicy[]> {
const policies: Record<string, RateLimitPolicy[]> = {
hubspot: [
{ timeWindow: 10, maxRequests: 110 }, // 110 calls per 10 seconds
{ timeWindow: 86400, maxRequests: 250000 }, // 250k calls per day
],
};
return policies[providerSlug] || [];
}

async getRetryAfter(connectionId: string): Promise<number> {
const connection = await this.prisma.connections.findUnique({
where: { id_connection: connectionId },
});

if (!connection) {
throw new Error(`Connection not found for id: ${connectionId}`);
}

const policies = await this.getRateLimitPolicies(connection.provider_slug);

if (policies.length === 0) {
return 10000; // 10 seconds default delay if no policies
}

let maxTimeUntilReset = 0;

for (const policy of policies) {
const windowStart = new Date(Date.now() - policy.timeWindow * 1000);
const requestCount = await this.prisma.events.count({
where: {
id_connection: connectionId,
timestamp: { gte: windowStart },
type: { endsWith: '.pulled' },
},
});

if (requestCount >= policy.maxRequests) {
const latestEvent = await this.prisma.events.findFirst({
where: {
id_connection: connectionId,
type: { endsWith: '.pulled' },
},
orderBy: { timestamp: 'desc' },
});

if (latestEvent) {
const timeSinceLastRequest =
Date.now() - latestEvent.timestamp.getTime();
const timeUntilReset =
policy.timeWindow * 1000 - timeSinceLastRequest;
maxTimeUntilReset = Math.max(maxTimeUntilReset, timeUntilReset);
}
}
}

if (maxTimeUntilReset <= 0) {
return 0;
}

const buffer = 1000; // 1 second buffer
return maxTimeUntilReset + buffer;
}

/*async retryWithBackoff(config: any): Promise<AxiosResponse> {
return backOff(
async () => {
try {
const response = await axios(config);
return response;
} catch (error) {
if (error.response && error.response.status === 429) {
const retryAfter = await this.getRetryAfter(config.connectionId);
if (retryAfter) {
await new Promise((resolve) => setTimeout(resolve, retryAfter));
}
throw error; // Rethrow to trigger backoff
}
throw error; // Rethrow non-rate-limit errors
}
},
{
numOfAttempts: 10,
startingDelay: 1000,
timeMultiple: 2,
maxDelay: 60000,
jitter: 'full',
retry: (e: Error, attemptNumber: number) => {
console.log(`Retry attempt ${attemptNumber} due to: ${e.message}`);
return true;
},
},
);
}*/
}
Loading
Loading