Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redlock service #182

Merged
merged 7 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/cache/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './redis-cache';
export * from './redlock';
export * from './in-memory-cache';
export * from './cache';
export * from './jitter';
Expand Down
2 changes: 2 additions & 0 deletions packages/cache/src/redlock/entities/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './redlock.module.options';
export * from './redlock.module.async.options';
5 changes: 5 additions & 0 deletions packages/cache/src/redlock/entities/redlock.log.level.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export enum RedlockLogLevel {
NONE = 'none',
WARNING = 'warning',
ERROR = 'error',
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { RedlockModuleOptions } from './redlock.module.options';
import { ModuleMetadata } from '@nestjs/common';

export interface RedlockModuleAsyncOptions extends Pick<ModuleMetadata, 'imports'> {
useFactory: (...args: any[]) => Promise<RedlockModuleOptions> | RedlockModuleOptions;
inject?: any[];
}
32 changes: 32 additions & 0 deletions packages/cache/src/redlock/entities/redlock.module.options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { ConnectionOptions } from 'tls';

export class RedlockModuleOptions {
config: {
host?: string | undefined;
port?: number | undefined;
username?: string | undefined;
password?: string | undefined;
sentinelUsername?: string | undefined;
sentinelPassword?: string | undefined;
sentinels?: Array<{ host: string; port: number }> | undefined;
connectTimeout?: number | undefined;
name?: string | undefined;
tls?: ConnectionOptions | undefined;
db?: number | undefined;
};

additionalOptions?: {
poolLimit?: number | undefined;
processTtl?: number | undefined;
};

constructor(
options: RedlockModuleOptions['config'],
additionalOptions?: RedlockModuleOptions['additionalOptions'],
) {
this.config = {};
this.additionalOptions = {};
Object.assign(this.config, options);
Object.assign(this.additionalOptions, additionalOptions);
}
}
1 change: 1 addition & 0 deletions packages/cache/src/redlock/errors/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './lock.timeout.error';
5 changes: 5 additions & 0 deletions packages/cache/src/redlock/errors/lock.timeout.error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export class LockTimeoutError extends Error {
constructor(lockKey: string) {
super(`Timeout out while attempting to acquire lock for resource '${lockKey}'`);
}
}
6 changes: 6 additions & 0 deletions packages/cache/src/redlock/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export * from './redlock.configuration';
export * from './redlock.constants';
export * from './redlock.module';
export * from './redlock.service';
export * from './entities';
export * from './errors';
6 changes: 6 additions & 0 deletions packages/cache/src/redlock/redlock.configuration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export interface RedlockConfiguration {
keyExpiration: number;
maxRetries: number;
retryInterval: number;
extendTtl?: number;
}
1 change: 1 addition & 0 deletions packages/cache/src/redlock/redlock.constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export declare const REDLOCK_TOKEN = "REDLOCK_TOKEN";
41 changes: 41 additions & 0 deletions packages/cache/src/redlock/redlock.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { DynamicModule, Module } from '@nestjs/common';
import { MetricsModule } from '@multiversx/sdk-nestjs-monitoring';
import { RedisModule } from '@multiversx/sdk-nestjs-redis';
import { REDLOCK_TOKEN } from './redlock.constants';
import { RedlockService } from './redlock.service';
import { RedlockModuleAsyncOptions, RedlockModuleOptions } from './entities';

@Module({})
export class RedlockModule {
static forRoot(options: RedlockModuleOptions): DynamicModule {
return {
module: RedlockModule,
imports: [
RedisModule.forRoot(options, REDLOCK_TOKEN),
MetricsModule,
],
providers: [
RedlockService,
],
exports: [
RedlockService,
],
};
}

static forRootAsync(asyncOptions: RedlockModuleAsyncOptions): DynamicModule {
return {
module: RedlockModule,
imports: [
RedisModule.forRootAsync(asyncOptions, REDLOCK_TOKEN),
MetricsModule,
],
providers: [
RedlockService,
],
exports: [
RedlockService,
],
};
}
}
141 changes: 141 additions & 0 deletions packages/cache/src/redlock/redlock.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import Redis from 'ioredis';
import { Inject, Injectable } from '@nestjs/common';
import { MetricsService, PerformanceProfiler } from '@multiversx/sdk-nestjs-monitoring';
import { OriginLogger } from '@multiversx/sdk-nestjs-common';
import { REDIS_CLIENT_TOKEN } from '@multiversx/sdk-nestjs-redis';
import { RedlockConfiguration } from './redlock.configuration';
import { LockTimeoutError } from './errors/lock.timeout.error';
import { RedlockLogLevel } from './entities/redlock.log.level';

@Injectable()
export class RedlockService {
private readonly logger = new OriginLogger(RedlockService.name);

static logLevel: RedlockLogLevel = RedlockLogLevel.WARNING;

constructor(
@Inject(REDIS_CLIENT_TOKEN) private readonly redis: Redis,
private readonly metricsService: MetricsService,
) { }

async release(key: string): Promise<void> {
await this.redis.del(key);
}

async lock(
type: string,
key: string,
config: RedlockConfiguration,
): Promise<boolean> {
let retryTimes = 0;
let result = false;
const lockKey = `${type}:${key}`;

const profiler = new PerformanceProfiler();

do {
result = await this.lockOnce(lockKey, config.keyExpiration);
if (result) {
break;
}

retryTimes++;
await this.sleep(config.retryInterval);
} while (retryTimes <= config.maxRetries);

if (retryTimes > 0) {
const duration = profiler.stop();

this.logWarning(`Acquired lock for resource '${lockKey}' after ${retryTimes} retries and ${duration.toFixed(0)}ms`);
this.metricsService.setRedlockAcquireDuration(type, duration);
}

if (!result) {
this.metricsService.incrementRedlockFailure(type, 'ACQUIRE');
}

return result;
}

private async lockOnce(key: string, keyExpiration: number): Promise<boolean> {
// Using SET command with NX and PX options
const result = await this.redis.set(key, '1', 'PX', keyExpiration, 'NX');

// The SET command with NX returns null if the key already exists
return result !== null;
}

private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, Number(ms)));
}

