diff --git a/packages/ddc-client/src/DdcClient.ts b/packages/ddc-client/src/DdcClient.ts index 6add2255..62e9c505 100644 --- a/packages/ddc-client/src/DdcClient.ts +++ b/packages/ddc-client/src/DdcClient.ts @@ -62,17 +62,19 @@ export class DdcClient { logger.debug(config, 'DdcClient created'); - bindErrorLogger(this, this.logger, [ - 'getBalance', - 'depositBalance', - 'getDeposit', - 'createBucket', - 'getBucket', - 'getBucketList', - 'store', - 'read', - 'resolveName', - ]); + if (config.logErrors !== false) { + bindErrorLogger(this, this.logger, [ + 'getBalance', + 'depositBalance', + 'getDeposit', + 'createBucket', + 'getBucket', + 'getBucketList', + 'store', + 'read', + 'resolveName', + ]); + } } /** diff --git a/packages/ddc-client/src/index.ts b/packages/ddc-client/src/index.ts index 3fba5209..16050bc5 100644 --- a/packages/ddc-client/src/index.ts +++ b/packages/ddc-client/src/index.ts @@ -18,6 +18,8 @@ export { MAINNET, AuthToken, AuthTokenOperation, + StorageNodeMode, + NodeError, type DagNodeStoreOptions, type Signer, } from '@cere-ddc-sdk/ddc'; diff --git a/packages/ddc/src/logger/types.ts b/packages/ddc/src/logger/types.ts index c146aeaf..c2a09848 100644 --- a/packages/ddc/src/logger/types.ts +++ b/packages/ddc/src/logger/types.ts @@ -26,6 +26,11 @@ export type LoggerOptions = { logLevel?: LogLevel; logOptions?: LoggerConfig; logger?: Logger; + + /** + * Wether to log all errors (including caught ones) + */ + logErrors?: boolean; }; export type { Logger }; diff --git a/packages/ddc/src/nodes/BalancedNode.ts b/packages/ddc/src/nodes/BalancedNode.ts index edcec2dc..aa64f4b9 100644 --- a/packages/ddc/src/nodes/BalancedNode.ts +++ b/packages/ddc/src/nodes/BalancedNode.ts @@ -8,7 +8,9 @@ import { Router, RouterOperation } from '../routing'; import { Piece, MultipartPiece } from '../Piece'; import { DagNode } from '../DagNode'; import { CnsRecord } from '../CnsRecord'; -import { Logger, LoggerOptions, createLogger } from '../logger'; +import { Logger, LoggerOptions, bindErrorLogger, createLogger } from '../logger'; +import { createCorrelationId } from '../activity'; +import { NodeError } from './NodeError'; import { DagNodeGetOptions, DagNodeStoreOptions, @@ -16,6 +18,7 @@ import { PieceStoreOptions, NodeInterface, CnsRecordGetOptions, + CorrelationOptions, } from './NodeInterface'; /** @@ -73,6 +76,18 @@ export class BalancedNode implements NodeInterface { this.retryOptions = { ...this.retryOptions, ...retryOptions, retries: attempts }; } + + if (config.logErrors !== false) { + bindErrorLogger(this, this.logger, [ + 'storePiece', + 'storeDagNode', + 'readPiece', + 'getDagNode', + 'storeCnsRecord', + 'getCnsRecord', + 'resolveName', + ]); + } } /** @@ -86,9 +101,12 @@ export class BalancedNode implements NodeInterface { private async withRetry( bucketId: BucketId, operation: RouterOperation, + { correlationId = createCorrelationId() }: CorrelationOptions, body: (node: NodeInterface, bail: (e: Error) => void, attempt: number) => Promise, ) { - let lastError: RpcError | undefined; + let lastOperationError: RpcError | undefined; + let lastRouterError: Error | undefined; + const exclude: NodeInterface[] = []; return retry( @@ -99,6 +117,7 @@ export class BalancedNode implements NodeInterface { node = await this.router.getNode( operation, bucketId, + { logErrors: false }, exclude.map((node) => node.nodeId), ); @@ -117,25 +136,33 @@ export class BalancedNode implements NodeInterface { node.displayName, ); } + + if (error instanceof Error) { + lastRouterError = error; + } } if (!node) { - throw lastError ?? new Error('No nodes available to handle the operation'); + throw lastOperationError ?? lastRouterError ?? new Error('No nodes available to handle the operation'); } try { return await body(node, bail, attempt); } catch (error) { - if ( - error instanceof RpcError && - RETRYABLE_GRPC_ERROR_CODES.map((status) => GrpcStatus[status]).includes(error.code) - ) { - lastError = error; + const nodeError = error instanceof RpcError ? NodeError.fromRpcError(error) : undefined; - throw error; + if (nodeError) { + nodeError.nodeId = node.nodeId; + nodeError.correlationId = correlationId; + + if (RETRYABLE_GRPC_ERROR_CODES.map((status) => GrpcStatus[status]).includes(nodeError.code)) { + lastOperationError = nodeError; + + throw nodeError; + } } - bail(error as Error); + bail(nodeError || (error as Error)); } }, { @@ -149,8 +176,8 @@ export class BalancedNode implements NodeInterface { ) as T; } - async storePiece(bucketId: BucketId, piece: Piece | MultipartPiece, options?: PieceStoreOptions) { - return this.withRetry(bucketId, RouterOperation.STORE_PIECE, (node, bail, attempt) => + async storePiece(bucketId: BucketId, piece: Piece | MultipartPiece, options: PieceStoreOptions = {}) { + return this.withRetry(bucketId, RouterOperation.STORE_PIECE, options, (node, bail, attempt) => /** * Clone the piece if it is a piece and this is not the first attempt. * This is done to avoid reusing the same stream multiple times. @@ -159,32 +186,38 @@ export class BalancedNode implements NodeInterface { ); } - async readPiece(bucketId: BucketId, cidOrName: string, options?: PieceReadOptions) { - return this.withRetry(bucketId, RouterOperation.READ_PIECE, (node) => node.readPiece(bucketId, cidOrName, options)); + async readPiece(bucketId: BucketId, cidOrName: string, options: PieceReadOptions = {}) { + return this.withRetry(bucketId, RouterOperation.READ_PIECE, options, (node) => + node.readPiece(bucketId, cidOrName, options), + ); } - async storeDagNode(bucketId: BucketId, dagNode: DagNode, options?: DagNodeStoreOptions) { - return this.withRetry(bucketId, RouterOperation.STORE_DAG_NODE, (node) => + async storeDagNode(bucketId: BucketId, dagNode: DagNode, options: DagNodeStoreOptions = {}) { + return this.withRetry(bucketId, RouterOperation.STORE_DAG_NODE, options, (node) => node.storeDagNode(bucketId, dagNode, options), ); } - async getDagNode(bucketId: BucketId, cidOrName: string, options?: DagNodeGetOptions) { - return this.withRetry(bucketId, RouterOperation.READ_DAG_NODE, (node) => + async getDagNode(bucketId: BucketId, cidOrName: string, options: DagNodeGetOptions = {}) { + return this.withRetry(bucketId, RouterOperation.READ_DAG_NODE, options, (node) => node.getDagNode(bucketId, cidOrName, options), ); } - async storeCnsRecord(bucketId: BucketId, record: CnsRecord) { - return this.withRetry(bucketId, RouterOperation.STORE_CNS_RECORD, (node) => node.storeCnsRecord(bucketId, record)); + async storeCnsRecord(bucketId: BucketId, record: CnsRecord, options: DagNodeStoreOptions = {}) { + return this.withRetry(bucketId, RouterOperation.STORE_CNS_RECORD, options, (node) => + node.storeCnsRecord(bucketId, record), + ); } - async getCnsRecord(bucketId: BucketId, name: string) { - return this.withRetry(bucketId, RouterOperation.READ_CNS_RECORD, (node) => node.getCnsRecord(bucketId, name)); + async getCnsRecord(bucketId: BucketId, name: string, options: CnsRecordGetOptions = {}) { + return this.withRetry(bucketId, RouterOperation.READ_CNS_RECORD, options, (node) => + node.getCnsRecord(bucketId, name), + ); } - async resolveName(bucketId: BucketId, cidOrName: string, options?: CnsRecordGetOptions) { - return this.withRetry(bucketId, RouterOperation.READ_CNS_RECORD, (node) => + async resolveName(bucketId: BucketId, cidOrName: string, options: CnsRecordGetOptions = {}) { + return this.withRetry(bucketId, RouterOperation.READ_CNS_RECORD, options, (node) => node.resolveName(bucketId, cidOrName, options), ); } diff --git a/packages/ddc/src/nodes/NodeError.ts b/packages/ddc/src/nodes/NodeError.ts new file mode 100644 index 00000000..59ee5ea6 --- /dev/null +++ b/packages/ddc/src/nodes/NodeError.ts @@ -0,0 +1,15 @@ +import { RpcError } from '@protobuf-ts/runtime-rpc'; + +export class NodeError extends RpcError { + correlationId?: string; + nodeId?: string; + + static fromRpcError(error: RpcError): NodeError { + const finalError = new NodeError(error.message, error.code, error.meta); + + finalError.methodName = error.methodName; + finalError.serviceName = error.serviceName; + + return finalError; + } +} diff --git a/packages/ddc/src/nodes/NodeInterface.ts b/packages/ddc/src/nodes/NodeInterface.ts index 5423a195..d229c988 100644 --- a/packages/ddc/src/nodes/NodeInterface.ts +++ b/packages/ddc/src/nodes/NodeInterface.ts @@ -12,14 +12,19 @@ type NamingOptions = { name?: string; }; -type ActivityOptions = { - correlationId?: string; -}; - type CacheControlOptions = { cacheControl?: 'no-cache'; }; +/** + * The `CorrelationOptions` type defines the correlation options for a DDC operation. + * + * @hidden + */ +export type CorrelationOptions = { + correlationId?: string; +}; + /** * The `OperationAuthOptions` type defines the authentication options for a DDC operation. * @@ -39,7 +44,7 @@ export type OperationAuthOptions = { * @extends OperationAuthOptions */ export type PieceReadOptions = CacheControlOptions & - ActivityOptions & + CorrelationOptions & OperationAuthOptions & { /** * An optional range to read from the piece. @@ -54,7 +59,7 @@ export type PieceReadOptions = CacheControlOptions & * @extends OperationAuthOptions */ export type DagNodeGetOptions = CacheControlOptions & - ActivityOptions & + CorrelationOptions & OperationAuthOptions & { /** * An optional path to retrieve from the DAG node. @@ -69,7 +74,7 @@ export type DagNodeGetOptions = CacheControlOptions & * @extends OperationAuthOptions */ export type CnsRecordGetOptions = CacheControlOptions & - ActivityOptions & + CorrelationOptions & OperationAuthOptions & { /** * An optional path to retrieve from the CNS record. @@ -84,7 +89,7 @@ export type CnsRecordGetOptions = CacheControlOptions & * @extends NamingOptions * @extends OperationAuthOptions */ -export type PieceStoreOptions = ActivityOptions & NamingOptions & OperationAuthOptions; +export type PieceStoreOptions = CorrelationOptions & NamingOptions & OperationAuthOptions; /** * The `DagNodeStoreOptions` type defines the options for storing a DAG node. @@ -93,7 +98,7 @@ export type PieceStoreOptions = ActivityOptions & NamingOptions & OperationAuthO * @extends NamingOptions * @extends OperationAuthOptions */ -export type DagNodeStoreOptions = ActivityOptions & NamingOptions & OperationAuthOptions; +export type DagNodeStoreOptions = CorrelationOptions & NamingOptions & OperationAuthOptions; /** * The `CnsRecordStoreOptions` type defines the options for storing a CNS record. @@ -101,7 +106,7 @@ export type DagNodeStoreOptions = ActivityOptions & NamingOptions & OperationAut * @hidden * @extends OperationAuthOptions */ -export type CnsRecordStoreOptions = ActivityOptions & OperationAuthOptions; +export type CnsRecordStoreOptions = CorrelationOptions & OperationAuthOptions; /** * The `NodeInterface` interface defines the methods to interact with DDC storage nodes. diff --git a/packages/ddc/src/nodes/StorageNode.ts b/packages/ddc/src/nodes/StorageNode.ts index 3ddbaab0..614156af 100644 --- a/packages/ddc/src/nodes/StorageNode.ts +++ b/packages/ddc/src/nodes/StorageNode.ts @@ -93,15 +93,17 @@ export class StorageNode implements NodeInterface { this.logger.debug(config, 'Storage node initialized'); - bindErrorLogger(this, this.logger, [ - 'storePiece', - 'storeDagNode', - 'readPiece', - 'getDagNode', - 'storeCnsRecord', - 'getCnsRecord', - 'resolveName', - ]); + if (config.logErrors !== false) { + bindErrorLogger(this, this.logger, [ + 'storePiece', + 'storeDagNode', + 'readPiece', + 'getDagNode', + 'storeCnsRecord', + 'getCnsRecord', + 'resolveName', + ]); + } } private async getRootToken() { diff --git a/packages/ddc/src/nodes/index.ts b/packages/ddc/src/nodes/index.ts index 73eab2e0..05c87ac9 100644 --- a/packages/ddc/src/nodes/index.ts +++ b/packages/ddc/src/nodes/index.ts @@ -1,3 +1,4 @@ export * from './StorageNode'; export * from './NodeInterface'; export * from './BalancedNode'; +export * from './NodeError'; diff --git a/packages/ddc/src/routing/Router.ts b/packages/ddc/src/routing/Router.ts index e0f0b6cc..f3511dd4 100644 --- a/packages/ddc/src/routing/Router.ts +++ b/packages/ddc/src/routing/Router.ts @@ -1,6 +1,6 @@ import { BucketId, Signer } from '@cere-ddc-sdk/blockchain'; -import { StorageNode } from '../nodes'; +import { StorageNode, StorageNodeConfig } from '../nodes'; import { RouterNode, RouterOperation, RoutingStrategy } from './RoutingStrategy'; import { BlockchainStrategy, BlockchainStrategyConfig } from './BlockchainStrategy'; import { StaticStrategy, StaticStrategyConfig } from './StaticStrategy'; @@ -68,7 +68,12 @@ export class Router { * * @throws Will throw an error if no nodes are available to handle the operation. */ - async getNode(operation: RouterOperation, bucketId: BucketId, exclude: string[] = []) { + async getNode( + operation: RouterOperation, + bucketId: BucketId, + config: Partial = {}, + exclude: string[] = [], + ) { this.logger.info('Getting node for operation "%s" in bucket %s', operation, bucketId); const sdkTokenPromise = this.getSdkToken(); @@ -88,6 +93,7 @@ export class Router { logger: this.logger, authToken: await sdkTokenPromise, nodeId: node.nodeId || node.grpcUrl, + ...config, }); this.logger.info(`Selected node for operation "%s" in bucket %s: %s`, operation, bucketId, storageNode.displayName); diff --git a/packages/file-storage/src/FileStorage.ts b/packages/file-storage/src/FileStorage.ts index 9786fb22..212e5694 100644 --- a/packages/file-storage/src/FileStorage.ts +++ b/packages/file-storage/src/FileStorage.ts @@ -55,12 +55,18 @@ export class FileStorage { constructor(config: RouterConfig & Config); constructor(router: Router, config: Config); constructor(configOrRouter: (RouterConfig & Config) | Router, config?: Config) { + let finalConfig: Config | undefined; + if (configOrRouter instanceof Router) { + finalConfig = config; + this.logger = createLogger('FileStorage', config); this.ddcNode = new BalancedNode({ ...config, router: configOrRouter, logger: this.logger }); this.logger.debug(config, 'FileStorage created'); } else { + finalConfig = configOrRouter; + this.logger = createLogger('FileStorage', configOrRouter); this.blockchain = 'blockchain' in configOrRouter ? configOrRouter.blockchain : undefined; this.ddcNode = new BalancedNode({ @@ -72,7 +78,9 @@ export class FileStorage { this.logger.debug(configOrRouter, 'FileStorage created'); } - bindErrorLogger(this, this.logger, ['store', 'read']); + if (finalConfig?.logErrors === false) { + bindErrorLogger(this, this.logger, ['store', 'read']); + } } /** diff --git a/tests/helpers/ddc.ts b/tests/helpers/ddc.ts index dab42879..319adfa7 100644 --- a/tests/helpers/ddc.ts +++ b/tests/helpers/ddc.ts @@ -53,4 +53,5 @@ type ClientOptions = Pick; export const getClientConfig = (options: ClientOptions = {}): DdcClientConfig => ({ blockchain: BLOCKCHAIN_RPC_URL, logLevel: options.logLevel || 'silent', + ...options, }); diff --git a/tests/specs/Errors.spec.ts b/tests/specs/Errors.spec.ts new file mode 100644 index 00000000..d2f34dd6 --- /dev/null +++ b/tests/specs/Errors.spec.ts @@ -0,0 +1,76 @@ +import { RouterNode } from '@cere-ddc-sdk/ddc'; +import { DdcClient, StorageNodeMode, File, NodeError } from '@cere-ddc-sdk/ddc-client'; +import { getClientConfig, ROOT_USER_SEED } from '../helpers'; + +describe('Errors', () => { + let smallFile: File; + + beforeEach(() => { + smallFile = new File(new TextEncoder().encode('Small file')); + }); + + describe.only('Normal client', () => { + let client: DdcClient; + + beforeAll(async () => { + client = await DdcClient.create(ROOT_USER_SEED, getClientConfig()); + }); + + afterAll(async () => { + await client.disconnect(); + }); + + it('should throw a bucket error', async () => { + const error = await client.store(99n, smallFile).catch((error) => error); + + expect(error).toBeInstanceOf(Error); + expect(error).toEqual( + expect.objectContaining({ + message: expect.stringContaining('Failed to get bucket'), + }), + ); + }); + }); + + describe('Unreachable node', () => { + let client: DdcClient; + const bucketId = 1n; + const unreachableNode: RouterNode = { + mode: StorageNodeMode.Storage, + grpcUrl: `grpc://localhost:9099`, + httpUrl: `http://localhost:8099`, + }; + + beforeAll(async () => { + client = await DdcClient.create(ROOT_USER_SEED, getClientConfig({ nodes: [unreachableNode] })); + }); + + afterAll(async () => { + await client.disconnect(); + }); + + it('should throw unreachible error', async () => { + const error = await client.store(bucketId, smallFile).catch((error) => error); + + expect(error).toBeInstanceOf(NodeError); + expect(error).toEqual( + expect.objectContaining({ + code: 'UNAVAILABLE', + serviceName: 'file.FileApi', + methodName: 'putRawPiece', + nodeId: unreachableNode.grpcUrl, + message: expect.any(String), + correlationId: expect.any(String), + }), + ); + }); + + it('should throw error with explicit correlationId', async () => { + const correlationId = 'test-correlation-id'; + const error = await client.store(bucketId, smallFile, { correlationId }).catch((error) => error); + + expect(error).toBeInstanceOf(NodeError); + expect(error).toEqual(expect.objectContaining({ correlationId })); + }); + }); +});