Skip to content

Commit

Permalink
multi-redis support, quorum support
Browse files Browse the repository at this point in the history
  • Loading branch information
tanghel committed Nov 15, 2023
1 parent 1730e20 commit 28b6513
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ import { ModuleMetadata } from '@nestjs/common';
export interface RedlockModuleAsyncOptions extends Pick<ModuleMetadata, 'imports'> {
useFactory: (...args: any[]) => Promise<RedlockModuleOptions> | RedlockModuleOptions;
inject?: any[];
imports?: any[];
}
45 changes: 39 additions & 6 deletions packages/cache/src/redlock/redlock.module.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,72 @@
import { DynamicModule, Module } from '@nestjs/common';
import { DynamicModule, Module, Provider } from '@nestjs/common';
import { MetricsModule } from '@multiversx/sdk-nestjs-monitoring';
import { RedisModule } from '@multiversx/sdk-nestjs-redis';
import { REDLOCK_TOKEN } from './redlock.constants';
import { RedlockService } from './redlock.service';
import { RedlockModuleAsyncOptions, RedlockModuleOptions } from './entities';
import Redis from 'ioredis';

@Module({})
export class RedlockModule {
static forRoot(options: RedlockModuleOptions): DynamicModule {
static forRoot(redisOptionsArray: RedlockModuleOptions[]): DynamicModule {
const redisProviders: Provider[] = redisOptionsArray.map((option, index) => ({
provide: `REDIS_CLIENT_${index}`,
useFactory: () => new Redis(option.config),
}));

const redisClientsProvider: Provider = {
provide: 'REDIS_CLIENTS',
useFactory: (...clients: Redis[]) => clients,
inject: redisProviders.map((_, index) => `REDIS_CLIENT_${index}`),
};

return {
module: RedlockModule,
imports: [
RedisModule.forRoot(options, REDLOCK_TOKEN),
RedisModule, // Import RedisModule normally
MetricsModule,
],
providers: [
...redisProviders,
redisClientsProvider,
RedlockService,
],
exports: [
'REDIS_CLIENTS',
RedlockService,
],
};
}

static forRootAsync(asyncOptions: RedlockModuleAsyncOptions): DynamicModule {
static forRootAsync(asyncOptionsArray: RedlockModuleAsyncOptions[]): DynamicModule {
const asyncProviders: Provider[] = asyncOptionsArray.map((asyncOptions, index) => ({
provide: `REDIS_CLIENT_${index}`,
useFactory: async (...args: any[]) => {
const options = await asyncOptions.useFactory(...args);
return new Redis(options.config);
},
inject: asyncOptions.inject || [],
}));

const redisClientsAsyncProvider: Provider = {
provide: 'REDIS_CLIENTS',
useFactory: (...clients: Redis[]) => clients,
inject: asyncProviders.map((_, index) => `REDIS_CLIENT_${index}`),
};

return {
module: RedlockModule,
imports: [
RedisModule.forRootAsync(asyncOptions, REDLOCK_TOKEN),
RedisModule, // Import RedisModule normally
MetricsModule,
...asyncOptionsArray.map(ao => ao.imports).flat(),
],
providers: [
...asyncProviders,
redisClientsAsyncProvider,
RedlockService,
],
exports: [
'REDIS_CLIENTS',
RedlockService,
],
};
Expand Down
73 changes: 63 additions & 10 deletions packages/cache/src/redlock/redlock.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import Redis from 'ioredis';
import { Inject, Injectable } from '@nestjs/common';
import { MetricsService, PerformanceProfiler } from '@multiversx/sdk-nestjs-monitoring';
import { OriginLogger } from '@multiversx/sdk-nestjs-common';
import { REDIS_CLIENT_TOKEN } from '@multiversx/sdk-nestjs-redis';
import { RedlockConfiguration } from './redlock.configuration';
import { LockTimeoutError } from './errors/lock.timeout.error';
import { RedlockLogLevel } from './entities/redlock.log.level';
Expand All @@ -14,18 +13,58 @@ export class RedlockService {
static logLevel: RedlockLogLevel = RedlockLogLevel.WARNING;

constructor(
@Inject(REDIS_CLIENT_TOKEN) private readonly redis: Redis,
@Inject('REDIS_CLIENTS') private readonly redisArray: Redis[],
private readonly metricsService: MetricsService,
) { }

async release(key: string): Promise<void> {
await this.redis.del(key);
const promise = async (redis: Redis) => await redis.del(key);
await Promise.allSettled(this.redisArray.map(promise));
}

async lock(
type: string,
key: string,
config: RedlockConfiguration,
): Promise<boolean> {
const quorumSize = Math.floor(this.redisArray.length / 2) + 1;
let successCount = 0;
let settledCount = 0;

return await new Promise<boolean>((resolve) => {
const checkQuorum = () => {
if (successCount >= quorumSize) {
console.log({ result: true, successCount, quorumSize, settledCount });
resolve(true);
} else if (settledCount - successCount >= quorumSize) {
// When it's impossible to reach quorum due to too many failures
console.log({ result: false, successCount, quorumSize, settledCount });
resolve(false);
}
};

for (const redis of this.redisArray) {
this.lockSingle(redis, type, key, config)
.then(result => {
if (result === true) {
successCount++;
}
settledCount++;
checkQuorum();
})
.catch(_ => {
settledCount++;
checkQuorum();
});
}
});
}

async lockSingle(
redis: Redis,
type: string,
key: string,
config: RedlockConfiguration,
): Promise<boolean> {
let retryTimes = 0;
let result = false;
Expand All @@ -34,7 +73,7 @@ export class RedlockService {
const profiler = new PerformanceProfiler();

do {
result = await this.lockOnce(lockKey, config.keyExpiration);
result = await this.lockOnce(redis, lockKey, config.keyExpiration);
if (result) {
break;
}
Expand All @@ -43,10 +82,10 @@ export class RedlockService {
await this.sleep(config.retryInterval);
} while (retryTimes <= config.maxRetries);

if (retryTimes > 0) {
if (retryTimes > 0 && result) {
const duration = profiler.stop();

this.logWarning(`Acquired lock for resource '${lockKey}' after ${retryTimes} retries and ${duration.toFixed(0)}ms`);
this.logWarning(`Acquired lock for resource '${lockKey}' after ${retryTimes} retries and ${duration.toFixed(0)}ms with result ${result}`);
this.metricsService.setRedlockAcquireDuration(type, duration);
}

Expand All @@ -57,9 +96,9 @@ export class RedlockService {
return result;
}

private async lockOnce(key: string, keyExpiration: number): Promise<boolean> {
private async lockOnce(redis: Redis, key: string, keyExpiration: number): Promise<boolean> {
// Using SET command with NX and PX options
const result = await this.redis.set(key, '1', 'PX', keyExpiration, 'NX');
const result = await redis.set(key, '1', 'PX', keyExpiration, 'NX');

// The SET command with NX returns null if the key already exists
return result !== null;
Expand Down Expand Up @@ -106,14 +145,28 @@ export class RedlockService {

extensionId = setTimeout(async () => {
signal.aborted = true;
await self.redis.pexpire(lockKey, configuration.extendTtl ?? configuration.keyExpiration);
applyExtension(self, false);

const duration = profiler.stop();
if (duration > configuration.keyExpiration * 10) {
self.metricsService.incrementRedlockFailure(type, 'EXTEND_TIMEOUT');
self.logError(`Stopped applying extension for resource '${lockKey}' since it was held for ${duration.toFixed(0)}ms`);
return;
}

self.metricsService.incrementRedlockFailure(type, 'EXTEND');
self.logWarning(`Applying extension for resource '${lockKey}'`);
applyExtension(self, false);
await self.extend(lockKey, configuration.extendTtl ?? configuration.keyExpiration);
}, waitTime);
}
}

async extend(key: string, expiration: number) {
const promise = async (redis: Redis) => await redis.pexpire(key, expiration);

await Promise.allSettled(this.redisArray.map(promise));
}

private getRedlockConfiguration(keyExpiration: number | RedlockConfiguration): RedlockConfiguration {
if (typeof keyExpiration === 'number') {
return {
Expand Down

0 comments on commit 28b6513

Please sign in to comment.