async using<T>(type: string, key: string, action: (signal: { aborted: boolean }) => Promise<T>, keyExpiration: number | RedlockConfiguration): Promise<T> {
const lockKey = `${type}:${key}`;
const configuration = this.getRedlockConfiguration(keyExpiration);

const profiler = new PerformanceProfiler();

const isLocked = await this.lock(type, key, configuration);
if (!isLocked) {
this.logError(`Timeout out while attempting to acquire lock for resource '${lockKey}'`);
throw new LockTimeoutError(lockKey);
}

const signal = {
aborted: false,
};

let extensionId: NodeJS.Timeout | undefined = undefined;
applyExtension(this);

try {
return await action(signal);
} finally {
if (extensionId) {
clearTimeout(extensionId);
}

await this.release(lockKey);

const duration = profiler.stop();
this.metricsService.setRedlockProcessDuration(type, duration);
}

function applyExtension(self: RedlockService, isFirstRun: boolean = true) {
const waitTime = isFirstRun || !configuration.extendTtl ? Math.round(configuration.keyExpiration * 0.9) : Math.round((configuration.extendTtl ?? configuration.keyExpiration) * 0.9);

extensionId = setTimeout(async () => {
signal.aborted = true;
await self.redis.pexpire(lockKey, configuration.extendTtl ?? configuration.keyExpiration);
applyExtension(self, false);
self.metricsService.incrementRedlockFailure(type, 'EXTEND');
self.logWarning(`Applying extension for resource '${lockKey}'`);
}, waitTime);
}
}

private getRedlockConfiguration(keyExpiration: number | RedlockConfiguration): RedlockConfiguration {
if (typeof keyExpiration === 'number') {
return {
keyExpiration: keyExpiration,
maxRetries: 100,
retryInterval: Math.round(keyExpiration / 100),
};
}

return keyExpiration;
}

private logWarning(message: string): void {
if (RedlockService.logLevel === RedlockLogLevel.WARNING) {
this.logger.warn(message);
}
}


private logError(message: string): void {
if ([RedlockLogLevel.WARNING, RedlockLogLevel.ERROR].includes(RedlockService.logLevel)) {
this.logger.error(message);
}
}
}
41 changes: 41 additions & 0 deletions packages/monitoring/src/metrics/metrics.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ export class MetricsService {
private static queuePublishGauge: Gauge<string>;
private static queueConsumeGauge: Gauge<string>;
private static duplicateQueueMessagesDetected: Gauge<string>;
private static redlockAcquireDurationHistogram: Histogram<string>;
private static redlockProcessDurationHistogram: Histogram<string>;
private static redlockFailureGauge: Gauge<string>;
private static isDefaultMetricsRegistered: boolean = false;

constructor() {
Expand Down Expand Up @@ -197,6 +200,32 @@ export class MetricsService {
});
}

if (!MetricsService.redlockAcquireDurationHistogram) {
MetricsService.redlockAcquireDurationHistogram = new Histogram({
name: 'redlock_acquire_duration',
help: 'Redlock acquire duration',
labelNames: ['type'],
buckets: [],
});
}

if (!MetricsService.redlockProcessDurationHistogram) {
MetricsService.redlockProcessDurationHistogram = new Histogram({
name: 'redlock_process_duration',
help: 'Redlock process duration',
labelNames: ['type'],
buckets: [],
});
}

if (!MetricsService.redlockFailureGauge) {
MetricsService.redlockFailureGauge = new Gauge({
name: 'redlock_failures',
help: 'Redlock failures',
labelNames: ['type', 'failure'],
});
}

if (!MetricsService.isDefaultMetricsRegistered) {
MetricsService.isDefaultMetricsRegistered = true;
collectDefaultMetrics();
Expand Down Expand Up @@ -290,6 +319,18 @@ export class MetricsService {
MetricsService.consumerHistogram.labels(consumer).observe(duration);
}

setRedlockAcquireDuration(type: string, duration: number): void {
MetricsService.redlockAcquireDurationHistogram.labels(type).observe(duration);
}

setRedlockProcessDuration(type: string, duration: number): void {
MetricsService.redlockProcessDurationHistogram.labels(type).observe(duration);
}

incrementRedlockFailure(type: string, failure: string): void {
MetricsService.redlockFailureGauge.labels(type, failure).inc();
}

async getMetrics(): Promise<string> {
return await register.metrics();
}
Expand Down