Skip to content

Commit

Permalink
stores: add support for creating a multipart upload, listing them and…
Browse files Browse the repository at this point in the history
… uploading a part
  • Loading branch information
ChrisSchinnerl committed Sep 11, 2023
1 parent c7c455a commit 1f8917a
Show file tree
Hide file tree
Showing 18 changed files with 921 additions and 137 deletions.
53 changes: 53 additions & 0 deletions api/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,55 @@ type GougingSettings struct {
MinMaxEphemeralAccountBalance types.Currency `json:"minMaxEphemeralAccountBalance"`
}

// Types related to multipart uploads.
type (
MultipartCreateRequest struct {
Bucket string `json:"bucket"`
Path string `json:"path"`
}
MultipartCreateResponse struct {
UploadID string `json:"uploadID"`
}
MultipartAbortRequest struct {
}
MultipartAbortResponse struct {
}
MultipartCompleteRequest struct {
}
MultipartCompleteResponse struct {
}
MultipartAddPartRequest struct {
Bucket string `json:"bucket"`
Etag string `json:"etag"`
Path string `json:"path"`
ContractSet string `json:"contractSet"`
UploadID string `json:"uploadID"`
PartialSlabs []object.PartialSlab `json:"partialSlabs"`
PartNumber int `json:"partNumber"`
Slices []object.SlabSlice `json:"slices"`
UsedContracts map[types.PublicKey]types.FileContractID `json:"usedContracts"`
}
MultipartListUploadsRequest struct {
Bucket string
Prefix string
KeyMarker string
UploadIDMarker string
Limit int
}
MultipartListUploadsResponse struct {
Uploads []MultipartListUploadItem `json:"uploads"`
}
MultipartListUploadItem struct {
Path string `json:"objectName"`
UploadID string `json:"uploadID"`
CreatedAt time.Time `json:"createdAt"`
}
MultipartListPartsRequest struct {
}
MultipartListPartsResponse struct {
}
)

type WalletResponse struct {
ScanHeight uint64 `json:"scanHeight"`
Address types.Address `json:"address"`
Expand Down Expand Up @@ -557,3 +606,7 @@ func (rs RedundancySettings) Validate() error {
type AddPartialSlabResponse struct {
Slabs []object.PartialSlab `json:"slabs"`
}

func FormatEtag(etag []byte) string {
return fmt.Sprintf("\"%x\"", etag)
}
8 changes: 8 additions & 0 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ func UploadWithBucket(bucket string) UploadOption {
}
}

// UploadWithDisablePreshardingEncryption disables presharding encryption for
// the upload
func UploadWithDisabledPreshardingEncryption() UploadOption {
return func(v url.Values) {
v.Set("disablepreshardingencryption", "true")
}
}

type DownloadRange struct {
Start int64
Length int64
Expand Down
70 changes: 70 additions & 0 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ type (
RenameObject(ctx context.Context, bucket, from, to string) error
RenameObjects(ctx context.Context, bucket, from, to string) error

AddMultipartPart(ctx context.Context, bucket, path, contractSet, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab, etag string, usedContracts map[types.PublicKey]types.FileContractID) (err error)
CreateMultipartUpload(ctx context.Context, bucket, path string) (api.MultipartCreateResponse, error)
ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error)

MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab, usedContracts map[types.PublicKey]types.FileContractID) error
PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error)
SlabBuffers(ctx context.Context) ([]api.SlabBuffer, error)
Expand Down Expand Up @@ -1789,6 +1793,65 @@ func New(s Syncer, am *alerts.Manager, hm *webhooks.Manager, cm ChainManager, tp
return b, nil
}

func (b *bus) multipartHandlerCreatePOST(jc jape.Context) {
var req api.MultipartCreateRequest
if jc.Decode(&req) != nil {
return
}
resp, err := b.ms.CreateMultipartUpload(jc.Request.Context(), req.Bucket, req.Path)
if jc.Check("failed to create multipart upload", err) != nil {
return
}
jc.Encode(resp)
}

func (b *bus) multipartHandlerAbortPOST(jc jape.Context) {
var req api.MultipartAbortRequest
if jc.Decode(&req) != nil {
return
}
panic("not implemented")
}

func (b *bus) multipartHandlerCompletePOST(jc jape.Context) {
var req api.MultipartCompleteRequest
if jc.Decode(&req) != nil {
return
}
panic("not implemented")
}

func (b *bus) multipartHandlerUploadPartPUT(jc jape.Context) {
var req api.MultipartAddPartRequest
if jc.Decode(&req) != nil {
return
}
err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.UploadID, req.PartNumber, req.Slices, req.PartialSlabs, req.Etag, req.UsedContracts)
if jc.Check("failed to upload part", err) != nil {
return
}
}

