From 872b3ce8673e119631e0f3035c60d5f0b5914779 Mon Sep 17 00:00:00 2001
From: Alex Freska
Date: Thu, 22 Feb 2024 10:02:23 -0500
Subject: [PATCH] feat: renterd multipart uploader

---
 apps/renterd/lib/multipartUpload.spec.ts | 304 +++++++++++++++++++++++
 apps/renterd/lib/multipartUpload.ts      | 283 +++++++++++++++++++++
 2 files changed, 587 insertions(+)
 create mode 100644 apps/renterd/lib/multipartUpload.spec.ts
 create mode 100644 apps/renterd/lib/multipartUpload.ts

diff --git a/apps/renterd/lib/multipartUpload.spec.ts b/apps/renterd/lib/multipartUpload.spec.ts
new file mode 100644
index 000000000..1da9776ad
--- /dev/null
+++ b/apps/renterd/lib/multipartUpload.spec.ts
@@ -0,0 +1,304 @@
+import { Response, delay } from '@siafoundation/react-core'
+import { MultipartParams, MultipartUpload } from './multipartUpload'
+
+describe('MultipartUpload', () => {
+  it('should report progress and complete with serial chunks', async () => {
+    // note that the upload mock is configured to report progress 2 times per chunk
+    const params = getMockedParams({
+      file: new File(['01234567890123456789'], 'test.txt', {
+        type: 'text/plain',
+      }),
+      chunkSize: 2,
+      maxConcurrentChunks: 1,
+    })
+    const multipartUpload = new MultipartUpload(params)
+    await multipartUpload.start()
+    expect(params.onProgress.mock.calls.length).toBe(20)
+    expect(
+      params.onProgress.mock.calls.map(([params]) => [
+        params.sent,
+        params.total,
+        params.percentage,
+      ])
+    ).toEqual([
+      [1, 20, 5],
+      [2, 20, 10],
+      [3, 20, 15],
+      [4, 20, 20],
+      [5, 20, 25],
+      [6, 20, 30],
+      [7, 20, 35],
+      [8, 20, 40],
+      [9, 20, 45],
+      [10, 20, 50],
+      [11, 20, 55],
+      [12, 20, 60],
+      [13, 20, 65],
+      [14, 20, 70],
+      [15, 20, 75],
+      [16, 20, 80],
+      [17, 20, 85],
+      [18, 20, 90],
+      [19, 20, 95],
+      [20, 20, 100],
+    ])
+    expect(params.apiBusUploadComplete.post).toHaveBeenCalledWith({
+      payload: {
+        bucket: 'test-bucket',
+        parts: [
+          { eTag: 'etag-1', partNumber: 1 },
+          { eTag: 'etag-2', partNumber: 2 },
+          { eTag: 'etag-3', partNumber: 3 },
+          { eTag: 'etag-4', partNumber: 4 },
+          { eTag: 'etag-5', partNumber: 5 },
+          { eTag: 'etag-6', partNumber: 6 },
+          { eTag: 'etag-7', partNumber: 7 },
+          { eTag: 'etag-8', partNumber: 8 },
+          { eTag: 'etag-9', partNumber: 9 },
+          { eTag: 'etag-10', partNumber: 10 },
+        ],
+        path: 'test-path',
+        uploadID: '12345',
+      },
+    })
+    expect(params.onComplete).toHaveBeenCalled()
+    expect(params.onError).not.toHaveBeenCalled()
+  })
+
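+  // The concurrent variant below asserts the exact same progress sequence as
+  // the serial test: the mock emits 1-byte progress events and progress is
+  // summed across parts, so the aggregate total still ticks up one byte at a
+  // time even with 5 parts in flight.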
+  it('should report progress and complete with concurrent chunks', async () => {
+    // note that the upload mock is configured to report progress 2 times per chunk
+    const params = getMockedParams({
+      file: new File(['01234567890123456789'], 'test.txt', {
+        type: 'text/plain',
+      }),
+      chunkSize: 2,
+      maxConcurrentChunks: 5,
+    })
+    const multipartUpload = new MultipartUpload(params)
+    await multipartUpload.start()
+    expect(params.onProgress.mock.calls.length).toBe(20)
+    expect(
+      params.onProgress.mock.calls.map(([params]) => [
+        params.sent,
+        params.total,
+        params.percentage,
+      ])
+    ).toEqual([
+      [1, 20, 5],
+      [2, 20, 10],
+      [3, 20, 15],
+      [4, 20, 20],
+      [5, 20, 25],
+      [6, 20, 30],
+      [7, 20, 35],
+      [8, 20, 40],
+      [9, 20, 45],
+      [10, 20, 50],
+      [11, 20, 55],
+      [12, 20, 60],
+      [13, 20, 65],
+      [14, 20, 70],
+      [15, 20, 75],
+      [16, 20, 80],
+      [17, 20, 85],
+      [18, 20, 90],
+      [19, 20, 95],
+      [20, 20, 100],
+    ])
+    expect(params.apiBusUploadComplete.post).toHaveBeenCalledWith({
+      payload: {
+        bucket: 'test-bucket',
+        parts: [
+          { eTag: 'etag-1', partNumber: 1 },
+          { eTag: 'etag-2', partNumber: 2 },
+          { eTag: 'etag-3', partNumber: 3 },
+          { eTag: 'etag-4', partNumber: 4 },
+          { eTag: 'etag-5', partNumber: 5 },
+          { eTag: 'etag-6', partNumber: 6 },
+          { eTag: 'etag-7', partNumber: 7 },
+          { eTag: 'etag-8', partNumber: 8 },
+          { eTag: 'etag-9', partNumber: 9 },
+          { eTag: 'etag-10', partNumber: 10 },
+        ],
+        path: 'test-path',
+        uploadID: '12345',
+      },
+    })
+    expect(params.onComplete).toHaveBeenCalled()
+    expect(params.onError).not.toHaveBeenCalled()
+  })
+
+  it('should backoff and recover from a failed chunk upload', async () => {
+    // note that the upload mock is configured to report progress 2 times per chunk
+    const startTime = Date.now()
+    const chunkSize = 2
+    const params = getMockedParams({
+      file: new File(['012456'], 'test.txt', { type: 'text/plain' }),
+      chunkSize,
+      apiWorkerUploadChunk: buildMockApiWorkerUploadChunk({
+        chunkSize,
+        failures: [
+          { failCallIndex: 1, failChunkIndex: 1 },
+          { failCallIndex: 2, failChunkIndex: 1 },
+          { failCallIndex: 3, failChunkIndex: 0 },
+        ],
+      }),
+      maxConcurrentChunks: 1,
+    })
+    const multipartUpload = new MultipartUpload(params)
+    await multipartUpload.start()
+    expect(params.onProgress.mock.calls.length).toBe(11)
+    expect(
+      params.onProgress.mock.calls.map(([params]) => [
+        params.sent,
+        params.total,
+        params.percentage,
+      ])
+    ).toEqual([
+      [1, 6, 17], // call 0
+      [2, 6, 33],
+      [3, 6, 50], // call 1
+      [4, 6, 67], // fail
+      [3, 6, 50], // call 2
+      [4, 6, 67], // fail
+      [3, 6, 50], // call 3 fail
+      [3, 6, 50], // call 4
+      [4, 6, 67],
+      [5, 6, 83], // call 5
+      [6, 6, 100],
+    ])
+    expect(params.apiBusUploadComplete.post).toHaveBeenCalledWith({
+      payload: {
+        bucket: 'test-bucket',
+        parts: [
+          { eTag: 'etag-1', partNumber: 1 },
+          { eTag: 'etag-2', partNumber: 2 },
+          { eTag: 'etag-3', partNumber: 3 },
+        ],
+        path: 'test-path',
+        uploadID: '12345',
+      },
+    })
+    expect(params.onComplete).toHaveBeenCalled()
+    expect(params.onError).not.toHaveBeenCalled()
+    const endTime = Date.now()
+    const elapsedTime = endTime - startTime
+    // test that 3 iterations of backoff time were added
+    expect(elapsedTime).toBeGreaterThanOrEqual(3500)
+  }, 10_000)
+
+  it('should handle an upload create error', async () => {
+    const params = getMockedParams()
+    const multipartUpload = new MultipartUpload({
+      ...params,
+      apiBusUploadCreate: {
+        post: jest.fn(() => Promise.reject(new Error('Create failed'))),
+      },
+    })
+    await multipartUpload.start()
+    expect(params.onError).toHaveBeenCalledWith(new Error('Create failed'))
+  })
+
+  it('should handle an intentional abort correctly', async () => {
+    const params = getMockedParams()
+    const multipartUpload = new MultipartUpload(params)
+    multipartUpload.start()
+    // allow the upload to get created and begin
+    await delay(10)
+    await multipartUpload.abort()
+    expect(params.apiBusUploadAbort.post).toHaveBeenCalledWith({
+      payload: { bucket: 'test-bucket', path: 'test-path', uploadID: '12345' },
+    })
+    expect(params.onComplete).not.toHaveBeenCalled()
+    expect(params.onError).not.toHaveBeenCalled()
+  })
+})
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+function getMockedParams(params?: Partial<Record<keyof MultipartParams, any>>) {
+  const file =
+    params?.file ||
+    new File(['0123456789'], 'test-file.txt', { type: 'text/plain' })
+  const chunkSize = params?.chunkSize || 1
+  return {
+    bucket: 'test-bucket',
+    path: 'test-path',
+    chunkSize,
+    maxConcurrentChunks: 1,
+    file,
+    apiWorkerUploadChunk: buildMockApiWorkerUploadChunk({ chunkSize }),
+    apiBusUploadComplete: { post: jest.fn() },
+    apiBusUploadCreate: {
+      post: jest.fn(() =>
+        Promise.resolve({
+          status: 201,
+          data: { uploadID: '12345' },
+          headers: { ETag: 'etag' },
+        })
+      ),
+    },
+    apiBusUploadAbort: { post: jest.fn() },
+    onProgress: jest.fn(),
+    onError: jest.fn(),
+    onComplete: jest.fn(),
+    ...params,
+  }
+}
+
+type Failure = {
+  failCallIndex: number
+  failChunkIndex: number
+}
+
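+// Mocks the worker upload-chunk API. Each `put` call emits two progress
+// events per chunk, and `failures` injects a rejection at the given call
+// index (nth `put` call overall) and chunk index (nth progress event within
+// that call); this is what the backoff test uses to force retries.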
+function buildMockApiWorkerUploadChunk({
+  chunkSize,
+  failures = [],
+}: {
+  failures?: Failure[]
+  chunkSize: number
+}) {
+  let currentCallIndex = -1
+  return {
+    put: jest.fn((args) => {
+      const callIndex = ++currentCallIndex
+      return new Promise<Response<void>>((resolve, reject) => {
+        const onUploadProgress = args.config.axios.onUploadProgress
+
+        // Simulate a series of progress events
+        const eventCount = 2
+        const progressChunkSize = chunkSize / eventCount
+        const total = chunkSize
+        let loaded = 0
+        let chunkIndex = 0
+        const eTag = `etag-${args.params.partnumber}`
+        const intervalId = setInterval(() => {
+          if (args.config.axios.signal?.aborted) {
+            clearInterval(intervalId)
+            reject(new Error('Abort'))
+            return
+          }
+          const shouldFail = failures.find(
+            (failure) =>
+              callIndex === failure.failCallIndex &&
+              chunkIndex === failure.failChunkIndex
+          )
+          loaded += progressChunkSize
+          onUploadProgress({ type: 'progress', loaded, total })
+          if (shouldFail) {
+            clearInterval(intervalId)
+            reject(new Error('Upload failed'))
+          } else {
+            if (loaded >= chunkSize) {
+              clearInterval(intervalId)
+              resolve({
+                status: 200,
+                headers: { etag: eTag },
+              })
+            }
+            chunkIndex++
+          }
+        }, 1)
+      })
+    }),
+  }
+}
diff --git a/apps/renterd/lib/multipartUpload.ts b/apps/renterd/lib/multipartUpload.ts
new file mode 100644
index 000000000..4d8c2f1e4
--- /dev/null
+++ b/apps/renterd/lib/multipartUpload.ts
@@ -0,0 +1,283 @@
+import { triggerErrorToast } from '@siafoundation/design-system'
+import { delay } from '@siafoundation/react-core'
+import {
+  useMultipartUploadAbort,
+  useMultipartUploadChunk,
+  useMultipartUploadComplete,
+  useMultipartUploadCreate,
+} from '@siafoundation/react-renterd'
+
+type ApiWorkerUploadChunk = ReturnType<typeof useMultipartUploadChunk>
+type ApiBusUploadComplete = ReturnType<typeof useMultipartUploadComplete>
+type ApiBusUploadCreate = ReturnType<typeof useMultipartUploadCreate>
+type ApiBusUploadAbort = ReturnType<typeof useMultipartUploadAbort>
+
+export type MultipartParams = {
+  bucket: string
+  path: string
+  file: File
+  apiWorkerUploadChunk: ApiWorkerUploadChunk
+  apiBusUploadComplete: ApiBusUploadComplete
+  apiBusUploadCreate: ApiBusUploadCreate
+  apiBusUploadAbort: ApiBusUploadAbort
+  chunkSize?: number
+  maxConcurrentChunks?: number
+  onProgress?: (event: {
+    sent: number
+    total: number
+    percentage: number
+  }) => void
+  onError?: (error: Error) => void
+  onComplete: () => void
+}
+
+type UploadedPart = {
+  partNumber: number
+  eTag: string
+}
+
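+// Drives a multipart upload end to end: create the upload via the bus API,
+// pump up to maxConcurrentChunks part uploads through the worker API, retry
+// failed parts with exponential backoff, then register the completed parts
+// (or abort the whole upload) via the bus API.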
+export class MultipartUpload {
+  // params
+  #bucket: string
+  #path: string
+  #file: File
+  #chunkSize: number
+  #maxConcurrentChunks: number
+  #apiWorkerUploadChunk: ApiWorkerUploadChunk
+  #apiBusUploadComplete: ApiBusUploadComplete
+  #apiBusUploadCreate: ApiBusUploadCreate
+  #apiBusUploadAbort: ApiBusUploadAbort
+  #onProgress: (progress: {
+    sent: number
+    total: number
+    percentage: number
+  }) => void
+  #onError: (error: Error) => void
+  #onComplete: () => void
+
+  // state
+  #resolve: () => void
+  #progressCache: Record<number, number>
+  #activeConnections: Record<number, AbortController>
+  #pendingPartNumbers: number[]
+  #uploadedParts: UploadedPart[]
+  #uploadId: string
+  // error retry backoff
+  #initialDelay = 500 // 1/2 second
+  #maxDelay = 60_000 // 1 minute
+  #currentDelay = this.#initialDelay
+
+  constructor(options: MultipartParams) {
+    // params
+    this.#bucket = options.bucket
+    this.#path = options.path
+    this.#chunkSize = options.chunkSize || 1024 * 1024 * 5
+    this.#maxConcurrentChunks = Math.min(options.maxConcurrentChunks || 5, 15)
+    this.#file = options.file
+    this.#apiWorkerUploadChunk = options.apiWorkerUploadChunk
+    this.#apiBusUploadAbort = options.apiBusUploadAbort
+    this.#apiBusUploadComplete = options.apiBusUploadComplete
+    this.#apiBusUploadCreate = options.apiBusUploadCreate
+    this.#onProgress = options.onProgress || (() => null)
+    this.#onError = options.onError || (() => null)
+    this.#onComplete = options.onComplete || (() => null)
+
+    // state
+    this.#progressCache = {}
+    this.#activeConnections = {}
+    this.#pendingPartNumbers = []
+    this.#uploadedParts = []
+    this.#uploadId = null
+  }
+
+  public async start() {
+    const promise = new Promise<void>((resolve) => {
+      this.#resolve = resolve
+    })
+    try {
+      const createPayload = {
+        bucket: this.#bucket,
+        key: 'key:0000000000000000000000000000000000000000000000000000000000000000',
+        path: this.#path,
+      }
+      const response = await this.#apiBusUploadCreate.post({
+        payload: createPayload,
+      })
+
+      this.#uploadId = response.data?.uploadID
+
+      const partCount = Math.ceil(this.#file.size / this.#chunkSize)
+      this.#pendingPartNumbers = Array.from(
+        { length: partCount },
+        (_, i) => i + 1
+      )
+
+      this.#sendNext()
+    } catch (error) {
+      this.#complete(error)
+    }
+    await promise
+  }
+
+  public abort() {
+    Object.keys(this.#activeConnections)
+      .map(Number)
+      .forEach((id) => {
+        this.#activeConnections[id].abort()
+      })
+
+    try {
+      this.#apiBusUploadAbort.post({
+        payload: {
+          bucket: this.#bucket,
+          path: this.#path,
+          uploadID: this.#uploadId,
+        },
+      })
+    } catch (e) {
+      triggerErrorToast(e.message)
+    }
+  }
+
+  async #sendNext() {
+    const activeConnections = Object.keys(this.#activeConnections).length
+
+    if (activeConnections >= this.#maxConcurrentChunks) {
+      return
+    }
+
+    if (!this.#pendingPartNumbers.length) {
+      if (!activeConnections) {
+        this.#complete()
+      }
+
+      return
+    }
+
+    const partNumber = this.#pendingPartNumbers.pop()
+    const partIndex = partNumber - 1
+    const chunkOffset = partIndex * this.#chunkSize
+    const chunk = this.#file.slice(chunkOffset, chunkOffset + this.#chunkSize)
+
+    // Callback to start another upload after the current one is added to the
+    // active connections.
+    // This will boot up the max concurrent uploads.
+    const tryStartingAnother = () => {
+      this.#sendNext()
+    }
+
+    try {
+      await this.#upload(partNumber, chunk, chunkOffset, tryStartingAnother)
+      // On successful upload, reset the delay
+      this.#resetDelay()
+    } catch (error) {
+      this.#pendingPartNumbers.push(partNumber)
+      await this.#waitToRetry()
+    }
+    // try again even after a part errors
+    this.#sendNext()
+  }
+
+  #resetDelay() {
+    this.#currentDelay = this.#initialDelay
+  }
+
+  async #waitToRetry() {
+    // Increase the delay for the next retry, capped at the maximum delay
+    const backoff = delay(this.#currentDelay)
+    this.#currentDelay = Math.min(this.#currentDelay * 2, this.#maxDelay)
+    await backoff
+  }
+
+  async #complete(error?: Error) {
+    if (error) {
+      this.abort()
+      this.#onError(error)
+    } else {
+      try {
+        const payload = {
+          bucket: this.#bucket,
+          path: this.#path,
+          uploadID: this.#uploadId,
+          parts: this.#uploadedParts.sort(
+            (a, b) => a.partNumber - b.partNumber
+          ),
+        }
+        await this.#apiBusUploadComplete.post({
+          payload: payload,
+        })
+        this.#onComplete()
+      } catch (error) {
+        this.#onError(error)
+      }
+    }
+    this.#resolve()
+  }
+
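+  // Progress is cached per part number and summed across all parts. A retried
+  // part overwrites its own cache entry, so reported progress can move
+  // backwards after a failure (see the backoff test expectations).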
+  #handleProgress(partNumber: number, event: ProgressEvent) {
+    this.#progressCache[partNumber] = event.loaded
+
+    const progressTotal = Object.keys(this.#progressCache)
+      .map(Number)
+      .reduce((acc, id) => (acc += this.#progressCache[id]), 0)
+
+    const sent = Math.min(progressTotal, this.#file.size)
+    const total = this.#file.size
+
+    const percentage = Math.round((sent / total) * 100)
+
+    this.#onProgress({
+      sent: sent,
+      total: total,
+      percentage: percentage,
+    })
+  }
+
+  async #upload(
+    partNumber: number,
+    chunk: Blob,
+    chunkOffset: number,
+    afterConnectionIsAdded: () => void
+  ): Promise<void> {
+    const controller = new AbortController()
+    this.#activeConnections[partNumber] = controller
+    afterConnectionIsAdded()
+    try {
+      const response = await this.#apiWorkerUploadChunk.put({
+        params: {
+          key: this.#path.slice(1),
+          bucket: this.#bucket,
+          uploadid: this.#uploadId,
+          offset: chunkOffset,
+          partnumber: partNumber,
+          disablepreshardingencryption: false,
+        },
+        payload: chunk,
+        config: {
+          axios: {
+            onUploadProgress: (e) => this.#handleProgress(partNumber, e),
+            signal: controller.signal,
+          },
+        },
+      })
+
+      const eTag = response.headers['etag']
+      if (!eTag) {
+        throw new Error(
+          'No ETag in response, add ETag to Access-Control-Expose-Headers list'
+        )
+      }
+      const uploadedPart = {
+        partNumber: partNumber,
+        // removing the " enclosing characters from the raw ETag
+        eTag: eTag.replace(/"/g, ''),
+      }
+
+      this.#uploadedParts.push(uploadedPart)
+      delete this.#activeConnections[partNumber]
+    } catch (e) {
+      delete this.#activeConnections[partNumber]
+      throw e
+    }
+  }
+}
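
For reviewers, a minimal sketch of how the class is expected to be wired up from app code. It is illustrative only: the useFileUpload hook, the no-argument hook calls, and the callback bodies are assumptions, while the MultipartUpload constructor params and the four useMultipartUpload* hooks come straight from the patch.

import { MultipartUpload } from './multipartUpload'
import {
  useMultipartUploadAbort,
  useMultipartUploadChunk,
  useMultipartUploadComplete,
  useMultipartUploadCreate,
} from '@siafoundation/react-renterd'

// Hypothetical app-side hook, assuming the four API hooks can be called with
// no arguments. Everything passed to the constructor mirrors MultipartParams.
export function useFileUpload() {
  const apiWorkerUploadChunk = useMultipartUploadChunk()
  const apiBusUploadComplete = useMultipartUploadComplete()
  const apiBusUploadCreate = useMultipartUploadCreate()
  const apiBusUploadAbort = useMultipartUploadAbort()

  return async function upload(bucket: string, path: string, file: File) {
    const multipartUpload = new MultipartUpload({
      bucket,
      path,
      file,
      apiWorkerUploadChunk,
      apiBusUploadComplete,
      apiBusUploadCreate,
      apiBusUploadAbort,
      onProgress: ({ sent, total, percentage }) =>
        console.log(`${sent}/${total} (${percentage}%)`),
      onError: (error) => console.error(error),
      onComplete: () => console.log('upload complete'),
    })
    // start() resolves once the upload completes, fails, or is aborted.
    await multipartUpload.start()
  }
}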