Skip to content

Commit

Permalink
Routed store operation
Browse files Browse the repository at this point in the history
  • Loading branch information
skambalin committed Sep 15, 2023
1 parent 394ebb6 commit b747958
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class ContentAddressableStorage {
public readonly scheme: SchemeInterface,
public readonly cdnNodeUrl: string,
public readonly cipher: CipherInterface,
private readonly cidBuilder: CidBuilder,
public readonly cidBuilder: CidBuilder,
private readonly readAttempts: number = 1,
private readonly writeAttempts: number = 1,
private defaultSession: Session | null = null,
Expand Down Expand Up @@ -135,6 +135,20 @@ export class ContentAddressableStorage {
throw new Error(`unable to find cdn nodes in cluster='${clusterAddress}'`);
}

buildCid(bucketId: bigint, piece: Piece, encryptionOptions?: EncryptionOptions) {
const targetPiece = piece.clone();

if (encryptionOptions) {
targetPiece.tags.push(new Tag(DEK_PATH_TAG, encryptionOptions.dekPath));
targetPiece.data = this.cipher.encrypt(piece.data, encryptionOptions.dek);
}

const pbPiece = targetPiece.toProto(bucketId);
const pieceAsBytes = PbPiece.toBinary(pbPiece);

return this.cidBuilder.build(pieceAsBytes);
}

private getCdnNodeUrl(cid: string, route?: Route) {
return route?.getNodeUrl(cid) || this.cdnNodeUrl;
}
Expand Down Expand Up @@ -180,7 +194,6 @@ export class ContentAddressableStorage {
route?: Route,
): Promise<StoreRequest> {
const pbPiece: PbPiece = piece.toProto(bucketId);
// @ts-ignore
const pieceAsBytes = PbPiece.toBinary(pbPiece);
const cid = await this.cidBuilder.build(pieceAsBytes);
const timestamp = new Date();
Expand Down Expand Up @@ -224,18 +237,15 @@ export class ContentAddressableStorage {
}

async store(bucketId: bigint, piece: Piece, options: StoreOptions = {}): Promise<PieceUri> {
const session = this.useSession(options.session);
const request = await this.buildStoreRequest(bucketId, session, piece);

const response = await this.sendRequest(
request.path,
undefined,
{
method: request.method,
body: request.body,
},
options.route,
);
const cid = piece.cid || (await this.buildCid(bucketId, piece));
const cdnNodeUrl = this.getCdnNodeUrl(cid, options.route);
const session = this.useSession(options.route?.getSessionId(cid) || options.session);

const request = await this.buildStoreRequest(bucketId, session, piece, options.route);
const response = await this.sendRequest(cdnNodeUrl, request.path, undefined, {
method: request.method,
body: request.body,
});

const responseData = await response.arrayBuffer();
// @ts-ignore
Expand Down Expand Up @@ -281,15 +291,8 @@ export class ContentAddressableStorage {

search.set('data', Buffer.from(PbRequest.toBinary(pbRequest)).toString('base64'));

const response = await this.sendRequest(
`${BASE_PATH_PIECES}/${cid}`,
search.toString(),
undefined,
options.route,
);

const response = await this.sendRequest(cdnNodeUrl, `${BASE_PATH_PIECES}/${cid}`, search.toString());
const responseData = await response.arrayBuffer();
// @ts-ignore
const protoResponse = PbResponse.fromBinary(new Uint8Array(responseData));

if (!response.ok) {
Expand Down Expand Up @@ -363,12 +366,11 @@ export class ContentAddressableStorage {
signature: requestSignature,
publicKey: this.scheme.publicKey,
});
// @ts-ignore

search.set('data', Buffer.from(PbRequest.toBinary(pbRequest)).toString('base64'));

const response = await this.sendRequest(`${BASE_PATH_PIECES}`, search.toString(), undefined, options.route);
const response = await this.sendRequest(this.cdnNodeUrl, `${BASE_PATH_PIECES}`, search.toString());
const responseData = await response.arrayBuffer();
// @ts-ignore
const protoResponse = PbResponse.fromBinary(new Uint8Array(responseData));

if (!response.status) {
Expand Down Expand Up @@ -505,15 +507,10 @@ export class ContentAddressableStorage {
request.signature = requestSignature;
}

const ackResponse = await this.sendRequest(
'/api/rest/ack',
undefined,
{
method: 'POST',
body: PbRequest.toBinary(request).buffer,
},
route,
);
const ackResponse = await this.sendRequest(cdnNodeUrl, '/api/rest/ack', undefined, {
method: 'POST',
body: PbRequest.toBinary(request).buffer,
});

const pbAckResponse = PbResponse.fromBinary(new Uint8Array(await ackResponse.arrayBuffer()));

Expand All @@ -540,12 +537,13 @@ export class ContentAddressableStorage {
return this.scheme.sign(stringToU8a(`<Bytes>${cid}</Bytes>`));
}

private async sendRequest(pathname: string, query?: string, init?: RequestInit, route?: Route): Promise<Response> {
const url = new URL(this.cdnNodeUrl);
url.pathname = pathname;
private async sendRequest(nodeUrl: string, path: string, query?: string, init?: RequestInit): Promise<Response> {
const url = new URL(path, nodeUrl);

if (query) {
url.search = new URLSearchParams(query).toString();
}

const method = init?.method?.toUpperCase() || 'GET';
const attempts = method === 'GET' ? this.readAttempts : this.writeAttempts;
const options = init != null ? {...init, attempts} : {attempts};
Expand Down
9 changes: 6 additions & 3 deletions packages/content-addressable-storage/src/router/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {stringToU8a, u8aToHex, u8aConcat} from '@polkadot/util';
import {CidBuilder, SchemeInterface} from '@cere-ddc-sdk/core';
import {PieceUri} from '../models/PieceUri';
import {Route, PieceRoute} from './Route';
import {Link} from '../models/Link';

type UnsignedRequest = {
requestId: string;
Expand All @@ -12,6 +13,7 @@ type UnsignedRequest = {
bucketId: BucketId;
userAddress: string;
timestamp: number;
links: Link[];
};

type SignedRequest = UnsignedRequest & {
Expand Down Expand Up @@ -54,8 +56,9 @@ export class Router {
return signedRequest;
}

private createRequest(uri: PieceUri) {
private createRequest(uri: PieceUri, links: Link[] = []) {
return this.signRequest({
links,
clusterId: this.clusterId,
cid: uri.cid,
bucketId: uri.bucketId,
Expand All @@ -69,8 +72,8 @@ export class Router {
throw new Error('Not implemented');
}

async getRoute(uri: PieceUri) {
const request = await this.createRequest(uri);
async getRoute(uri: PieceUri, links: Link[] = []) {
const request = await this.createRequest(uri, links);
const respose = await this.requestRoutingData(request);

return new Route(respose.requestId, respose.routing);
Expand Down
29 changes: 28 additions & 1 deletion packages/file-storage/src/core/CoreFileStorage.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type {UnderlyingSource} from 'stream/web';
import {
ContentAddressableStorage,
Link,
Expand All @@ -8,7 +9,7 @@ import {
StoreOptions,
ReadOptions,
} from '@cere-ddc-sdk/content-addressable-storage';
import type {UnderlyingSource} from 'stream/web';

import {FileStorageConfig} from './FileStorageConfig';
import {IndexedLink} from './model/IndexedLink';

Expand All @@ -23,6 +24,32 @@ export class CoreFileStorage {
this.caStorage = caStorage;
}

async createHeadPiece(
bucketId: bigint,
reader: ReadableStreamDefaultReader<Uint8Array>,
tags: Array<Tag> = [],
encryptionOptions?: EncryptionOptions,
) {
const headPiece = new Piece(new Uint8Array(), tags, []);

while (true) {
const {done, value} = await reader.read();

if (done) {
break;
}

const piece = new Piece(value, [new Tag(multipartTag, 'true')]);

piece.cid = await this.caStorage.buildCid(bucketId, piece, encryptionOptions);
headPiece.links.push(new Link(piece.cid!, BigInt(value.byteLength)));
}

headPiece.cid = await this.caStorage.buildCid(bucketId, headPiece, encryptionOptions);

return headPiece;
}

async uploadFromStreamReader(
bucketId: bigint,
reader: ReadableStreamDefaultReader<Uint8Array>,
Expand Down
10 changes: 8 additions & 2 deletions packages/file-storage/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ type Options = RequiredSelected<Partial<CaCreateOptions>, 'clusterAddress'>;
export class FileStorage implements FileStorageInterface {
readonly config: FileStorageConfig;
readonly caStorage: ContentAddressableStorage;

private readonly fs: CoreFileStorage;
readonly fs: CoreFileStorage;

constructor(caStorage: ContentAddressableStorage, config: FileStorageConfig = new FileStorageConfig()) {
this.fs = new CoreFileStorage(caStorage, config);
Expand All @@ -43,6 +42,13 @@ export class FileStorage implements FileStorageInterface {
return this.caStorage.disconnect();
}

async createHeadPiece(bucketId: bigint, data: Data, tags: Tag[] = [], encryptionOptions?: EncryptionOptions) {
const stream = await transformDataToStream(data);
const reader = stream.pipeThrough(new streamWeb.TransformStream(this.fs.chunkTransformer())).getReader();

return this.fs.createHeadPiece(bucketId, reader, tags, encryptionOptions);
}

async upload(
bucketId: bigint,
data: Data,
Expand Down
3 changes: 2 additions & 1 deletion packages/file-storage/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import {
Link,
PieceUri,
ReadOptions,
Session,
StoreOptions,
Tag,
} from '@cere-ddc-sdk/content-addressable-storage';
import type {ReadableStream as NodeReadableStream} from 'stream/web';
import {GetFirstArgument, RequiredSelected} from '@cere-ddc-sdk/core';

import {FileStorageConfig} from './core/FileStorageConfig.js';
import {CoreFileStorage} from './core/CoreFileStorage.js';

type CaCreateOptions = GetFirstArgument<typeof ContentAddressableStorage.build>;
type Options = RequiredSelected<Partial<CaCreateOptions>, 'clusterAddress'>;
Expand All @@ -30,6 +30,7 @@ export {FileStorageConfig, KB, MB} from './core/FileStorageConfig.js';
export interface FileStorage {
readonly config: FileStorageConfig;
readonly caStorage: ContentAddressableStorage;
readonly fs: CoreFileStorage;

upload(bucketId: bigint, data: Data, tags: Tag[], storeOptions?: StoreOptions): Promise<PieceUri>;

Expand Down
5 changes: 4 additions & 1 deletion tests/FileStorage.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ describe('packages/file-storage/src/index.ts', () => {
});

test('upload chunked data', async () => {
headPieceUri = await storage.upload(bucketId, fileData, []);
const headPiece = await storage.createHeadPiece(bucketId, fileData);
const route = await router.getRoute(new PieceUri(bucketId, headPiece.cid!), headPiece.links);

headPieceUri = await storage.upload(bucketId, fileData, [], {route});
});

test('read chunked data', async () => {
Expand Down

0 comments on commit b747958

Please sign in to comment.