From 37b111a701e6e0ffc7320da364769d2a3b4e3fa2 Mon Sep 17 00:00:00 2001
From: Alex Freska
Date: Thu, 22 Feb 2024 10:02:23 -0500
Subject: [PATCH] feat: renterd multipart uploader

---
 apps/renterd/lib/multipartUpload.spec.ts | 311 ++++++++++++++++++++
 apps/renterd/lib/multipartUpload.ts      | 350 +++++++++++++++++++++++
 libs/react-renterd/src/worker.ts         |   6 +-
 3 files changed, 664 insertions(+), 3 deletions(-)
 create mode 100644 apps/renterd/lib/multipartUpload.spec.ts
 create mode 100644 apps/renterd/lib/multipartUpload.ts

diff --git a/apps/renterd/lib/multipartUpload.spec.ts b/apps/renterd/lib/multipartUpload.spec.ts
new file mode 100644
index 000000000..99daee8b7
--- /dev/null
+++ b/apps/renterd/lib/multipartUpload.spec.ts
@@ -0,0 +1,311 @@
+import { Response, delay } from '@siafoundation/react-core'
+import { MultipartParams, MultipartUpload } from './multipartUpload'
+
+describe('MultipartUpload', () => {
+  it('should report progress and complete with serial parts', async () => {
+    // note that the upload mock is configured to report progress 2 times per part
+    const params = getMockedParams({
+      file: new File(['01234567890123456789'], 'test.txt', {
+        type: 'text/plain',
+      }),
+      partSize: 2,
+      maxConcurrentParts: 1,
+    })
+    const multipartUpload = new MultipartUpload(params)
+    await multipartUpload.create()
+    await multipartUpload.start()
+    expect(params.onProgress.mock.calls.length).toBe(20)
+    expect(
+      params.onProgress.mock.calls.map(([params]) => [
+        params.sent,
+        params.total,
+        params.percentage,
+      ])
+    ).toEqual([
+      [1, 20, 5],
+      [2, 20, 10],
+      [3, 20, 15],
+      [4, 20, 20],
+      [5, 20, 25],
+      [6, 20, 30],
+      [7, 20, 35],
+      [8, 20, 40],
+      [9, 20, 45],
+      [10, 20, 50],
+      [11, 20, 55],
+      [12, 20, 60],
+      [13, 20, 65],
+      [14, 20, 70],
+      [15, 20, 75],
+      [16, 20, 80],
+      [17, 20, 85],
+      [18, 20, 90],
+      [19, 20, 95],
+      [20, 20, 100],
+    ])
+    expect(params.apiBusUploadComplete.post).toHaveBeenCalledWith({
+      payload: {
+        bucket: 'test-bucket',
+        parts: [
+          { eTag: 'etag-1', partNumber: 1 },
+          { eTag: 'etag-2', partNumber: 2 },
+          { eTag: 'etag-3', partNumber: 3 },
+          { eTag: 'etag-4', partNumber: 4 },
+          { eTag: 'etag-5', partNumber: 5 },
+          { eTag: 'etag-6', partNumber: 6 },
+          { eTag: 'etag-7', partNumber: 7 },
+          { eTag: 'etag-8', partNumber: 8 },
+          { eTag: 'etag-9', partNumber: 9 },
+          { eTag: 'etag-10', partNumber: 10 },
+        ],
+        path: 'test-path',
+        uploadID: '12345',
+      },
+    })
+    expect(params.onComplete).toHaveBeenCalled()
+    expect(params.onError).not.toHaveBeenCalled()
+  })
+
+  it('should report progress and complete with concurrent parts', async () => {
+    // note that the upload mock is configured to report progress 2 times per part
+    const params = getMockedParams({
+      file: new File(['01234567890123456789'], 'test.txt', {
+        type: 'text/plain',
+      }),
+      partSize: 2,
+      maxConcurrentParts: 5,
+    })
+    const multipartUpload = new MultipartUpload(params)
+    await multipartUpload.create()
+    await multipartUpload.start()
+    expect(params.onProgress.mock.calls.length).toBe(20)
+    expect(
+      params.onProgress.mock.calls.map(([params]) => [
+        params.sent,
+        params.total,
+        params.percentage,
+      ])
+    ).toEqual([
+      [1, 20, 5],
+      [2, 20, 10],
+      [3, 20, 15],
+      [4, 20, 20],
+      [5, 20, 25],
+      [6, 20, 30],
+      [7, 20, 35],
+      [8, 20, 40],
+      [9, 20, 45],
+      [10, 20, 50],
+      [11, 20, 55],
+      [12, 20, 60],
+      [13, 20, 65],
+      [14, 20, 70],
+      [15, 20, 75],
+      [16, 20, 80],
+      [17, 20, 85],
+      [18, 20, 90],
+      [19, 20, 95],
+      [20, 20, 100],
+    ])
+    expect(params.apiBusUploadComplete.post).toHaveBeenCalledWith({
+      payload: {
+        bucket: 'test-bucket',
+        parts: [
+          { eTag: 'etag-1', partNumber: 1 },
+          { eTag: 'etag-2', partNumber: 2 },
+          { eTag: 'etag-3', partNumber: 3 },
+          { eTag: 'etag-4', partNumber: 4 },
+          { eTag: 'etag-5', partNumber: 5 },
+          { eTag: 'etag-6', partNumber: 6 },
+          { eTag: 'etag-7', partNumber: 7 },
+          { eTag: 'etag-8', partNumber: 8 },
+          { eTag: 'etag-9', partNumber: 9 },
+          { eTag: 'etag-10', partNumber: 10 },
+        ],
+        path: 'test-path',
+        uploadID: '12345',
+      },
+    })
+    expect(params.onComplete).toHaveBeenCalled()
+    expect(params.onError).not.toHaveBeenCalled()
+  })
+
+  it('should backoff and recover from a failed part upload', async () => {
+    // note that the upload mock is configured to report progress 2 times per part
+    const startTime = Date.now()
+    const partSize = 2
+    const params = getMockedParams({
+      file: new File(['012456'], 'test.txt', { type: 'text/plain' }),
+      partSize,
+      apiWorkerUploadPart: buildMockApiWorkerUploadPart({
+        partSize,
+        failures: [
+          { failCallIndex: 1, failPartIndex: 1 },
+          { failCallIndex: 2, failPartIndex: 1 },
+          { failCallIndex: 3, failPartIndex: 0 },
+        ],
+      }),
+      maxConcurrentParts: 1,
+    })
+    const multipartUpload = new MultipartUpload(params)
+    await multipartUpload.create()
+    await multipartUpload.start()
+    expect(params.onProgress.mock.calls.length).toBe(11)
+    expect(
+      params.onProgress.mock.calls.map(([params]) => [
+        params.sent,
+        params.total,
+        params.percentage,
+      ])
+    ).toEqual([
+      [1, 6, 17], // call 0
+      [2, 6, 33],
+      [3, 6, 50], // call 1
+      [4, 6, 67], // fail
+      [3, 6, 50], // call 2
+      [4, 6, 67], // fail
+      [3, 6, 50], // call 3 fail
+      [3, 6, 50], // call 4
+      [4, 6, 67],
+      [5, 6, 83], // call 5
+      [6, 6, 100],
+    ])
+    expect(params.apiBusUploadComplete.post).toHaveBeenCalledWith({
+      payload: {
+        bucket: 'test-bucket',
+        parts: [
+          { eTag: 'etag-1', partNumber: 1 },
+          { eTag: 'etag-2', partNumber: 2 },
+          { eTag: 'etag-3', partNumber: 3 },
+        ],
+        path: 'test-path',
+        uploadID: '12345',
+      },
+    })
+    expect(params.onComplete).toHaveBeenCalled()
+    expect(params.onError).not.toHaveBeenCalled()
+    const endTime = Date.now()
+    const elapsedTime = endTime - startTime
+    // test that 3 iterations of backoff time were added
+    expect(elapsedTime).toBeGreaterThanOrEqual(3500)
+  }, 10_000)
+
+  it('should handle an upload create error', async () => {
+    const params = getMockedParams()
+    const multipartUpload = new MultipartUpload({
+      ...params,
+      apiBusUploadCreate: {
+        post: jest.fn(() => Promise.reject(new Error('Create failed'))),
+      },
+    })
+    await expect(multipartUpload.create()).rejects.toEqual(
+      new Error('Create failed')
+    )
+  })
+
+  it('should handle an intentional abort correctly', async () => {
+    const params = getMockedParams()
+    const multipartUpload = new MultipartUpload(params)
+    await multipartUpload.create()
+    const started = multipartUpload.start()
+    // allow the upload to get created and begin
+    await delay(10)
+    await multipartUpload.abort()
+    expect(params.apiBusUploadAbort.post).toHaveBeenCalledWith({
+      payload: { bucket: 'test-bucket', path: 'test-path', uploadID: '12345' },
+    })
+    expect(params.onComplete).not.toHaveBeenCalled()
+    expect(params.onError).not.toHaveBeenCalled()
+    // an aborted upload must settle start() rather than leave it pending
+    await started
+  })
+})
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+function getMockedParams(params?: Partial<Record<keyof MultipartParams, any>>) {
+  const file =
+    params?.file ||
+    new File(['0123456789'], 'test-file.txt', { type: 'text/plain' })
+  const partSize = params?.partSize || 1
+  return {
+    bucket: 'test-bucket',
+    path: 'test-path',
+    partSize,
+    maxConcurrentParts: 1,
+    file,
+    apiWorkerUploadPart: buildMockApiWorkerUploadPart({ partSize }),
+    apiBusUploadComplete: { post: jest.fn() },
+    apiBusUploadCreate: {
+      post: jest.fn(() =>
+        Promise.resolve({
+          status: 201,
+          data: { uploadID: '12345' },
+          headers: { ETag: 'etag' },
+        })
+      ),
+    },
+    apiBusUploadAbort: { post: jest.fn() },
+    onProgress: jest.fn(),
+    onError: jest.fn(),
+    onComplete: jest.fn(),
+    ...params,
+  }
+}
+
+type Failure = {
+  failCallIndex: number
+  failPartIndex: number
+}
+
+function buildMockApiWorkerUploadPart({
+  partSize,
+  failures = [],
+}: {
+  failures?: Failure[]
+  partSize: number
+}) {
+  let currentCallIndex = -1
+  return {
+    put: jest.fn((args) => {
+      const callIndex = ++currentCallIndex
+      return new Promise<Response<void>>((resolve, reject) => {
+        const onUploadProgress = args.config.axios.onUploadProgress
+
+        // Simulate a series of progress events
+        const eventCount = 2
+        const progressPartSize = partSize / eventCount
+        const total = partSize
+        let loaded = 0
+        let partIndex = 0
+        const eTag = `etag-${args.params.partnumber}`
+        const intervalId = setInterval(() => {
+          if (args.config.axios.signal?.aborted) {
+            clearInterval(intervalId)
+            reject(new Error('Abort'))
+            return
+          }
+          const shouldFail = failures.find(
+            (failure) =>
+              callIndex === failure.failCallIndex &&
+              partIndex === failure.failPartIndex
+          )
+          loaded += progressPartSize
+          onUploadProgress({ type: 'progress', loaded, total })
+          if (shouldFail) {
+            clearInterval(intervalId)
+            reject(new Error('Upload failed'))
+          } else {
+            if (loaded >= partSize) {
+              clearInterval(intervalId)
+              resolve({
+                status: 200,
+                headers: { etag: eTag },
+              })
+            }
+            partIndex++
+          }
+        }, 1)
+      })
+    }),
+  }
+}
diff --git a/apps/renterd/lib/multipartUpload.ts b/apps/renterd/lib/multipartUpload.ts
new file mode 100644
index 000000000..051d097fb
--- /dev/null
+++ b/apps/renterd/lib/multipartUpload.ts
@@ -0,0 +1,350 @@
+import { triggerErrorToast } from '@siafoundation/design-system'
+import { delay } from '@siafoundation/react-core'
+import {
+  useMultipartUploadAbort,
+  useMultipartUploadPart,
+  useMultipartUploadComplete,
+  useMultipartUploadCreate,
+} from '@siafoundation/react-renterd'
+
+type ApiWorkerUploadPart = ReturnType<typeof useMultipartUploadPart>
+type ApiBusUploadComplete = ReturnType<typeof useMultipartUploadComplete>
+type ApiBusUploadCreate = ReturnType<typeof useMultipartUploadCreate>
+type ApiBusUploadAbort = ReturnType<typeof useMultipartUploadAbort>
+
+export type MultipartParams = {
+  bucket: string
+  path: string
+  file: File
+  apiWorkerUploadPart: ApiWorkerUploadPart
+  apiBusUploadComplete: ApiBusUploadComplete
+  apiBusUploadCreate: ApiBusUploadCreate
+  apiBusUploadAbort: ApiBusUploadAbort
+  partSize?: number
+  maxConcurrentParts?: number
+  onProgress?: (event: {
+    sent: number
+    total: number
+    percentage: number
+  }) => void
+  onError?: (error: Error) => void
+  onComplete?: () => void
+}
+
+type UploadedPart = {
+  partNumber: number
+  eTag: string
+}
+
+/**
+ * Uploads a File in `partSize` slices through the injected bus and worker
+ * API callers, keeping at most `maxConcurrentParts` part requests in flight
+ * and re-queueing failed parts after a backoff that doubles up to one minute.
+ *
+ * Usage: `await create()`, then `await start()`; `abort()` cancels.
+ */
+export class MultipartUpload {
+  // params
+  #bucket: string
+  #path: string
+  #file: File
+  #partSize: number
+  #maxConcurrentParts: number
+  #apiWorkerUploadPart: ApiWorkerUploadPart
+  #apiBusUploadComplete: ApiBusUploadComplete
+  #apiBusUploadCreate: ApiBusUploadCreate
+  #apiBusUploadAbort: ApiBusUploadAbort
+  #onProgress: (progress: {
+    sent: number
+    total: number
+    percentage: number
+  }) => void
+  #onError: (error: Error) => void
+  #onComplete: () => void
+
+  // state
+  #resolve: () => void
+  #progressCache: Record<number, number>
+  #activeConnections: Record<number, AbortController>
+  #pendingPartNumbers: number[]
+  #uploadedParts: UploadedPart[]
+  #uploadId: string
+  #aborted: boolean
+  // error retry backoff
+  #initialDelay = 500 // 1/2 second
+  #maxDelay = 60_000 // 1 minute
+  #currentDelay = this.#initialDelay
+
+  constructor(options: MultipartParams) {
+    // params
+    this.#bucket = options.bucket
+    this.#path = options.path
+    this.#partSize = options.partSize || 1024 * 1024 * 5
+    this.#maxConcurrentParts = Math.min(options.maxConcurrentParts || 5, 15)
+    this.#file = options.file
+    this.#apiWorkerUploadPart = options.apiWorkerUploadPart
+    this.#apiBusUploadAbort = options.apiBusUploadAbort
+    this.#apiBusUploadComplete = options.apiBusUploadComplete
+    this.#apiBusUploadCreate = options.apiBusUploadCreate
+    this.#onProgress = options.onProgress || (() => null)
+    this.#onError = options.onError || (() => null)
+    this.#onComplete = options.onComplete || (() => null)
+
+    // state
+    this.#progressCache = {}
+    this.#activeConnections = {}
+    this.#pendingPartNumbers = []
+    this.#uploadedParts = []
+    this.#uploadId = null
+    this.#aborted = false
+  }
+
+  /**
+   * Registers the multipart upload with the bus and queues one part number
+   * per `partSize` slice of the file.
+   * @returns the upload ID reported by the bus.
+   * @throws if the create request rejects or responds with an error.
+   */
+  public async create() {
+    const createPayload = {
+      bucket: this.#bucket,
+      generateKey: true,
+      path: this.#path,
+    }
+    const response = await this.#apiBusUploadCreate.post({
+      payload: createPayload,
+    })
+
+    // Without an upload ID every part request would fail and be retried
+    // indefinitely, so surface a reported create error immediately.
+    if (response.error) {
+      throw new Error(response.error)
+    }
+
+    this.#uploadId = response.data?.uploadID
+
+    const partCount = Math.ceil(this.#file.size / this.#partSize)
+    this.#pendingPartNumbers = Array.from(
+      { length: partCount },
+      (_, i) => i + 1
+    )
+    return this.#uploadId
+  }
+
+  /**
+   * Begins uploading the queued parts.
+   * @returns a promise that settles once the completion request has been
+   * attempted (successfully or not) or the upload has been aborted.
+   */
+  public async start() {
+    const promise = new Promise<void>((resolve) => {
+      this.#resolve = resolve
+    })
+    this.#sendNext()
+    await promise
+  }
+
+  /**
+   * Cancels every in-flight part request, asks the bus to abort the upload,
+   * and releases any caller awaiting `start()`.
+   */
+  public async abort() {
+    this.#aborted = true
+    Object.keys(this.#activeConnections)
+      .map(Number)
+      .forEach((id) => {
+        this.#activeConnections[id].abort()
+      })
+
+    try {
+      await this.#apiBusUploadAbort.post({
+        payload: {
+          bucket: this.#bucket,
+          path: this.#path,
+          uploadID: this.#uploadId,
+        },
+      })
+    } catch (e) {
+      triggerErrorToast(e.message)
+    }
+    // No further parts are sent once aborted, so settle the start() promise
+    // here instead of leaving its awaiter pending forever.
+    this.#resolve?.()
+  }
+
+  public setOnProgress(
+    onProgress: (progress: {
+      sent: number
+      total: number
+      percentage: number
+    }) => void
+  ) {
+    this.#onProgress = onProgress
+  }
+
+  public setOnError(onError: (error: Error) => void) {
+    this.#onError = onError
+  }
+
+  public setOnComplete(onComplete: () => void) {
+    this.#onComplete = onComplete
+  }
+
+  // Starts the next pending part if a connection slot is free, and completes
+  // the upload once nothing is pending or in flight.
+  async #sendNext() {
+    if (this.#aborted) {
+      return
+    }
+
+    const activeConnections = Object.keys(this.#activeConnections).length
+
+    if (activeConnections >= this.#maxConcurrentParts) {
+      return
+    }
+
+    if (!this.#pendingPartNumbers.length) {
+      if (!activeConnections) {
+        this.#complete()
+      }
+
+      return
+    }
+
+    const partNumber = this.#pendingPartNumbers.pop()
+    const partIndex = partNumber - 1
+    const partOffset = partIndex * this.#partSize
+    const partData = this.#file.slice(partOffset, partOffset + this.#partSize)
+
+    // Callback to start another upload after the current one is added to the
+    // active connections.
+    // This will boot up the max concurrent uploads.
+    const tryStartingAnother = () => {
+      this.#sendNext()
+    }
+
+    try {
+      await this.#upload(partNumber, partData, partOffset, tryStartingAnother)
+      // On successful upload, reset the delay
+      this.#resetDelay()
+    } catch (error) {
+      // abort() cancels the in-flight requests; those failures must not be
+      // re-queued or retried. #upload rethrows plain Errors, so an error name
+      // check does not identify them; the aborted flag does.
+      if (this.#aborted) {
+        return
+      }
+      this.#pendingPartNumbers.push(partNumber)
+      await this.#waitToRetry()
+    }
+    // try again even after a part errors
+    this.#sendNext()
+  }
+
+  #resetDelay() {
+    this.#currentDelay = this.#initialDelay
+  }
+
+  async #waitToRetry() {
+    // Increase the delay for the next retry, capped at the maximum delay
+    const backoff = delay(this.#currentDelay)
+    this.#currentDelay = Math.min(this.#currentDelay * 2, this.#maxDelay)
+    await backoff
+  }
+
+  // Finalizes the upload with the bus and notifies onComplete or onError.
+  async #complete() {
+    try {
+      const payload = {
+        bucket: this.#bucket,
+        path: this.#path,
+        uploadID: this.#uploadId,
+        parts: this.#uploadedParts.sort((a, b) => a.partNumber - b.partNumber),
+      }
+      const response = await this.#apiBusUploadComplete.post({
+        payload: payload,
+      })
+      // An error reported in the response means the upload was not finalized.
+      if (response?.error) {
+        throw new Error(response.error)
+      }
+      this.#onComplete()
+    } catch (error) {
+      this.#onError(error)
+    }
+    this.#resolve()
+  }
+
+  // Records a part's loaded byte count and reports the aggregate progress.
+  #handleProgress(partNumber: number, event: ProgressEvent) {
+    this.#progressCache[partNumber] = event.loaded
+
+    const progressTotal = Object.keys(this.#progressCache)
+      .map(Number)
+      .reduce((acc, id) => (acc += this.#progressCache[id]), 0)
+
+    const sent = Math.min(progressTotal, this.#file.size)
+    const total = this.#file.size
+
+    const percentage = Math.round((sent / total) * 100)
+
+    this.#onProgress({
+      sent: sent,
+      total: total,
+      percentage: percentage,
+    })
+  }
+
+  // Uploads one part, tracking its AbortController while it is in flight.
+  async #upload(
+    partNumber: number,
+    partData: Blob,
+    partOffset: number,
+    afterConnectionIsAdded: () => void
+  ): Promise<void> {
+    const controller = new AbortController()
+    this.#activeConnections[partNumber] = controller
+    afterConnectionIsAdded()
+    try {
+      const response = await this.#apiWorkerUploadPart.put({
+        params: {
+          key: this.#path.slice(1),
+          bucket: this.#bucket,
+          uploadid: this.#uploadId,
+          offset: partOffset,
+          partnumber: partNumber,
+        },
+        payload: partData,
+        config: {
+          axios: {
+            onUploadProgress: (e) => this.#handleProgress(partNumber, e),
+            signal: controller.signal,
+          },
+        },
+      })
+
+      // errors such as aborted/canceled request
+      if (response.error) {
+        throw new Error(response.error)
+      }
+
+      const eTag = response.headers['etag']
+      if (!eTag) {
+        throw new Error(
+          'No ETag in response, add ETag to Access-Control-Expose-Headers list'
+        )
+      }
+      const uploadedPart = {
+        partNumber: partNumber,
+        // removing the " enclosing characters from the raw ETag
+        eTag: eTag.replace(/"/g, ''),
+      }
+
+      this.#uploadedParts.push(uploadedPart)
+      delete this.#activeConnections[partNumber]
+    } catch (e) {
+      delete this.#activeConnections[partNumber]
+      throw e
+    }
+  }
+}
diff --git a/libs/react-renterd/src/worker.ts b/libs/react-renterd/src/worker.ts
index 9cc21a30d..15e821cb0 100644
--- a/libs/react-renterd/src/worker.ts
+++ b/libs/react-renterd/src/worker.ts
@@ -55,7 +55,7 @@ export function useObjectUpload(
   )
 }
 
-export type MultipartUploadChunkParams = {
+export type MultipartUploadPartParams = {
   key: string
   uploadid: string
   partnumber: number
@@ -66,8 +66,8 @@ export type MultipartUploadChunkParams = {
   totalshards?: number
 }
 
-export function useMultipartUploadChunk(
-  args?: HookArgsCallback
+export function useMultipartUploadPart(
+  args?: HookArgsCallback
 ) {
   return usePutFunc({
     ...args,