diff --git a/.changeset/empty-garlics-look.md b/.changeset/empty-garlics-look.md new file mode 100644 index 000000000..420674fe4 --- /dev/null +++ b/.changeset/empty-garlics-look.md @@ -0,0 +1,5 @@ +--- +'renterd': minor +--- + +File uploads now use multipart uploads. diff --git a/.changeset/pretty-roses-develop.md b/.changeset/pretty-roses-develop.md new file mode 100644 index 000000000..203c04ff1 --- /dev/null +++ b/.changeset/pretty-roses-develop.md @@ -0,0 +1,5 @@ +--- +'renterd': minor +--- + +File uploads now have a max concurrency and get queued. diff --git a/apps/renterd/components/Files/Columns/FilesHealthColumn/FilesHealthColumnContents.tsx b/apps/renterd/components/Files/Columns/FilesHealthColumn/FilesHealthColumnContents.tsx index 8bd1e6bf9..4c046d63f 100644 --- a/apps/renterd/components/Files/Columns/FilesHealthColumn/FilesHealthColumnContents.tsx +++ b/apps/renterd/components/Files/Columns/FilesHealthColumn/FilesHealthColumnContents.tsx @@ -57,6 +57,7 @@ export function FilesHealthColumnContents({ const slabs = sortBy( obj.data.object.slabs.map((s) => ({ ...s.slab, + key: `${s.offset}${s.length}${s.slab.key}`, isPartialSlab: !!s.slab.shards, contractSetShards: s.slab.shards?.length ? computeSlabContractSetShards({ diff --git a/apps/renterd/components/TransfersBar.tsx b/apps/renterd/components/TransfersBar.tsx index 153b3ac45..911f52dc8 100644 --- a/apps/renterd/components/TransfersBar.tsx +++ b/apps/renterd/components/TransfersBar.tsx @@ -15,6 +15,7 @@ import { import { useState } from 'react' import { useFilesManager } from '../contexts/filesManager' import { useAppSettings } from '@siafoundation/react-core' +import { upperFirst } from '@technically/lodash' function getProgress(transfer: { loaded?: number; size?: number }) { return transfer.loaded !== undefined ? transfer.loaded / transfer.size : 1 @@ -22,8 +23,7 @@ function getProgress(transfer: { loaded?: number; size?: number }) { export function TransfersBar() { const { isUnlockedAndAuthedRoute } = useAppSettings() - const { uploadsList, uploadCancel, downloadsList, downloadCancel } = - useFilesManager() + const { uploadsList, downloadsList, downloadCancel } = useFilesManager() const [maximized, setMaximized] = useState(true) const uploadCount = uploadsList.length @@ -56,7 +56,7 @@ export function TransfersBar() { const progress = getProgress(upload) return (
@@ -67,7 +67,7 @@ export function TransfersBar() { tip="Cancel file upload" variant="ghost" size="none" - onClick={() => uploadCancel(upload)} + onClick={() => upload.uploadAbort?.()} > @@ -80,7 +80,7 @@ export function TransfersBar() { />
- {progress === 1 ? 'Processing' : 'Uploading'} + {upperFirst(upload.uploadStatus)} {(progress * 100).toFixed(0)}% diff --git a/apps/renterd/contexts/filesManager/index.tsx b/apps/renterd/contexts/filesManager/index.tsx index d821cf6a4..0085578e9 100644 --- a/apps/renterd/contexts/filesManager/index.tsx +++ b/apps/renterd/contexts/filesManager/index.tsx @@ -87,7 +87,7 @@ function useFilesManagerMain() { [router, activeDirectory] ) - const { uploadFiles, uploadsList, uploadCancel } = useUploads({ + const { uploadFiles, uploadsList } = useUploads({ activeDirectoryPath, }) const { downloadFiles, downloadsList, getFileUrl, downloadCancel } = @@ -174,7 +174,6 @@ function useFilesManagerMain() { navigateToModeSpecificFiltering, uploadFiles, uploadsList, - uploadCancel, downloadFiles, downloadsList, downloadCancel, diff --git a/apps/renterd/contexts/filesManager/types.ts b/apps/renterd/contexts/filesManager/types.ts index a723e9181..33b45f185 100644 --- a/apps/renterd/contexts/filesManager/types.ts +++ b/apps/renterd/contexts/filesManager/types.ts @@ -1,6 +1,7 @@ import { Bucket } from '@siafoundation/react-renterd' import { FullPath } from '../../lib/paths' import { TableColumn } from '@siafoundation/design-system' +import { MultipartUpload } from '../../lib/multipartUpload' export type ObjectType = 'bucket' | 'directory' | 'file' @@ -78,3 +79,18 @@ export const sortOptions: { id: SortField; label: string; category: string }[] = ] export type ExplorerMode = 'directory' | 'flat' + +export type UploadStatus = 'queued' | 'uploading' | 'processing' + +export type ObjectUploadData = ObjectData & { + upload: MultipartUpload + uploadStatus: UploadStatus + uploadAbort?: () => Promise + uploadFile?: File +} + +export type ObjectUploadRemoteData = ObjectData & { + remote: true +} + +export type UploadsMap = Record diff --git a/apps/renterd/contexts/filesManager/uploads.tsx b/apps/renterd/contexts/filesManager/uploads.tsx index 8316ee91c..70f882fff 100644 --- a/apps/renterd/contexts/filesManager/uploads.tsx +++ b/apps/renterd/contexts/filesManager/uploads.tsx @@ -1,25 +1,34 @@ import { + minutesInMilliseconds, triggerErrorToast, - triggerSuccessToast, - triggerToast, } from '@siafoundation/design-system' -import { useBuckets, useObjectUpload } from '@siafoundation/react-renterd' +import { + Bucket, + useBuckets, + useMultipartUploadAbort, + useMultipartUploadPart, + useMultipartUploadComplete, + useMultipartUploadCreate, +} from '@siafoundation/react-renterd' import { throttle } from '@technically/lodash' -import { useCallback, useMemo, useState } from 'react' -import { ObjectData } from './types' +import { useCallback, useMemo, useRef, useState } from 'react' +import { ObjectUploadData, UploadsMap } from './types' import { - bucketAndKeyParamsFromPath, + FullPath, getBucketFromPath, + getKeyFromPath, join, } from '../../lib/paths' +import { MultipartUpload } from '../../lib/multipartUpload' +import { MiBToBytes } from '@siafoundation/units' +import { useMutate } from '@siafoundation/react-core' +import { useRedundancySettings } from '../../hooks/useRedundancySettings' +import { useWarnActiveUploadsOnClose as useWarnActiveUploadsOnClose } from './useWarnActiveUploadsOnClose' -type UploadProgress = ObjectData & { - controller: AbortController -} - -type UploadProgressParams = Omit - -type UploadsMap = Record +const maxConcurrentUploads = 5 +const maxConcurrentPartsPerUpload = 5 +const getMultipartUploadPartSize = (minShards: number) => + MiBToBytes(4).times(minShards) type Props = { activeDirectoryPath: string @@ -27,23 +36,36 @@ type Props = { export function useUploads({ activeDirectoryPath }: Props) { const buckets = useBuckets() - const upload = useObjectUpload() + const mutate = useMutate() + const apiWorkerUploadPart = useMultipartUploadPart() + const apiBusUploadComplete = useMultipartUploadComplete() + const apiBusUploadCreate = useMultipartUploadCreate() + const apiBusUploadAbort = useMultipartUploadAbort() const [uploadsMap, setUploadsMap] = useState({}) + const redundancy = useRedundancySettings({ + config: { + swr: { + refreshInterval: minutesInMilliseconds(1), + }, + }, + }) - const initUploadProgress = useCallback( - (obj: UploadProgressParams) => { + // Because checkAndStartUploads is called in closures/asynchronous callbacks, + // use a ref to ensure the latest version of the function is used. + const ref = useRef<{ + checkAndStartUploads: () => void + }>({ + checkAndStartUploads: () => null, + }) + + const updateStatusToUploading = useCallback( + ({ id }: { id: string }) => { setUploadsMap((map) => ({ ...map, - [obj.path]: { - id: obj.path, - path: obj.path, - bucket: obj.bucket, - name: obj.name, - size: obj.size, - loaded: obj.loaded, - isUploading: true, - controller: obj.controller, - type: 'file', + [id]: { + ...map[id], + uploadStatus: 'uploading', + loaded: 0, }, })) }, @@ -51,17 +73,17 @@ export function useUploads({ activeDirectoryPath }: Props) { ) const updateUploadProgress = useCallback( - (obj: { path: string; loaded: number; size: number }) => { + (obj: { id: string; loaded: number; size: number }) => { setUploadsMap((map) => { - if (!map[obj.path]) { + if (!map[obj.id]) { return map } return { ...map, - [obj.path]: { - ...map[obj.path], - path: obj.path, + [obj.id]: { + ...map[obj.id], loaded: obj.loaded, + uploadStatus: obj.loaded === obj.size ? 'processing' : 'uploading', size: obj.size, }, } @@ -71,9 +93,9 @@ export function useUploads({ activeDirectoryPath }: Props) { ) const removeUpload = useCallback( - (path: string) => { + (id: string) => { setUploadsMap((uploads) => { - delete uploads[path] + delete uploads[id] return { ...uploads, } @@ -82,78 +104,186 @@ export function useUploads({ activeDirectoryPath }: Props) { [setUploadsMap] ) - const uploadCancel = useCallback((upload: UploadProgress) => { - upload.controller.abort() - }, []) - - const uploadFiles = async (files: File[]) => { - files.forEach(async (file) => { - const name = file.name - // https://developer.mozilla.org/en-US/docs/Web/API/File - // Documentation does not include `path` but all browsers populate it - // with the relative path of the file. Whereas webkitRelativePath is - // empty string in most browsers. - // Try `path` otherwise fallback to flat file structure. - const relativeUserFilePath = (file['path'] as string) || file.name - const path = join(activeDirectoryPath, relativeUserFilePath) - const bucketName = getBucketFromPath(path) - const bucket = buckets.data?.find((b) => b.name === bucketName) - - if (uploadsMap[path]) { - triggerErrorToast(`Already uploading file: ${path}`) - return - } + const createMultipartUpload = useCallback( + async ({ + path, + bucket, + uploadFile, + }: { + path: FullPath + bucket: Bucket + uploadFile: File + }) => { + const key = getKeyFromPath(path) + const multipartUpload = new MultipartUpload({ + file: uploadFile, + path: key, + bucket: bucket.name, + apiWorkerUploadPart, + apiBusUploadComplete, + apiBusUploadCreate, + apiBusUploadAbort, + partSize: getMultipartUploadPartSize( + redundancy.data?.minShards || 1 + ).toNumber(), + maxConcurrentParts: maxConcurrentPartsPerUpload, + }) - const controller = new AbortController() - const onUploadProgress = throttle( - (e) => + const uploadId = await multipartUpload.create() + multipartUpload.setOnError((error) => { + triggerErrorToast(error.message) + removeUpload(uploadId) + }) + multipartUpload.setOnProgress( + throttle((progress) => { updateUploadProgress({ - path, - loaded: e.loaded, - size: e.total, - }), - 2000 + id: uploadId, + loaded: progress.sent, + size: progress.total, + }) + }, 200) ) - initUploadProgress({ + multipartUpload.setOnComplete(async () => { + await mutate((key) => key.startsWith('/bus/objects')) + removeUpload(uploadId) + setTimeout(() => { + ref.current.checkAndStartUploads() + }, 100) + }) + return { + uploadId, + multipartUpload, + } + }, + [ + apiBusUploadAbort, + apiBusUploadComplete, + apiBusUploadCreate, + apiWorkerUploadPart, + mutate, + updateUploadProgress, + removeUpload, + redundancy.data, + ] + ) + + const addUploadToQueue = useCallback( + async ({ + path, + bucket, + name, + uploadFile, + }: { + path: FullPath + bucket: Bucket + name: string + uploadFile: File + }) => { + const { uploadId, multipartUpload } = await createMultipartUpload({ path, - name, bucket, - loaded: 0, - size: 1, - controller, + uploadFile, }) - const response = await upload.put({ - params: bucketAndKeyParamsFromPath(path), - payload: file, - config: { - axios: { - onUploadProgress, - signal: controller.signal, + setUploadsMap((map) => ({ + ...map, + [uploadId]: { + id: uploadId, + path: path, + bucket: bucket, + name: name, + size: uploadFile.size, + loaded: 0, + isUploading: true, + upload: multipartUpload, + uploadStatus: 'queued', + uploadFile: uploadFile, + uploadAbort: async () => { + await multipartUpload.abort() + removeUpload(uploadId) }, + type: 'file', }, + })) + }, + [setUploadsMap, createMultipartUpload, removeUpload] + ) + + const startMultipartUpload = useCallback( + async ({ id, upload }: { id: string; upload: MultipartUpload }) => { + updateStatusToUploading({ + id, + }) + upload.start() + }, + [updateStatusToUploading] + ) + + const checkAndStartUploads = useCallback(() => { + const uploads = Object.values(uploadsMap) + const activeUploads = uploads.filter( + (upload) => upload.uploadStatus === 'uploading' + ).length + const queuedUploads = uploads.filter( + (upload) => upload.uploadStatus === 'queued' + ) + + const availableSlots = maxConcurrentUploads - activeUploads + + // Start uploads if there are available slots and queued uploads + queuedUploads.slice(0, availableSlots).forEach((upload) => { + startMultipartUpload({ + id: upload.id, + upload: upload.upload, }) - if (response.error) { - if (response.error === 'canceled') { - triggerToast('File upload canceled.') - } else { - triggerErrorToast(response.error) - } - removeUpload(path) - } else { - removeUpload(path) - triggerSuccessToast(`Upload complete: ${name}`) - } }) + return uploadsMap + }, [uploadsMap, startMultipartUpload]) + + const uploadFiles = useCallback( + (files: File[]) => { + files.forEach((file) => { + // https://developer.mozilla.org/en-US/docs/Web/API/File + // Documentation does not include `path` but all browsers populate it + // with the relative path of the file. Whereas webkitRelativePath is + // empty string in most browsers. + // Try `path` otherwise fallback to flat file structure. + const relativeUserFilePath = (file['path'] as string) || file.name + const path = join(activeDirectoryPath, relativeUserFilePath) + const name = file.name + const bucketName = getBucketFromPath(path) + const bucket = buckets.data?.find((b) => b.name === bucketName) + if (uploadsMap[path]) { + triggerErrorToast( + `Already uploading file: ${path}, aborting previous upload.` + ) + uploadsMap[path].uploadAbort?.() + } + addUploadToQueue({ + path, + name, + bucket, + uploadFile: file, + }) + }) + setTimeout(() => { + ref.current.checkAndStartUploads() + }, 1_000) + }, + [activeDirectoryPath, addUploadToQueue, buckets.data, uploadsMap] + ) + + ref.current = { + checkAndStartUploads, } - const uploadsList = useMemo( - () => Object.entries(uploadsMap).map((u) => u[1]), + const uploadsList: ObjectUploadData[] = useMemo( + () => Object.entries(uploadsMap).map((u) => u[1] as ObjectUploadData), [uploadsMap] ) return { uploadFiles, + uploadsMap, uploadsList, - uploadCancel, } } diff --git a/apps/renterd/lib/paths.ts b/apps/renterd/lib/paths.ts index e81e2a792..876594c00 100644 --- a/apps/renterd/lib/paths.ts +++ b/apps/renterd/lib/paths.ts @@ -26,18 +26,22 @@ export function getKeyFromPath(path: FullPath): KeyPath { return `/${segsWithoutBucket}` } -// key is the path to the file or directory with a leading slash +// the key parameter needs the leading slash removed and parts encoded +export function getKeyParamFromPath(path: FullPath): KeyPath { + return getKeyFromPath(path) + .slice(1) + .split('/') + .map(encodeURIComponent) + .join('/') +} + export function bucketAndKeyParamsFromPath(path: FullPath): { bucket: string key: KeyPath } { return { bucket: getBucketFromPath(path), - key: getKeyFromPath(path) - .slice(1) - .split('/') - .map(encodeURIComponent) - .join('/'), + key: getKeyParamFromPath(path), } } diff --git a/server/Caddyfile-dev b/server/Caddyfile-dev index 00addb352..2cc4fa211 100644 --- a/server/Caddyfile-dev +++ b/server/Caddyfile-dev @@ -26,7 +26,7 @@ handle @cors { header Access-Control-Allow-Origin * - header Access-Control-Expose-Headers "Link" + header Access-Control-Expose-Headers "Link, ETag" } }