func (b *bus) multipartHandlerListUploadsPOST(jc jape.Context) {
var req api.MultipartListUploadsRequest
if jc.Decode(&req) != nil {
return
}
resp, err := b.ms.ListMultipartUploads(jc.Request.Context(), req.Bucket, req.Prefix, req.KeyMarker, req.UploadIDMarker, req.Limit)
if jc.Check("failed to list multipart uploads", err) != nil {
return
}
jc.Encode(resp)
}

func (b *bus) multipartHandlerListPartsPOST(jc jape.Context) {
var req api.MultipartListPartsRequest
if jc.Decode(&req) != nil {
return
}
panic("not implemented")
}

// Handler returns an HTTP handler that serves the bus API.
func (b *bus) Handler() http.Handler {
return jape.Mux(tracing.TracedRoutes("bus", map[string]jape.Handler{
Expand Down Expand Up @@ -1907,6 +1970,13 @@ func (b *bus) Handler() http.Handler {
"POST /upload/:id/sector": b.uploadAddSectorHandlerPOST,
"DELETE /upload/:id": b.uploadFinishedHandlerDELETE,

"POST /multipart/create": b.multipartHandlerCreatePOST,
"POST /multipart/abort": b.multipartHandlerAbortPOST,
"POST /multipart/complete": b.multipartHandlerCompletePOST,
"PUT /multipart/part": b.multipartHandlerUploadPartPUT,
"POST /multipart/listuploads": b.multipartHandlerListUploadsPOST,
"POST /multipart/listparts": b.multipartHandlerListPartsPOST,

"GET /webhooks": b.webhookHandlerGet,
"POST /webhooks": b.webhookHandlerPost,
"POST /webhooks/action": b.webhookActionHandlerPost,
Expand Down
34 changes: 34 additions & 0 deletions bus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,40 @@ func (c *Client) renameObjects(ctx context.Context, bucket, from, to, mode strin
return
}

func (c *Client) CreateMultipartUpload(ctx context.Context, bucket, path string) (resp api.MultipartCreateResponse, err error) {
err = c.c.WithContext(ctx).POST("/multipart/create", api.MultipartCreateRequest{
Bucket: bucket,
Path: path,
}, &resp)
return
}

func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab, etag string, usedContracts map[types.PublicKey]types.FileContractID) (err error) {
err = c.c.WithContext(ctx).PUT("/multipart/part", api.MultipartAddPartRequest{
Bucket: bucket,
Etag: etag,
Path: path,
ContractSet: contractSet,
UploadID: uploadID,
PartNumber: partNumber,
Slices: slices,
PartialSlabs: partialSlab,
UsedContracts: usedContracts,
})
return
}

func (c *Client) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, err error) {
err = c.c.WithContext(ctx).POST("/multipart/listuploads", api.MultipartListUploadsRequest{
Bucket: bucket,
Prefix: prefix,
KeyMarker: keyMarker,
UploadIDMarker: uploadIDMarker,
Limit: maxUploads,
}, &resp)
return
}

// NewClient returns a client that communicates with a renterd store server
// listening on the specified address.
func NewClient(addr, password string) *Client {
Expand Down
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ module go.sia.tech/renterd
go 1.20

require (
github.com/Mikubill/gofakes3 v0.0.3-0.20230622102024-284c0f988700
github.com/SiaFoundation/gofakes3 v0.0.0-20230911090236-968673d3fd9b
github.com/go-gormigrate/gormigrate/v2 v2.1.0
github.com/google/go-cmp v0.5.9
github.com/gotd/contrib v0.19.0
github.com/klauspost/reedsolomon v1.11.8
github.com/minio/minio-go v6.0.14+incompatible
github.com/minio/minio-go/v7 v7.0.63
github.com/montanaflynn/stats v0.7.1
gitlab.com/NebulousLabs/encoding v0.0.0-20200604091946-456c3dc907fe
Expand All @@ -35,14 +34,14 @@ require (
)

require (
github.com/Mikubill/gofakes3 v0.0.3-0.20230622102024-284c0f988700 // indirect
github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect
github.com/aws/aws-sdk-go v1.44.334 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cloudflare/cloudflare-go v0.75.0 // indirect
github.com/dchest/threefish v0.0.0-20120919164726-3ecf4c494abf // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-ini/ini v1.67.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-sql-driver/mysql v1.7.1 // indirect
Expand All @@ -65,7 +64,6 @@ require (
github.com/mattn/go-sqlite3 v1.14.17 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand Down Expand Up @@ -102,5 +100,6 @@ require (
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
nhooyr.io/websocket v1.8.7 // indirect
)
Loading

0 comments on commit 1f8917a

Please sign in to comment.