From 1f8917aee7f2876f9e3d6731f65cefd1336484ab Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 11 Sep 2023 17:46:51 +0200 Subject: [PATCH] stores: add support for creating a multipart upload, listing them and uploading a part --- api/bus.go | 53 ++++++++++ api/worker.go | 8 ++ bus/bus.go | 70 +++++++++++++ bus/client.go | 34 +++++++ go.mod | 7 +- go.sum | 34 ++++++- internal/testing/s3_test.go | 76 +++++++++++++- object/object.go | 41 ++++++-- object/object_test.go | 2 +- s3/backend.go | 72 ++++++++++++- s3/s3.go | 6 +- stores/metadata.go | 196 +++++++++++++++++++----------------- stores/metadata_test.go | 5 +- stores/migrations.go | 1 + stores/multipart.go | 130 +++++++++++++++++++++--- worker/client.go | 72 +++++++++++++ worker/upload.go | 75 ++++++++++++-- worker/worker.go | 176 +++++++++++++++++++++++++++++++- 18 files changed, 921 insertions(+), 137 deletions(-) diff --git a/api/bus.go b/api/bus.go index 1d70966cf..cc49905cf 100644 --- a/api/bus.go +++ b/api/bus.go @@ -484,6 +484,55 @@ type GougingSettings struct { MinMaxEphemeralAccountBalance types.Currency `json:"minMaxEphemeralAccountBalance"` } +// Types related to multipart uploads. +type ( + MultipartCreateRequest struct { + Bucket string `json:"bucket"` + Path string `json:"path"` + } + MultipartCreateResponse struct { + UploadID string `json:"uploadID"` + } + MultipartAbortRequest struct { + } + MultipartAbortResponse struct { + } + MultipartCompleteRequest struct { + } + MultipartCompleteResponse struct { + } + MultipartAddPartRequest struct { + Bucket string `json:"bucket"` + Etag string `json:"etag"` + Path string `json:"path"` + ContractSet string `json:"contractSet"` + UploadID string `json:"uploadID"` + PartialSlabs []object.PartialSlab `json:"partialSlabs"` + PartNumber int `json:"partNumber"` + Slices []object.SlabSlice `json:"slices"` + UsedContracts map[types.PublicKey]types.FileContractID `json:"usedContracts"` + } + MultipartListUploadsRequest struct { + Bucket string + Prefix string + KeyMarker string + UploadIDMarker string + Limit int + } + MultipartListUploadsResponse struct { + Uploads []MultipartListUploadItem `json:"uploads"` + } + MultipartListUploadItem struct { + Path string `json:"objectName"` + UploadID string `json:"uploadID"` + CreatedAt time.Time `json:"createdAt"` + } + MultipartListPartsRequest struct { + } + MultipartListPartsResponse struct { + } +) + type WalletResponse struct { ScanHeight uint64 `json:"scanHeight"` Address types.Address `json:"address"` @@ -557,3 +606,7 @@ func (rs RedundancySettings) Validate() error { type AddPartialSlabResponse struct { Slabs []object.PartialSlab `json:"slabs"` } + +func FormatEtag(etag []byte) string { + return fmt.Sprintf("\"%x\"", etag) +} diff --git a/api/worker.go b/api/worker.go index 9bbc87014..47b215553 100644 --- a/api/worker.go +++ b/api/worker.go @@ -220,6 +220,14 @@ func UploadWithBucket(bucket string) UploadOption { } } +// UploadWithDisablePreshardingEncryption disables presharding encryption for +// the upload +func UploadWithDisabledPreshardingEncryption() UploadOption { + return func(v url.Values) { + v.Set("disablepreshardingencryption", "true") + } +} + type DownloadRange struct { Start int64 Length int64 diff --git a/bus/bus.go b/bus/bus.go index 06e9d185e..6357b89ea 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -120,6 +120,10 @@ type ( RenameObject(ctx context.Context, bucket, from, to string) error RenameObjects(ctx context.Context, bucket, from, to string) error + AddMultipartPart(ctx context.Context, bucket, path, contractSet, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab, etag string, usedContracts map[types.PublicKey]types.FileContractID) (err error) + CreateMultipartUpload(ctx context.Context, bucket, path string) (api.MultipartCreateResponse, error) + ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error) + MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab, usedContracts map[types.PublicKey]types.FileContractID) error PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error) SlabBuffers(ctx context.Context) ([]api.SlabBuffer, error) @@ -1789,6 +1793,65 @@ func New(s Syncer, am *alerts.Manager, hm *webhooks.Manager, cm ChainManager, tp return b, nil } +func (b *bus) multipartHandlerCreatePOST(jc jape.Context) { + var req api.MultipartCreateRequest + if jc.Decode(&req) != nil { + return + } + resp, err := b.ms.CreateMultipartUpload(jc.Request.Context(), req.Bucket, req.Path) + if jc.Check("failed to create multipart upload", err) != nil { + return + } + jc.Encode(resp) +} + +func (b *bus) multipartHandlerAbortPOST(jc jape.Context) { + var req api.MultipartAbortRequest + if jc.Decode(&req) != nil { + return + } + panic("not implemented") +} + +func (b *bus) multipartHandlerCompletePOST(jc jape.Context) { + var req api.MultipartCompleteRequest + if jc.Decode(&req) != nil { + return + } + panic("not implemented") +} + +func (b *bus) multipartHandlerUploadPartPUT(jc jape.Context) { + var req api.MultipartAddPartRequest + if jc.Decode(&req) != nil { + return + } + err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.UploadID, req.PartNumber, req.Slices, req.PartialSlabs, req.Etag, req.UsedContracts) + if jc.Check("failed to upload part", err) != nil { + return + } +} + +func (b *bus) multipartHandlerListUploadsPOST(jc jape.Context) { + var req api.MultipartListUploadsRequest + if jc.Decode(&req) != nil { + return + } + resp, err := b.ms.ListMultipartUploads(jc.Request.Context(), req.Bucket, req.Prefix, req.KeyMarker, req.UploadIDMarker, req.Limit) + if jc.Check("failed to list multipart uploads", err) != nil { + return + } + jc.Encode(resp) +} + +func (b *bus) multipartHandlerListPartsPOST(jc jape.Context) { + var req api.MultipartListPartsRequest + if jc.Decode(&req) != nil { + return + } + panic("not implemented") +} + // Handler returns an HTTP handler that serves the bus API. func (b *bus) Handler() http.Handler { return jape.Mux(tracing.TracedRoutes("bus", map[string]jape.Handler{ @@ -1907,6 +1970,13 @@ func (b *bus) Handler() http.Handler { "POST /upload/:id/sector": b.uploadAddSectorHandlerPOST, "DELETE /upload/:id": b.uploadFinishedHandlerDELETE, + "POST /multipart/create": b.multipartHandlerCreatePOST, + "POST /multipart/abort": b.multipartHandlerAbortPOST, + "POST /multipart/complete": b.multipartHandlerCompletePOST, + "PUT /multipart/part": b.multipartHandlerUploadPartPUT, + "POST /multipart/listuploads": b.multipartHandlerListUploadsPOST, + "POST /multipart/listparts": b.multipartHandlerListPartsPOST, + "GET /webhooks": b.webhookHandlerGet, "POST /webhooks": b.webhookHandlerPost, "POST /webhooks/action": b.webhookActionHandlerPost, diff --git a/bus/client.go b/bus/client.go index 8dfbb48d0..0a641ce59 100644 --- a/bus/client.go +++ b/bus/client.go @@ -946,6 +946,40 @@ func (c *Client) renameObjects(ctx context.Context, bucket, from, to, mode strin return } +func (c *Client) CreateMultipartUpload(ctx context.Context, bucket, path string) (resp api.MultipartCreateResponse, err error) { + err = c.c.WithContext(ctx).POST("/multipart/create", api.MultipartCreateRequest{ + Bucket: bucket, + Path: path, + }, &resp) + return +} + +func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab, etag string, usedContracts map[types.PublicKey]types.FileContractID) (err error) { + err = c.c.WithContext(ctx).PUT("/multipart/part", api.MultipartAddPartRequest{ + Bucket: bucket, + Etag: etag, + Path: path, + ContractSet: contractSet, + UploadID: uploadID, + PartNumber: partNumber, + Slices: slices, + PartialSlabs: partialSlab, + UsedContracts: usedContracts, + }) + return +} + +func (c *Client) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, err error) { + err = c.c.WithContext(ctx).POST("/multipart/listuploads", api.MultipartListUploadsRequest{ + Bucket: bucket, + Prefix: prefix, + KeyMarker: keyMarker, + UploadIDMarker: uploadIDMarker, + Limit: maxUploads, + }, &resp) + return +} + // NewClient returns a client that communicates with a renterd store server // listening on the specified address. func NewClient(addr, password string) *Client { diff --git a/go.mod b/go.mod index 9dd64f215..b19e64814 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,11 @@ module go.sia.tech/renterd go 1.20 require ( - github.com/Mikubill/gofakes3 v0.0.3-0.20230622102024-284c0f988700 + github.com/SiaFoundation/gofakes3 v0.0.0-20230911090236-968673d3fd9b github.com/go-gormigrate/gormigrate/v2 v2.1.0 github.com/google/go-cmp v0.5.9 github.com/gotd/contrib v0.19.0 github.com/klauspost/reedsolomon v1.11.8 - github.com/minio/minio-go v6.0.14+incompatible github.com/minio/minio-go/v7 v7.0.63 github.com/montanaflynn/stats v0.7.1 gitlab.com/NebulousLabs/encoding v0.0.0-20200604091946-456c3dc907fe @@ -35,6 +34,7 @@ require ( ) require ( + github.com/Mikubill/gofakes3 v0.0.3-0.20230622102024-284c0f988700 // indirect github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect github.com/aws/aws-sdk-go v1.44.334 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect @@ -42,7 +42,6 @@ require ( github.com/dchest/threefish v0.0.0-20120919164726-3ecf4c494abf // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect - github.com/go-ini/ini v1.67.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-sql-driver/mysql v1.7.1 // indirect @@ -65,7 +64,6 @@ require ( github.com/mattn/go-sqlite3 v1.14.17 // indirect github.com/minio/md5-simd v1.1.2 // indirect github.com/minio/sha256-simd v1.0.1 // indirect - github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pkg/errors v0.9.1 // indirect @@ -102,5 +100,6 @@ require ( google.golang.org/grpc v1.57.0 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect + lukechampine.com/blake3 v1.2.1 // indirect nhooyr.io/websocket v1.8.7 // indirect ) diff --git a/go.sum b/go.sum index 744f65eec..c9443aeac 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/Mikubill/gofakes3 v0.0.3-0.20230622102024-284c0f988700 h1:r3fp2/Ro+0RtpjNY0/wsbN7vRmCW//dXTOZDQTct25Q= github.com/Mikubill/gofakes3 v0.0.3-0.20230622102024-284c0f988700/go.mod h1:OSXqXEGUe9CmPiwLMMnVrbXonMf4BeLBkBdLufxxiyY= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/SiaFoundation/gofakes3 v0.0.0-20230911090236-968673d3fd9b h1:0UfDJus0ScHuWk683+CgquHtMDRa7htVduNTwSQgV5E= +github.com/SiaFoundation/gofakes3 v0.0.0-20230911090236-968673d3fd9b/go.mod h1:+Csw7MlmrhhBB3xCIp+R9jgQEHEHYMWYFaySNcpWBkM= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da h1:KjTM2ks9d14ZYCvmHS9iAKVt9AyzRSqNU1qabPih5BY= @@ -10,6 +12,8 @@ github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da/go.mod h1:eHEWzANqSi github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/aws/aws-sdk-go v1.44.124/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= +github.com/aws/aws-sdk-go v1.44.256/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/aws/aws-sdk-go v1.44.334 h1:h2bdbGb//fez6Sv6PaYv868s9liDeoYM6hYsAqTB4MU= github.com/aws/aws-sdk-go v1.44.334/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= @@ -47,8 +51,6 @@ github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= github.com/go-gormigrate/gormigrate/v2 v2.1.0 h1:4/1xr9CjOox714EJWbxkF00lrNmbWJToSZzhykKKcKY= github.com/go-gormigrate/gormigrate/v2 v2.1.0/go.mod h1:gpA97koYGyjqaiLDTmLE5W7nyYTmI26AYIf2a/earuo= -github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= -github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= @@ -180,13 +182,10 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/microsoft/go-mssqldb v0.21.0 h1:p2rpHIL7TlSv1QrbXJUAcbyRKnIT0C9rRkH2E4OjLn8= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= -github.com/minio/minio-go v6.0.14+incompatible h1:fnV+GD28LeqdN6vT2XdGKW8Qe/IfjJDswNVuni6km9o= -github.com/minio/minio-go v6.0.14+incompatible/go.mod h1:7guKYtitv8dktvNUGrhzmNlA5wrAABTQXCoesZdFQO8= github.com/minio/minio-go/v7 v7.0.63 h1:GbZ2oCvaUdgT5640WJOpyDhhDxvknAJU2/T3yurwcbQ= github.com/minio/minio-go/v7 v7.0.63/go.mod h1:Q6X7Qjb7WMhvG65qKf4gUgA5XaiSox74kR1uAEjxRS4= github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= -github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -230,6 +229,7 @@ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/afero v1.2.1/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE= github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= @@ -237,10 +237,13 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= @@ -288,6 +291,7 @@ gitlab.com/NebulousLabs/threadgroup v0.0.0-20200608151952-38921fbef213 h1:owERlK gitlab.com/NebulousLabs/threadgroup v0.0.0-20200608151952-38921fbef213/go.mod h1:vIutAvl7lmJqLVYTCBY5WDdJomP+V74At8LCeEYoH8w= gitlab.com/NebulousLabs/writeaheadlog v0.0.0-20200618142844-c59a90f49130/go.mod h1:SxigdS5Q1ui+OMgGAXt1E/Fg3RB6PvKXMov2O3gvIzs= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 h1:pginetY7+onl4qN1vl0xW/V/v6OBZ0vVdH+esuJgvmM= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0/go.mod h1:XiYsayHc36K3EByOO6nbAXnAWbrUxdjUROCEeeROOH8= go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= @@ -339,6 +343,7 @@ golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -347,6 +352,9 @@ golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPI golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -358,8 +366,11 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -369,6 +380,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -383,17 +395,21 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210421210424-b80969c67360/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0= golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -402,6 +418,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -416,6 +434,9 @@ golang.org/x/tools v0.0.0-20190829051458-42f498d34c4d/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.8.0/go.mod h1:JxBZ99ISMI5ViVkT1tr6tdNmXeTrcpVSD3vZ1RsRdN4= golang.org/x/tools v0.12.0 h1:YW6HUoUmYBpwSgyaGaZq1fHjrBjX1rlpZ54T6mu2kss= golang.org/x/tools v0.12.0/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -443,6 +464,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -462,6 +484,8 @@ gorm.io/gorm v1.25.1/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= gorm.io/gorm v1.25.4 h1:iyNd8fNAe8W9dvtlgeRI5zSVZPsq3OpcTu37cYcpCmw= gorm.io/gorm v1.25.4/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI= +lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k= lukechampine.com/frand v1.4.2 h1:RzFIpOvkMXuPMBb9maa4ND4wjBn71E1Jpf8BzJHMaVw= lukechampine.com/frand v1.4.2/go.mod h1:4S/TM2ZgrKejMcKMbeLjISpJMO+/eZ1zu3vYX9dtj3s= nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= diff --git a/internal/testing/s3_test.go b/internal/testing/s3_test.go index 9480be98c..6bc38478c 100644 --- a/internal/testing/s3_test.go +++ b/internal/testing/s3_test.go @@ -9,7 +9,7 @@ import ( "strings" "testing" - "github.com/Mikubill/gofakes3" + "github.com/SiaFoundation/gofakes3" "github.com/google/go-cmp/cmp" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" @@ -303,3 +303,77 @@ func TestS3List(t *testing.T) { } } } + +func TestS3MultipartUploads(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + cluster, err := newTestCluster(t.TempDir(), newTestLogger()) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := cluster.Shutdown(context.Background()); err != nil { + t.Fatal(err) + } + }() + s3 := cluster.S3 + + // delete default bucket before testing. + if err := cluster.Bus.DeleteBucket(context.Background(), api.DefaultBucketName); err != nil { + t.Fatal(err) + } + + // add hosts + if _, err := cluster.AddHostsBlocking(testRedundancySettings.TotalShards); err != nil { + t.Fatal(err) + } + + // Create bucket. + err = s3.MakeBucket(context.Background(), "multipart", minio.MakeBucketOptions{}) + if err != nil { + t.Fatal(err) + } + + // Create a core client for lower-level operations. + url := s3.EndpointURL() + core, err := minio.NewCore(url.Host+url.Path, &minio.Options{ + Creds: testS3Credentials, + }) + if err != nil { + t.Fatal(err) + } + + // Start a new multipart upload. + uploadID, err := core.NewMultipartUpload(context.Background(), "multipart", "foo", minio.PutObjectOptions{}) + if err != nil { + t.Fatal(err) + } else if uploadID == "" { + t.Fatal("expected non-empty upload ID") + } + + // List uploads + lmu, err := core.ListMultipartUploads(context.Background(), "multipart", "", "", "", "", 0) + if err != nil { + t.Fatal(err) + } else if len(lmu.Uploads) != 1 { + t.Fatal("expected 1 upload") + } else if upload := lmu.Uploads[0]; upload.UploadID != uploadID || upload.Key != "foo" { + t.Fatal("unexpected upload:", upload.UploadID, upload.Key) + } + + // Add a part. + part, err := core.PutObjectPart(context.Background(), "multipart", "foo", uploadID, 1, bytes.NewReader([]byte("hello")), 5, minio.PutObjectPartOptions{}) + if err != nil { + t.Fatal(err) + } else if part.ETag == "" { + t.Fatal("expected non-empty ETag") + } + + // TODO: list parts + + // TODO: complete upload + + // TODO: download object +} diff --git a/object/object.go b/object/object.go index ffb1a2318..e701d1bfc 100644 --- a/object/object.go +++ b/object/object.go @@ -13,11 +13,19 @@ import ( "lukechampine.com/frand" ) +var NoOpKey = EncryptionKey{ + entropy: new([32]byte), +} + // A EncryptionKey can encrypt and decrypt messages. type EncryptionKey struct { entropy *[32]byte `json:"-"` } +func (k EncryptionKey) IsNoopKey() bool { + return bytes.Equal(k.entropy[:], NoOpKey.entropy[:]) +} + // String implements fmt.Stringer. func (k EncryptionKey) String() string { return "key:" + hex.EncodeToString(k.entropy[:]) @@ -39,9 +47,19 @@ func (k *EncryptionKey) UnmarshalText(b []byte) error { return nil } -// Encrypt returns a cipher.StreamReader that encrypts r with k. -func (k EncryptionKey) Encrypt(r io.Reader) cipher.StreamReader { - c, _ := chacha20.NewUnauthenticatedCipher(k.entropy[:], make([]byte, 24)) +// Encrypt returns a cipher.StreamReader that encrypts r with k starting at the +// given offset. +func (k EncryptionKey) Encrypt(r io.Reader, offset uint64) cipher.StreamReader { + if k.IsNoopKey() { + return cipher.StreamReader{S: &noOpStream{}, R: r} + } + nonce64 := offset / (64 * math.MaxUint32) + offset %= 64 * math.MaxUint32 + + nonce := make([]byte, 24) + binary.LittleEndian.PutUint64(nonce[16:], nonce64) + c, _ := chacha20.NewUnauthenticatedCipher(k.entropy[:], nonce) + c.SetCounter(uint32(offset / 64)) rs := &rekeyStream{key: k.entropy[:], c: c} return cipher.StreamReader{S: rs, R: r} } @@ -49,6 +67,9 @@ func (k EncryptionKey) Encrypt(r io.Reader) cipher.StreamReader { // Decrypt returns a cipher.StreamWriter that decrypts w with k, starting at the // specified offset. func (k EncryptionKey) Decrypt(w io.Writer, offset uint64) cipher.StreamWriter { + if k.IsNoopKey() { + return cipher.StreamWriter{S: &noOpStream{}, W: w} + } nonce64 := offset / (64 * math.MaxUint32) offset %= 64 * math.MaxUint32 @@ -78,9 +99,9 @@ type Object struct { } // NewObject returns a new Object with a random key. -func NewObject() Object { +func NewObject(ec EncryptionKey) Object { return Object{ - Key: GenerateEncryptionKey(), + Key: ec, } } @@ -98,8 +119,8 @@ func (o Object) TotalSize() int64 { // Encrypt wraps the given reader with a reader that encrypts the stream using // the object's key. -func (o Object) Encrypt(r io.Reader) cipher.StreamReader { - return o.Key.Encrypt(r) +func (o Object) Encrypt(r io.Reader, offset uint64) cipher.StreamReader { + return o.Key.Encrypt(r, offset) } // SplitSlabs splits a set of slabs into slices comprising objects with the @@ -163,3 +184,9 @@ func (rs *rekeyStream) XORKeyStream(dst, src []byte) { rs.c, _ = chacha20.NewUnauthenticatedCipher(rs.key, nonce) rs.c.XORKeyStream(dst[rem:], src[rem:]) } + +type noOpStream struct{} + +func (noOpStream) XORKeyStream(dst, src []byte) { + copy(dst, src) +} diff --git a/object/object_test.go b/object/object_test.go index c64b46d97..c2e99b8b1 100644 --- a/object/object_test.go +++ b/object/object_test.go @@ -12,7 +12,7 @@ func TestEncryptionOverflow(t *testing.T) { // Create a random key. key := GenerateEncryptionKey() data := frand.Bytes(3 * 64) - sr := key.Encrypt(bytes.NewReader(data)) + sr := key.Encrypt(bytes.NewReader(data), 0) // Check that the streamreader is initialized correctly. rs := sr.S.(*rekeyStream) diff --git a/s3/backend.go b/s3/backend.go index faa85eec2..d907f2f2a 100644 --- a/s3/backend.go +++ b/s3/backend.go @@ -13,12 +13,17 @@ import ( "strings" "time" - "github.com/Mikubill/gofakes3" + "github.com/SiaFoundation/gofakes3" "go.sia.tech/renterd/api" "go.uber.org/zap" "lukechampine.com/frand" ) +var ( + _ gofakes3.Backend = (*s3)(nil) + _ gofakes3.MultipartBackend = (*s3)(nil) +) + type s3 struct { b bus w worker @@ -379,3 +384,68 @@ func (s *s3) CopyObject(srcBucket, srcKey, dstBucket, dstKey string, meta map[st LastModified: gofakes3.NewContentTime(time.Unix(0, 0).UTC()), // TODO: don't have that }, nil } + +func (s *s3) CreateMultipartUpload(bucket, object string, meta map[string]string) (gofakes3.UploadID, error) { + resp, err := s.b.CreateMultipartUpload(context.Background(), bucket, object) + if err != nil { + return "", gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) + } + return gofakes3.UploadID(resp.UploadID), nil +} + +func (s *s3) UploadPart(bucket, object string, id gofakes3.UploadID, partNumber int, contentLength int64, input io.Reader) (etag string, err error) { + etag, err = s.w.UploadPart(context.Background(), input, object, string(id), partNumber, api.UploadWithDisabledPreshardingEncryption()) + if err != nil { + return "", gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) + } + return etag, nil +} + +func (s *s3) ListMultipartUploads(bucket string, marker *gofakes3.UploadListMarker, prefix gofakes3.Prefix, limit int64) (*gofakes3.ListMultipartUploadsResult, error) { + prefix.HasPrefix = prefix.Prefix != "" + prefix.HasDelimiter = prefix.Delimiter != "" + if prefix.HasDelimiter && prefix.Delimiter != "/" { + return nil, gofakes3.ErrorMessage(gofakes3.ErrNotImplemented, "delimiter must be '/'") + } else if prefix.HasPrefix { + return nil, gofakes3.ErrorMessage(gofakes3.ErrNotImplemented, "prefix not supported") + } else if marker != nil { + return nil, gofakes3.ErrorMessage(gofakes3.ErrNotImplemented, "marker not supported") + } + resp, err := s.b.ListMultipartUploads(context.Background(), bucket, "", "", "", int(limit)) + if err != nil { + return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) + } + var uploads []gofakes3.ListMultipartUploadItem + for _, upload := range resp.Uploads { + uploads = append(uploads, gofakes3.ListMultipartUploadItem{ + Key: upload.Path, + UploadID: gofakes3.UploadID(upload.UploadID), + Initiated: gofakes3.NewContentTime(upload.CreatedAt), + }) + } + return &gofakes3.ListMultipartUploadsResult{ + Bucket: bucket, + KeyMarker: "", + UploadIDMarker: "", + NextKeyMarker: "", + NextUploadIDMarker: "", + MaxUploads: limit, + Delimiter: prefix.Delimiter, + Prefix: prefix.Prefix, + CommonPrefixes: []gofakes3.CommonPrefix{}, + IsTruncated: false, + Uploads: uploads, + }, nil +} + +func (s *s3) ListParts(bucket, object string, uploadID gofakes3.UploadID, marker int, limit int64) (*gofakes3.ListMultipartUploadPartsResult, error) { + panic("not implemented") +} + +func (s *s3) AbortMultipartUpload(bucket, object string, id gofakes3.UploadID) error { + panic("not implemented") +} + +func (s *s3) CompleteMultipartUpload(bucket, object string, id gofakes3.UploadID, input *gofakes3.CompleteMultipartUploadRequest) (versionID gofakes3.VersionID, etag string, err error) { + panic("not implemented") +} diff --git a/s3/s3.go b/s3/s3.go index e24d406ec..ef1dc4901 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -8,7 +8,7 @@ import ( "net/http" "strings" - "github.com/Mikubill/gofakes3" + "github.com/SiaFoundation/gofakes3" "go.sia.tech/core/types" "go.sia.tech/renterd/api" "go.sia.tech/renterd/object" @@ -35,12 +35,16 @@ type bus interface { Object(ctx context.Context, path string, opts ...api.ObjectsOption) (o api.Object, entries []api.ObjectMetadata, err error) SearchObjects(ctx context.Context, bucket, key string, offset, limit int) (entries []api.ObjectMetadata, err error) + CreateMultipartUpload(ctx context.Context, bucket, path string) (api.MultipartCreateResponse, error) + ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error) + UploadParams(ctx context.Context) (api.UploadParams, error) } type worker interface { UploadObject(ctx context.Context, r io.Reader, path string, opts ...api.UploadOption) (err error) GetObject(ctx context.Context, path, bucket string, opts ...api.DownloadObjectOption) (api.GetObjectResponse, error) + UploadPart(ctx context.Context, r io.Reader, path, uploadID string, partNumber int, opts ...api.UploadOption) (etag string, err error) } func (l *gofakes3Logger) Print(level gofakes3.LogLevel, v ...interface{}) { diff --git a/stores/metadata.go b/stores/metadata.go index 7f66998a4..f60a25f4b 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -97,9 +97,8 @@ type ( dbSlice struct { Model - DBObjectID uint `gorm:"index"` - - PartNumber uint64 `gorm:"index"` + DBObjectID *uint `gorm:"index"` + DBMultipartPartID *uint `gorm:"index"` // Slice related fields. DBSlabID uint `gorm:"index"` @@ -1146,8 +1145,8 @@ func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath return fmt.Errorf("failed to fetch src slices: %w", err) } for i := range srcSlices { - srcSlices[i].Model = Model{} // clear model - srcSlices[i].DBObjectID = 0 // clear object id + srcSlices[i].Model = Model{} // clear model + srcSlices[i].DBObjectID = nil // clear object id } var bucket dbBucket @@ -1231,92 +1230,9 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet s return fmt.Errorf("failed to fetch used contracts: %w", err) } - for i, ss := range o.Slabs { - // Create Slab if it doesn't exist yet. - slabKey, err := ss.Key.MarshalText() - if err != nil { - return fmt.Errorf("failed to marshal slab key: %w", err) - } - slab := &dbSlab{ - Key: slabKey, - MinShards: ss.MinShards, - TotalShards: uint8(len(ss.Shards)), - } - err = tx.Where(dbSlab{Key: slabKey}). - Assign(dbSlab{ - DBContractSetID: cs.ID, - }). - FirstOrCreate(&slab).Error - if err != nil { - return fmt.Errorf("failed to create slab %v/%v: %w", i+1, len(o.Slabs), err) - } - - // Create Slice. - slice := dbSlice{ - DBSlabID: slab.ID, - DBObjectID: obj.ID, - Offset: ss.Offset, - Length: ss.Length, - } - err = tx.Create(&slice).Error - if err != nil { - return fmt.Errorf("failed to create slice %v/%v: %w", i+1, len(o.Slabs), err) - } - - for j, shard := range ss.Shards { - // Create sector if it doesn't exist yet. - var sector dbSector - err := tx. - Where(dbSector{Root: shard.Root[:]}). - Assign(dbSector{ - DBSlabID: slab.ID, - LatestHost: publicKey(shard.Host), - }). - FirstOrCreate(§or). - Error - if err != nil { - return fmt.Errorf("failed to create sector %v/%v: %w", j+1, len(ss.Shards), err) - } - - // Add contract and host to join tables. - contract, contractFound := contracts[shard.Host] - if contractFound { - err = tx.Model(§or).Association("Contracts").Append(&contract) - if err != nil { - return fmt.Errorf("failed to append to Contracts association: %w", err) - } - } - } - } - - // Handle partial slabs. We create a slice for each partial slab. - partialSlabs := o.PartialSlabs - if len(partialSlabs) == 0 { - return nil - } - - for _, partialSlab := range partialSlabs { - key, err := partialSlab.Key.MarshalText() - if err != nil { - return err - } - var buffer dbBufferedSlab - err = tx.Joins("DBSlab"). - Take(&buffer, "DBSlab.key = ?", key). - Error - if err != nil { - return fmt.Errorf("failed to fetch buffered slab: %w", err) - } - - err = tx.Create(&dbSlice{ - DBObjectID: obj.ID, - DBSlabID: buffer.DBSlab.ID, - Offset: partialSlab.Offset, - Length: partialSlab.Length, - }).Error - if err != nil { - return fmt.Errorf("failed to create slice for partial slab: %w", err) - } + // Create all slices. This also creates any missing slabs or sectors. + if err := s.createSlices(tx, &obj.ID, nil, cs.ID, contracts, o.Slabs, o.PartialSlabs); err != nil { + return fmt.Errorf("failed to create slices: %w", err) } return nil }) @@ -1562,6 +1478,104 @@ func (s *SQLStore) UnhealthySlabs(ctx context.Context, healthCutoff float64, set return slabs, nil } +func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractSetID uint, contracts map[types.PublicKey]dbContract, slices []object.SlabSlice, partialSlabs []object.PartialSlab) error { + if (objID == nil && multiPartID == nil) || (objID != nil && multiPartID != nil) { + return fmt.Errorf("either objID or multiPartID must be set") + } + + var dbSlices []dbSlice + for i, ss := range slices { + // Create Slab if it doesn't exist yet. + slabKey, err := ss.Key.MarshalText() + if err != nil { + return fmt.Errorf("failed to marshal slab key: %w", err) + } + slab := &dbSlab{ + Key: slabKey, + MinShards: ss.MinShards, + TotalShards: uint8(len(ss.Shards)), + } + err = tx.Where(dbSlab{Key: slabKey}). + Assign(dbSlab{ + DBContractSetID: contractSetID, + }). + FirstOrCreate(&slab).Error + if err != nil { + return fmt.Errorf("failed to create slab %v/%v: %w", i+1, len(slices), err) + } + + // Create Slice. + slice := dbSlice{ + DBSlabID: slab.ID, + DBObjectID: objID, + DBMultipartPartID: multiPartID, + Offset: ss.Offset, + Length: ss.Length, + } + err = tx.Create(&slice).Error + if err != nil { + return fmt.Errorf("failed to create slice %v/%v: %w", i+1, len(slices), err) + } + dbSlices = append(dbSlices, slice) + + for j, shard := range ss.Shards { + // Create sector if it doesn't exist yet. + var sector dbSector + err := tx. + Where(dbSector{Root: shard.Root[:]}). + Assign(dbSector{ + DBSlabID: slab.ID, + LatestHost: publicKey(shard.Host), + }). + FirstOrCreate(§or). + Error + if err != nil { + return fmt.Errorf("failed to create sector %v/%v: %w", j+1, len(ss.Shards), err) + } + + // Add contract and host to join tables. + contract, contractFound := contracts[shard.Host] + if contractFound { + err = tx.Model(§or).Association("Contracts").Append(&contract) + if err != nil { + return fmt.Errorf("failed to append to Contracts association: %w", err) + } + } + } + } + + // Handle partial slabs. We create a slice for each partial slab. + if len(partialSlabs) == 0 { + return nil + } + + for _, partialSlab := range partialSlabs { + key, err := partialSlab.Key.MarshalText() + if err != nil { + return err + } + var buffer dbBufferedSlab + err = tx.Joins("DBSlab"). + Take(&buffer, "DBSlab.key = ?", key). + Error + if err != nil { + return fmt.Errorf("failed to fetch buffered slab: %w", err) + } + + err = tx.Create(&dbSlice{ + DBObjectID: objID, + DBMultipartPartID: multiPartID, + DBSlabID: buffer.DBSlab.ID, + Offset: partialSlab.Offset, + Length: partialSlab.Length, + }).Error + if err != nil { + return fmt.Errorf("failed to create slice for partial slab: %w", err) + } + } + return nil +} + // object retrieves a raw object from the store. func (s *SQLStore) object(ctx context.Context, txn *gorm.DB, bucket string, path string) (rawObject, error) { // NOTE: we LEFT JOIN here because empty objects are valid and need to be diff --git a/stores/metadata_test.go b/stores/metadata_test.go index fe026a327..48949ef82 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -1021,6 +1021,7 @@ func TestSQLMetadataStore(t *testing.T) { obj.Slabs[i].Model = Model{} } + one := uint(1) expectedObj := dbObject{ DBBucketID: 1, ObjectID: objID, @@ -1028,13 +1029,13 @@ func TestSQLMetadataStore(t *testing.T) { Size: obj1.TotalSize(), Slabs: []dbSlice{ { - DBObjectID: 1, + DBObjectID: &one, DBSlabID: 1, Offset: 10, Length: 100, }, { - DBObjectID: 1, + DBObjectID: &one, DBSlabID: 2, Offset: 20, Length: 200, diff --git a/stores/migrations.go b/stores/migrations.go index fef11ad0e..d64498dc5 100644 --- a/stores/migrations.go +++ b/stores/migrations.go @@ -17,6 +17,7 @@ var ( &dbContract{}, &dbContractSet{}, &dbObject{}, + &dbMultipartUpload{}, &dbBucket{}, &dbBufferedSlab{}, &dbSlab{}, diff --git a/stores/multipart.go b/stores/multipart.go index 665566442..44b3e30d2 100644 --- a/stores/multipart.go +++ b/stores/multipart.go @@ -1,39 +1,141 @@ package stores import ( - "io" - "time" + "context" + "encoding/hex" + "errors" + "fmt" - "github.com/Mikubill/gofakes3" + "github.com/SiaFoundation/gofakes3" + "go.sia.tech/core/types" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/object" + "gorm.io/gorm" + "lukechampine.com/frand" ) type ( dbMultipartUpload struct { Model - DBObjectID uint - DBObject dbObject + UploadID string `gorm:"uniqueIndex"` + ObjectID string `gorm:"index"` + Parts []dbMultipartPart `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete parts too + } + + dbMultipartPart struct { + Model + Etag string `gorm:"index"` + PartNumber int `gorm:"index"` + DBMultipartUploadID uint `gorm:"index;NOT NULL"` + Slabs []dbSlice `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete slices too } ) -func CreateMultipartUpload(bucket, object string, meta map[string]string, initiated time.Time) (string, error) { - panic("not implemented") +func (s *SQLStore) CreateMultipartUpload(ctx context.Context, bucket, path string) (api.MultipartCreateResponse, error) { + var uploadID string + err := s.retryTransaction(func(tx *gorm.DB) error { + // Get bucket id. + var bucketID uint + err := tx.Table("(SELECT id from buckets WHERE buckets.name = ?) bucket_id", bucket). + Take(&bucketID).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return fmt.Errorf("bucket %v not found: %w", bucket, api.ErrBucketNotFound) + } else if err != nil { + return fmt.Errorf("failed to fetch bucket id: %w", err) + } + // Create multipart upload + uploadIDEntropy := frand.Entropy256() + uploadID = hex.EncodeToString(uploadIDEntropy[:]) + if err := s.db.Create(&dbMultipartUpload{ + UploadID: uploadID, + ObjectID: path, + }).Error; err != nil { + return fmt.Errorf("failed to create multipart upload: %w", err) + } + return nil + }) + return api.MultipartCreateResponse{ + UploadID: uploadID, + }, err } -func UploadPart(bucket, object string, uploadID string, partNumber int, at time.Time, contentLength int64, input io.Reader) (etag string, err error) { - panic("not implemented") +func (s *SQLStore) AddMultipartPart(ctx context.Context, bucket, path, contractSet, uploadID string, partNumber int, slices []object.SlabSlice, partialSlabs []object.PartialSlab, etag string, usedContracts map[types.PublicKey]types.FileContractID) (err error) { + err = s.retryTransaction(func(tx *gorm.DB) error { + // Fetch contract set. + var cs dbContractSet + if err := tx.Take(&cs, "name = ?", contractSet).Error; err != nil { + return fmt.Errorf("contract set %v not found: %w", contractSet, err) + } + // Fetch the used contracts. + contracts, err := fetchUsedContracts(tx, usedContracts) + if err != nil { + return fmt.Errorf("failed to fetch used contracts: %w", err) + } + // Find multipart upload. + var mu dbMultipartUpload + err = tx.Where("upload_id", uploadID). + Take(&mu). + Error + if err != nil { + return fmt.Errorf("failed to fetch multipart upload: %w", err) + } + // Delete a potentially existing part. + err = tx.Model(&dbMultipartPart{}). + Where("db_multipart_upload_id = ? AND part_number = ?", mu.ID, partNumber). + Delete(&dbMultipartPart{}). + Error + if err != nil { + return fmt.Errorf("failed to delete existing part: %w", err) + } + // Create a new part. + part := dbMultipartPart{ + Etag: etag, + PartNumber: partNumber, + DBMultipartUploadID: mu.ID, + } + err = tx.Create(&part).Error + if err != nil { + return fmt.Errorf("failed to create part: %w", err) + } + // Create the slices. + err = s.createSlices(tx, nil, &part.ID, cs.ID, contracts, slices, partialSlabs) + if err != nil { + return fmt.Errorf("failed to create slices: %w", err) + } + return nil + }) + return err } -func ListMultipartUploads(bucket string, marker *gofakes3.UploadListMarker, prefix gofakes3.Prefix, limit int64) (*gofakes3.ListMultipartUploadsResult, error) { - panic("not implemented") +// TODO: f/u with support for 'prefix', 'keyMarker' and 'uploadIDMarker' +func (s *SQLStore) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error) { + err := s.retryTransaction(func(tx *gorm.DB) error { + var dbUploads []dbMultipartUpload + err := tx.Limit(int(maxUploads)). + Find(&dbUploads). + Error + if err != nil { + return err + } + for _, upload := range dbUploads { + resp.Uploads = append(resp.Uploads, api.MultipartListUploadItem{ + Path: upload.ObjectID, + UploadID: upload.UploadID, + CreatedAt: upload.CreatedAt.UTC(), + }) + } + return nil + }) + return resp, err } -func ListParts(bucket, object string, uploadID string, marker int, limit int64) (*gofakes3.ListMultipartUploadPartsResult, error) { +func (s *SQLStore) ListParts(bucket, object string, uploadID string, marker int, limit int64) (api.MultipartListPartsResponse, error) { panic("not implemented") } -func AbortMultipartUpload(bucket, object string, uploadID string) error { +func (s *SQLStore) AbortMultipartUpload(bucket, object string, uploadID string) (api.MultipartAbortResponse, error) { panic("not implemented") } -func CompleteMultipartUpload(bucket, object string, uploadID string, input *gofakes3.CompleteMultipartUploadRequest) (versionID gofakes3.VersionID, etag string, err error) { +func (s *SQLStore) CompleteMultipartUpload(bucket, object string, uploadID string, input *gofakes3.CompleteMultipartUploadRequest) (_ api.MultipartCompleteResponse, err error) { panic("not implemented") } diff --git a/worker/client.go b/worker/client.go index c2603e334..190d71d2e 100644 --- a/worker/client.go +++ b/worker/client.go @@ -213,6 +213,40 @@ func (c *Client) UploadObject(ctx context.Context, r io.Reader, path string, opt return } +// UploadPart uploads part of the data for a multipart upload. +func (c *Client) UploadPart(ctx context.Context, r io.Reader, path, uploadID string, partNumber int, opts ...api.UploadOption) (etag string, err error) { + path = strings.TrimPrefix(path, "/") + c.c.Custom("PUT", fmt.Sprintf("/multipart/%s", path), []byte{}, nil) + + values := make(url.Values) + values.Set("uploadid", uploadID) + values.Set("partnumber", fmt.Sprint(partNumber)) + for _, opt := range opts { + opt(values) + } + u, err := url.Parse(fmt.Sprintf("%v/multipart/%v", c.c.BaseURL, path)) + if err != nil { + panic(err) + } + u.RawQuery = values.Encode() + req, err := http.NewRequestWithContext(ctx, "PUT", u.String(), r) + if err != nil { + panic(err) + } + req.SetBasicAuth("", c.c.WithContext(ctx).Password) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer io.Copy(io.Discard, resp.Body) + defer resp.Body.Close() + if resp.StatusCode != 200 { + err, _ := io.ReadAll(resp.Body) + return "", errors.New(string(err)) + } + return resp.Header.Get("ETag"), nil +} + func (c *Client) object(ctx context.Context, bucket, path, prefix string, offset, limit int, opts ...api.DownloadObjectOption) (_ io.ReadCloser, _ http.Header, err error) { values := url.Values{} values.Set("bucket", url.QueryEscape(bucket)) @@ -331,6 +365,44 @@ func (c *Client) DeleteObject(ctx context.Context, path string, batch bool) (err return } +func (c *Client) UploadMultipartPart(ctx context.Context, bucket, path, uploadID string, partNumber int, + data io.Reader, size int64) (err error) { + values := url.Values{} + values.Set("bucket", bucket) + values.Set("uploadID", uploadID) + values.Set("partNumber", fmt.Sprint(partNumber)) + + u, err := url.Parse(fmt.Sprintf("%v/multipart/create/%s", c.c.BaseURL, path)) + if err != nil { + panic(err) + } + u.RawQuery = values.Encode() + req, err := http.NewRequestWithContext(ctx, "POST", u.String(), data) + if err != nil { + panic(err) + } + + // Set headers. + req.Header.Set("Content-Length", fmt.Sprint(size)) + req.Header.Set("Content-Type", "octet/stream") + if c.c.Password != "" { + req.SetBasicAuth("", c.c.Password) + } + + // Send request. + r, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer io.Copy(io.Discard, r.Body) + defer r.Body.Close() + if !(200 <= r.StatusCode && r.StatusCode < 300) { + err, _ := io.ReadAll(r.Body) + return errors.New(string(err)) + } + return nil +} + // Contracts returns all contracts from the worker. These contracts decorate a // bus contract with the contract's latest revision. func (c *Client) Contracts(ctx context.Context, hostTimeout time.Duration) (resp api.ContractsResponse, err error) { diff --git a/worker/upload.go b/worker/upload.go index 674980b33..69951d799 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -19,6 +19,7 @@ import ( "go.sia.tech/renterd/object" "go.sia.tech/renterd/tracing" "go.uber.org/zap" + "lukechampine.com/blake3" "lukechampine.com/frand" ) @@ -33,6 +34,25 @@ var ( errNotEnoughContracts = errors.New("not enough contracts to support requested redundancy") ) +type uploadConfig struct { + ec object.EncryptionKey + encryptionOffset uint64 +} + +type UploadOption func(*uploadConfig) + +func WithCustomKey(ec object.EncryptionKey) UploadOption { + return func(cfg *uploadConfig) { + cfg.ec = ec + } +} + +func WithCustomEncryptionOffset(offset uint64) UploadOption { + return func(cfg *uploadConfig) { + cfg.encryptionOffset = offset + } +} + type ( slabID [8]byte @@ -262,7 +282,31 @@ func (mgr *uploadManager) Stop() { } } -func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.RedundancySettings, contracts []api.ContractMetadata, bh uint64, uploadPacking bool) (_ object.Object, partialSlab []byte, err error) { +type etagger struct { + r io.Reader + h *blake3.Hasher +} + +func newEtagger(r io.Reader) *etagger { + return &etagger{ + r: r, + h: blake3.New(32, nil), + } +} + +func (e *etagger) Read(p []byte) (int, error) { + n, err := e.r.Read(p) + if _, wErr := e.h.Write(p[:n]); wErr != nil { + return 0, wErr + } + return n, err +} + +func (e *etagger) Etag() []byte { + return e.h.Sum(nil) +} + +func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.RedundancySettings, contracts []api.ContractMetadata, bh uint64, uploadPacking bool, opts ...UploadOption) (_ object.Object, partialSlab, etag []byte, err error) { // cancel all in-flight requests when the upload is done ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -274,16 +318,29 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.Redund span.End() }() + // apply options + uc := uploadConfig{ + ec: object.GenerateEncryptionKey(), // random key + encryptionOffset: 0, // from the beginning + } + for _, opt := range opts { + opt(&uc) + } + + // wrap the reader to create an etag + tagger := newEtagger(r) + r = tagger + // create the object - o := object.NewObject() + o := object.NewObject(uc.ec) // create the cipher reader - cr := o.Encrypt(r) + cr := o.Encrypt(r, uc.encryptionOffset) // create the upload u, finishFn, err := mgr.newUpload(ctx, rs.TotalShards, contracts, bh) if err != nil { - return object.Object{}, nil, err + return object.Object{}, nil, nil, err } defer finishFn() @@ -305,9 +362,9 @@ loop: for { select { case <-mgr.stopChan: - return object.Object{}, nil, errors.New("manager was stopped") + return object.Object{}, nil, nil, errors.New("manager was stopped") case <-ctx.Done(): - return object.Object{}, nil, errors.New("upload timed out") + return object.Object{}, nil, nil, errors.New("upload timed out") case nextSlabChan <- struct{}{}: // read next slab's data data := make([]byte, size) @@ -325,7 +382,7 @@ loop: } continue } else if err != nil && err != io.ErrUnexpectedEOF { - return object.Object{}, nil, err + return object.Object{}, nil, nil, err } if uploadPacking && errors.Is(err, io.ErrUnexpectedEOF) { // If uploadPacking is true, we return the partial slab without @@ -341,7 +398,7 @@ loop: slabIndex++ case res := <-respChan: if res.err != nil { - return object.Object{}, nil, res.err + return object.Object{}, nil, nil, res.err } // collect the response and potentially break out of the loop @@ -361,7 +418,7 @@ loop: for _, resp := range responses { o.Slabs = append(o.Slabs, resp.slab) } - return o, partialSlab, nil + return o, partialSlab, tagger.Etag(), nil } func (mgr *uploadManager) launch(req *sectorUploadReq) error { diff --git a/worker/worker.go b/worker/worker.go index 739194fb4..c7f62c96c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -2,6 +2,7 @@ package worker import ( "context" + "encoding/hex" "errors" "fmt" "io" @@ -164,6 +165,8 @@ type Bus interface { AddObject(ctx context.Context, bucket string, path, contractSet string, o object.Object, usedContracts map[types.PublicKey]types.FileContractID) error DeleteObject(ctx context.Context, bucket, path string, batch bool) error + AddMultipartPart(ctx context.Context, bucket, path, contractSet, uploadID string, partNumber int, slices []object.SlabSlice, partialSlabs []object.PartialSlab, etag string, usedContracts map[types.PublicKey]types.FileContractID) (err error) + AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.PartialSlab, err error) FetchPartialSlab(ctx context.Context, key object.EncryptionKey, offset, length uint32) ([]byte, error) Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error) @@ -1131,7 +1134,7 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { } // upload the object - obj, partialSlabData, err := w.uploadManager.Upload(ctx, jc.Request.Body, rs, contracts, up.CurrentHeight, up.UploadPacking) + obj, partialSlabData, _, err := w.uploadManager.Upload(ctx, jc.Request.Body, rs, contracts, up.CurrentHeight, up.UploadPacking) if jc.Check("couldn't upload object", err) != nil { return } @@ -1203,6 +1206,175 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { } } +func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { + jc.Custom((*[]byte)(nil), nil) + ctx := jc.Request.Context() + + // fetch the upload parameters + up, err := w.bus.UploadParams(ctx) + if jc.Check("couldn't fetch upload parameters from bus", err) != nil { + return + } + + // decode the contract set from the query string + var contractset string + if jc.DecodeForm("contractset", &contractset) != nil { + return + } else if contractset != "" { + up.ContractSet = contractset + } + + // decode the bucket from the query string + bucket := api.DefaultBucketName + if jc.DecodeForm("bucket", &bucket) != nil { + return + } + + // decode the upload id + var uploadID string + if jc.DecodeForm("uploadid", &uploadID) != nil { + return + } else if uploadID == "" { + jc.Error(errors.New("upload id not specified"), http.StatusBadRequest) + return + } + + var partNumber int + if jc.DecodeForm("partnumber", &partNumber) != nil { + return + } + + // cancel the upload if no contract set is specified + if up.ContractSet == "" { + jc.Error(api.ErrContractSetNotSpecified, http.StatusBadRequest) + return + } + + // cancel the upload if consensus is not synced + if !up.ConsensusState.Synced { + w.logger.Errorf("upload cancelled, err: %v", api.ErrConsensusNotSynced) + jc.Error(api.ErrConsensusNotSynced, http.StatusServiceUnavailable) + return + } + + // allow overriding the redundancy settings + rs := up.RedundancySettings + if jc.DecodeForm("minshards", &rs.MinShards) != nil { + return + } + if jc.DecodeForm("totalshards", &rs.TotalShards) != nil { + return + } + if jc.Check("invalid redundancy settings", rs.Validate()) != nil { + return + } + + var opts []UploadOption + + // make sure only one of the following is set + var disablePreshardingEncryption bool // enabled by default + if jc.DecodeForm("disablepreshardingencryption", &disablePreshardingEncryption) != nil { + return + } + if !disablePreshardingEncryption && jc.Request.FormValue("offset") == "" { + jc.Error(errors.New("if presharding encryption isn't disabled, the offset needs to be set"), http.StatusBadRequest) + return + } + var offset uint64 + if jc.DecodeForm("offset", &offset) != nil { + return + } + if disablePreshardingEncryption { + opts = append(opts, WithCustomKey(object.NoOpKey)) + } else { + opts = append(opts, WithCustomEncryptionOffset(offset)) + } + + // attach gouging checker to the context + ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) + + // update uploader contracts + contracts, err := w.bus.ContractSetContracts(ctx, up.ContractSet) + if jc.Check("couldn't fetch contracts from bus", err) != nil { + return + } + + // upload the part + obj, partialSlabData, etag, err := w.uploadManager.Upload(ctx, jc.Request.Body, rs, contracts, up.CurrentHeight, up.UploadPacking, opts...) + if jc.Check("couldn't upload object", err) != nil { + return + } + + // build used contracts map + h2c := make(map[types.PublicKey]types.FileContractID) + for _, c := range contracts { + h2c[c.HostKey] = c.ID + } + used := make(map[types.PublicKey]types.FileContractID) + for _, s := range obj.Slabs { + for _, ss := range s.Shards { + used[ss.Host] = h2c[ss.Host] + } + } + + if len(partialSlabData) > 0 { + partialSlabs, err := w.bus.AddPartialSlab(jc.Request.Context(), partialSlabData, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet) + if jc.Check("couldn't add partial slabs to bus", err) != nil { + return + } + obj.PartialSlabs = partialSlabs + } + + // persist the part + if jc.Check("couldn't add part", w.bus.AddMultipartPart(ctx, bucket, jc.PathParam("path"), up.ContractSet, uploadID, partNumber, obj.Slabs, obj.PartialSlabs, hex.EncodeToString(etag), used)) != nil { + return + } + + // set etag in header response. + jc.ResponseWriter.Header().Set("ETag", api.FormatEtag(etag)) + + // if partial uploads are not enabled we are done. + if !up.UploadPacking { + return + } + + // if partial uploads are enabled, check whether we have a full slab now + packedSlabs, err := w.bus.PackedSlabsForUpload(jc.Request.Context(), 5*time.Minute, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet, 2) + if jc.Check("couldn't fetch packed slabs from bus", err) != nil { + return + } + + for _, ps := range packedSlabs { + // upload packed slab. + shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards)) + sectors, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight) + if jc.Check("couldn't upload packed slab", err) != nil { + return + } + + // build used contracts map + h2c := make(map[types.PublicKey]types.FileContractID) + for _, c := range contracts { + h2c[c.HostKey] = c.ID + } + used := make(map[types.PublicKey]types.FileContractID) + for _, s := range sectors { + used[s.Host] = h2c[s.Host] + } + + // mark packed slab as uploaded. + err = w.bus.MarkPackedSlabsUploaded(jc.Request.Context(), []api.UploadedPackedSlab{ + { + BufferID: ps.BufferID, + Shards: sectors, + }, + }, used) + if jc.Check("couldn't mark packed slabs uploaded", err) != nil { + return + } + } +} + func encryptPartialSlab(data []byte, key object.EncryptionKey, minShards, totalShards uint8) [][]byte { slab := object.Slab{ Key: key, @@ -1355,6 +1527,8 @@ func (w *worker) Handler() http.Handler { "PUT /objects/*path": w.objectsHandlerPUT, "DELETE /objects/*path": w.objectsHandlerDELETE, + "PUT /multipart/*path": w.multipartUploadHandlerPUT, + "GET /state": w.stateHandlerGET, })) }