From a3360da727bb8f57b254162a30aafd2060a6ca3b Mon Sep 17 00:00:00 2001 From: Peter-Jan Brone Date: Fri, 13 Oct 2023 16:37:26 +0200 Subject: [PATCH] Refactor Worker Client (#653) * worker: refactor client * worker: remove ranged responsewriter, add ObjectEntriesOptions * fix: TestMultipartUploads * testing: fix TestMultipartUploads * api: remove json tags * api: rename to ApplyHeaders and ApplyValues --- api/bus.go | 26 +- api/object.go | 114 +++++++- api/worker.go | 412 ++++++++++++---------------- internal/testing/cluster_test.go | 99 +++---- internal/testing/gouging_test.go | 11 +- internal/testing/migrations_test.go | 2 +- internal/testing/pruning_test.go | 2 +- internal/testing/uploads_test.go | 3 +- s3/backend.go | 23 +- s3/s3.go | 6 +- worker/{ => client}/client.go | 385 +++++++++----------------- worker/client/rhp.go | 138 ++++++++++ worker/worker.go | 42 +-- 13 files changed, 647 insertions(+), 616 deletions(-) rename worker/{ => client}/client.go (51%) create mode 100644 worker/client/rhp.go diff --git a/api/bus.go b/api/bus.go index cf05ded31..44725db19 100644 --- a/api/bus.go +++ b/api/bus.go @@ -382,20 +382,20 @@ type GougingParams struct { // Option types. type ( GetHostsOptions struct { - Offset int `json:"offset"` - Limit int `json:"limit"` + Offset int + Limit int } HostsForScanningOptions struct { - MaxLastScan time.Time `json:"maxLastScan"` - Limit int `json:"limit"` - Offset int `json:"offset"` + MaxLastScan time.Time + Limit int + Offset int } SearchHostOptions struct { - AddressContains string `json:"addressContains"` - FilterMode string `json:"filterMode"` - KeyIn []types.PublicKey `json:"keyIn"` - Limit int `json:"limit"` - Offset int `json:"offset"` + AddressContains string + FilterMode string + KeyIn []types.PublicKey + Limit int + Offset int } ) @@ -430,8 +430,8 @@ func (opts HostsForScanningOptions) Apply(values url.Values) { // Types related to multipart uploads. 
type ( CreateMultipartOptions struct { - Key object.EncryptionKey `json:"key"` - MimeType string `json:"mimeType"` + Key object.EncryptionKey + MimeType string } MultipartCreateRequest struct { Bucket string `json:"bucket"` @@ -540,7 +540,7 @@ type ( } CreateBucketOptions struct { - Policy BucketPolicy `json:"policy"` + Policy BucketPolicy } ) diff --git a/api/object.go b/api/object.go index e1f9fb718..a599fd197 100644 --- a/api/object.go +++ b/api/object.go @@ -128,34 +128,118 @@ func (o ObjectMetadata) ContentType() string { type ( AddObjectOptions struct { - MimeType string `json:"mimeType"` - ETag string `json:"eTag"` + MimeType string + ETag string } + CopyObjectOptions struct { - MimeType string `json:"mimeType"` + MimeType string } + DeleteObjectOptions struct { - Batch bool `json:"batch"` + Batch bool + } + + DownloadObjectOptions struct { + Prefix string + Offset int + Limit int + Range DownloadRange } + + ObjectEntriesOptions struct { + Prefix string + Offset int + Limit int + } + GetObjectOptions struct { - Prefix string `json:"prefix"` - Offset int `json:"offset"` - Limit int `json:"limit"` - IgnoreDelim bool `json:"ignoreDelim"` - Marker string `json:"marker"` + Prefix string + Offset int + Limit int + IgnoreDelim bool + Marker string } + ListObjectOptions struct { - Prefix string `json:"prefix"` - Marker string `json:"marker"` - Limit int `json:"limit"` + Prefix string + Marker string + Limit int } + SearchObjectOptions struct { - Key string `json:"key"` - Offset int `json:"offset"` - Limit int `json:"limit"` + Key string + Offset int + Limit int + } + + UploadObjectOptions struct { + Offset int + MinShards int + TotalShards int + ContractSet string + MimeType string + DisablePreshardingEncryption bool + } + + UploadMultipartUploadPartOptions struct { + DisablePreshardingEncryption bool + EncryptionOffset int } ) +func (opts UploadObjectOptions) Apply(values url.Values) { + if opts.Offset != 0 { + values.Set("offset", fmt.Sprint(opts.Offset)) + } + if opts.MinShards != 0 { + values.Set("minshards", fmt.Sprint(opts.MinShards)) + } + if opts.TotalShards != 0 { + values.Set("totalshards", fmt.Sprint(opts.TotalShards)) + } + if opts.ContractSet != "" { + values.Set("contractset", opts.ContractSet) + } + if opts.MimeType != "" { + values.Set("mimetype", opts.MimeType) + } + if opts.DisablePreshardingEncryption { + values.Set("disablepreshardingencryption", "true") + } +} + +func (opts UploadMultipartUploadPartOptions) Apply(values url.Values) { + if opts.DisablePreshardingEncryption { + values.Set("disablepreshardingencryption", "true") + } + if !opts.DisablePreshardingEncryption || opts.EncryptionOffset != 0 { + values.Set("offset", fmt.Sprint(opts.EncryptionOffset)) + } +} + +func (opts DownloadObjectOptions) ApplyValues(values url.Values) { + if opts.Prefix != "" { + values.Set("prefix", opts.Prefix) + } + if opts.Offset != 0 { + values.Set("offset", fmt.Sprint(opts.Offset)) + } + if opts.Limit != 0 { + values.Set("limit", fmt.Sprint(opts.Limit)) + } +} + +func (opts DownloadObjectOptions) ApplyHeaders(h http.Header) { + if opts.Range != (DownloadRange{}) { + if opts.Range.Length == -1 { + h.Set("Range", fmt.Sprintf("bytes=%v-", opts.Range.Offset)) + } else { + h.Set("Range", fmt.Sprintf("bytes=%v-%v", opts.Range.Offset, opts.Range.Offset+opts.Range.Length-1)) + } + } +} + func (opts DeleteObjectOptions) Apply(values url.Values) { if opts.Batch { values.Set("batch", "true") diff --git a/api/worker.go b/api/worker.go index ff777ecad..11c9ca59d 100644 --- a/api/worker.go +++ 
b/api/worker.go @@ -4,8 +4,6 @@ import ( "errors" "fmt" "io" - "net/http" - "net/url" "strconv" "strings" "time" @@ -15,274 +13,222 @@ import ( "go.sia.tech/core/types" ) -// ErrConsensusNotSynced is returned by the worker API by endpoints that rely on -// consensus and the consensus is not synced. -var ErrConsensusNotSynced = errors.New("consensus is not synced") +var ( + // ErrConsensusNotSynced is returned by the worker API by endpoints that rely on + // consensus and the consensus is not synced. + ErrConsensusNotSynced = errors.New("consensus is not synced") -// ErrContractSetNotSpecified is returned by the worker API by endpoints that -// need a contract set to be able to upload data. -var ErrContractSetNotSpecified = errors.New("contract set is not specified") - -type AccountsLockHandlerRequest struct { - HostKey types.PublicKey `json:"hostKey"` - Exclusive bool `json:"exclusive"` - Duration DurationMS `json:"duration"` -} - -type AccountsLockHandlerResponse struct { - Account Account `json:"account"` - LockID uint64 `json:"lockID"` -} - -type AccountsUnlockHandlerRequest struct { - LockID uint64 `json:"lockID"` -} - -// ContractsResponse is the response type for the /rhp/contracts endpoint. -type ContractsResponse struct { - Contracts []Contract `json:"contracts"` - Error string `json:"error,omitempty"` -} - -// MigrateSlabResponse is the response type for the /slab/migrate endpoint. -type MigrateSlabResponse struct { - NumShardsMigrated int `json:"numShardsMigrated"` -} - -// RHPScanRequest is the request type for the /rhp/scan endpoint. -type RHPScanRequest struct { - HostKey types.PublicKey `json:"hostKey"` - HostIP string `json:"hostIP"` - Timeout DurationMS `json:"timeout"` -} + // ErrContractSetNotSpecified is returned by the worker API by endpoints that + // need a contract set to be able to upload data. + ErrContractSetNotSpecified = errors.New("contract set is not specified") +) -// RHPPruneContractRequest is the request type for the /rhp/contract/:id/prune -// endpoint. -type RHPPruneContractRequest struct { - Timeout DurationMS `json:"timeout"` -} +type ( + // AccountsLockHandlerRequest is the request type for the /accounts/:id/lock + // endpoint. + AccountsLockHandlerRequest struct { + HostKey types.PublicKey `json:"hostKey"` + Exclusive bool `json:"exclusive"` + Duration DurationMS `json:"duration"` + } -// RHPPruneContractResponse is the response type for the /rhp/contract/:id/prune -// endpoint. -type RHPPruneContractResponse struct { - Pruned uint64 `json:"pruned"` - Remaining uint64 `json:"remaining"` - Error error `json:"error,omitempty"` -} + // AccountsLockHandlerResponse is the response type for the + // /accounts/:id/lock + AccountsLockHandlerResponse struct { + Account Account `json:"account"` + LockID uint64 `json:"lockID"` + } -// RHPPriceTableRequest is the request type for the /rhp/pricetable endpoint. -type RHPPriceTableRequest struct { - HostKey types.PublicKey `json:"hostKey"` - SiamuxAddr string `json:"siamuxAddr"` - Timeout DurationMS `json:"timeout"` -} + // AccountsUnlockHandlerRequest is the request type for the + // /accounts/:id/unlock + AccountsUnlockHandlerRequest struct { + LockID uint64 `json:"lockID"` + } -// RHPScanResponse is the response type for the /rhp/scan endpoint. 
-type RHPScanResponse struct { - Ping DurationMS `json:"ping"` - ScanError string `json:"scanError,omitempty"` - Settings rhpv2.HostSettings `json:"settings,omitempty"` - PriceTable rhpv3.HostPriceTable `json:"priceTable,omitempty"` -} + // ContractsResponse is the response type for the /rhp/contracts endpoint. + ContractsResponse struct { + Contracts []Contract `json:"contracts"` + Error string `json:"error,omitempty"` + } -// RHPFormRequest is the request type for the /rhp/form endpoint. -type RHPFormRequest struct { - EndHeight uint64 `json:"endHeight"` - HostCollateral types.Currency `json:"hostCollateral"` - HostKey types.PublicKey `json:"hostKey"` - HostIP string `json:"hostIP"` - RenterFunds types.Currency `json:"renterFunds"` - RenterAddress types.Address `json:"renterAddress"` -} + // MigrateSlabResponse is the response type for the /slab/migrate endpoint. + MigrateSlabResponse struct { + NumShardsMigrated int `json:"numShardsMigrated"` + } -// RHPFormResponse is the response type for the /rhp/form endpoint. -type RHPFormResponse struct { - ContractID types.FileContractID `json:"contractID"` - Contract rhpv2.ContractRevision `json:"contract"` - TransactionSet []types.Transaction `json:"transactionSet"` -} + // RHPFormRequest is the request type for the /rhp/form endpoint. + RHPFormRequest struct { + EndHeight uint64 `json:"endHeight"` + HostCollateral types.Currency `json:"hostCollateral"` + HostKey types.PublicKey `json:"hostKey"` + HostIP string `json:"hostIP"` + RenterFunds types.Currency `json:"renterFunds"` + RenterAddress types.Address `json:"renterAddress"` + } -// RHPRenewRequest is the request type for the /rhp/renew endpoint. -type RHPRenewRequest struct { - ContractID types.FileContractID `json:"contractID"` - EndHeight uint64 `json:"endHeight"` - HostAddress types.Address `json:"hostAddress"` - HostKey types.PublicKey `json:"hostKey"` - SiamuxAddr string `json:"siamuxAddr"` - NewCollateral types.Currency `json:"newCollateral"` - RenterAddress types.Address `json:"renterAddress"` - RenterFunds types.Currency `json:"renterFunds"` - WindowSize uint64 `json:"windowSize"` -} + // RHPFormResponse is the response type for the /rhp/form endpoint. + RHPFormResponse struct { + ContractID types.FileContractID `json:"contractID"` + Contract rhpv2.ContractRevision `json:"contract"` + TransactionSet []types.Transaction `json:"transactionSet"` + } -// RHPRenewResponse is the response type for the /rhp/renew endpoint. -type RHPRenewResponse struct { - Error string `json:"error"` - ContractID types.FileContractID `json:"contractID"` - Contract rhpv2.ContractRevision `json:"contract"` - TransactionSet []types.Transaction `json:"transactionSet"` -} + // RHPFundRequest is the request type for the /rhp/fund endpoint. + RHPFundRequest struct { + ContractID types.FileContractID `json:"contractID"` + HostKey types.PublicKey `json:"hostKey"` + SiamuxAddr string `json:"siamuxAddr"` + Balance types.Currency `json:"balance"` + } -// RHPFundRequest is the request type for the /rhp/fund endpoint. -type RHPFundRequest struct { - ContractID types.FileContractID `json:"contractID"` - HostKey types.PublicKey `json:"hostKey"` - SiamuxAddr string `json:"siamuxAddr"` - Balance types.Currency `json:"balance"` -} + // RHPPruneContractRequest is the request type for the /rhp/contract/:id/prune + // endpoint. + RHPPruneContractRequest struct { + Timeout DurationMS `json:"timeout"` + } -// RHPSyncRequest is the request type for the /rhp/sync endpoint. 
-type RHPSyncRequest struct { - ContractID types.FileContractID `json:"contractID"` - HostKey types.PublicKey `json:"hostKey"` - SiamuxAddr string `json:"siamuxAddr"` -} + // RHPPruneContractResponse is the response type for the /rhp/contract/:id/prune + // endpoint. + RHPPruneContractResponse struct { + Pruned uint64 `json:"pruned"` + Remaining uint64 `json:"remaining"` + Error error `json:"error,omitempty"` + } -// RHPPreparePaymentRequest is the request type for the /rhp/prepare/payment -// endpoint. -type RHPPreparePaymentRequest struct { - Account rhpv3.Account `json:"account"` - Amount types.Currency `json:"amount"` - Expiry uint64 `json:"expiry"` - AccountKey types.PrivateKey `json:"accountKey"` -} + // RHPPriceTableRequest is the request type for the /rhp/pricetable endpoint. + RHPPriceTableRequest struct { + HostKey types.PublicKey `json:"hostKey"` + SiamuxAddr string `json:"siamuxAddr"` + Timeout DurationMS `json:"timeout"` + } -// RHPRegistryReadRequest is the request type for the /rhp/registry/read -// endpoint. -type RHPRegistryReadRequest struct { - HostKey types.PublicKey `json:"hostKey"` - SiamuxAddr string `json:"siamuxAddr"` - RegistryKey rhpv3.RegistryKey `json:"registryKey"` - Payment rhpv3.PayByEphemeralAccountRequest `json:"payment"` -} + // RHPRenewRequest is the request type for the /rhp/renew endpoint. + RHPRenewRequest struct { + ContractID types.FileContractID `json:"contractID"` + EndHeight uint64 `json:"endHeight"` + HostAddress types.Address `json:"hostAddress"` + HostKey types.PublicKey `json:"hostKey"` + SiamuxAddr string `json:"siamuxAddr"` + NewCollateral types.Currency `json:"newCollateral"` + RenterAddress types.Address `json:"renterAddress"` + RenterFunds types.Currency `json:"renterFunds"` + WindowSize uint64 `json:"windowSize"` + } -// RHPRegistryUpdateRequest is the request type for the /rhp/registry/update -// endpoint. -type RHPRegistryUpdateRequest struct { - HostKey types.PublicKey `json:"hostKey"` - SiamuxAddr string `json:"siamuxAddr"` - RegistryKey rhpv3.RegistryKey `json:"registryKey"` - RegistryValue rhpv3.RegistryValue `json:"registryValue"` -} + // RHPRenewResponse is the response type for the /rhp/renew endpoint. + RHPRenewResponse struct { + Error string `json:"error"` + ContractID types.FileContractID `json:"contractID"` + Contract rhpv2.ContractRevision `json:"contract"` + TransactionSet []types.Transaction `json:"transactionSet"` + } -// DownloadStatsResponse is the response type for the /stats/downloads endpoint. -type DownloadStatsResponse struct { - AvgDownloadSpeedMBPS float64 `json:"avgDownloadSpeedMBPS"` - AvgOverdrivePct float64 `json:"avgOverdrivePct"` - HealthyDownloaders uint64 `json:"healthyDownloaders"` - NumDownloaders uint64 `json:"numDownloaders"` - DownloadersStats []DownloaderStats `json:"downloadersStats"` -} + // RHPScanRequest is the request type for the /rhp/scan endpoint. + RHPScanRequest struct { + HostKey types.PublicKey `json:"hostKey"` + HostIP string `json:"hostIP"` + Timeout DurationMS `json:"timeout"` + } -type DownloaderStats struct { - AvgSectorDownloadSpeedMBPS float64 `json:"avgSectorDownloadSpeedMBPS"` - HostKey types.PublicKey `json:"hostKey"` - NumDownloads uint64 `json:"numDownloads"` -} + // RHPScanResponse is the response type for the /rhp/scan endpoint. 
+ RHPScanResponse struct { + Ping DurationMS `json:"ping"` + ScanError string `json:"scanError,omitempty"` + Settings rhpv2.HostSettings `json:"settings,omitempty"` + PriceTable rhpv3.HostPriceTable `json:"priceTable,omitempty"` + } -// UploadStatsResponse is the response type for the /stats/uploads endpoint. -type UploadStatsResponse struct { - AvgSlabUploadSpeedMBPS float64 `json:"avgSlabUploadSpeedMBPS"` - AvgOverdrivePct float64 `json:"avgOverdrivePct"` - HealthyUploaders uint64 `json:"healthyUploaders"` - NumUploaders uint64 `json:"numUploaders"` - UploadersStats []UploaderStats `json:"uploadersStats"` -} + // RHPSyncRequest is the request type for the /rhp/sync endpoint. + RHPSyncRequest struct { + ContractID types.FileContractID `json:"contractID"` + HostKey types.PublicKey `json:"hostKey"` + SiamuxAddr string `json:"siamuxAddr"` + } -type UploaderStats struct { - HostKey types.PublicKey `json:"hostKey"` - AvgSectorUploadSpeedMBPS float64 `json:"avgSectorUploadSpeedMBPS"` -} + // RHPPreparePaymentRequest is the request type for the /rhp/prepare/payment + // endpoint. + RHPPreparePaymentRequest struct { + Account rhpv3.Account `json:"account"` + Amount types.Currency `json:"amount"` + Expiry uint64 `json:"expiry"` + AccountKey types.PrivateKey `json:"accountKey"` + } -// WorkerStateResponse is the response type for the /worker/state endpoint. -type WorkerStateResponse struct { - ID string `json:"id"` - StartTime time.Time `json:"startTime"` - BuildState -} + // RHPRegistryReadRequest is the request type for the /rhp/registry/read + // endpoint. + RHPRegistryReadRequest struct { + HostKey types.PublicKey `json:"hostKey"` + SiamuxAddr string `json:"siamuxAddr"` + RegistryKey rhpv3.RegistryKey `json:"registryKey"` + Payment rhpv3.PayByEphemeralAccountRequest `json:"payment"` + } -// An UploadOption overrides an option on the upload and migrate endpoints in -// the worker. -type UploadOption func(url.Values) + // RHPRegistryUpdateRequest is the request type for the /rhp/registry/update + // endpoint. + RHPRegistryUpdateRequest struct { + HostKey types.PublicKey `json:"hostKey"` + SiamuxAddr string `json:"siamuxAddr"` + RegistryKey rhpv3.RegistryKey `json:"registryKey"` + RegistryValue rhpv3.RegistryValue `json:"registryValue"` + } -// UploadWithRedundancy sets the min and total shards that should be used for an -// upload -func UploadWithRedundancy(minShards, totalShards int) UploadOption { - return func(v url.Values) { - v.Set("minshards", strconv.Itoa(minShards)) - v.Set("totalshards", strconv.Itoa(totalShards)) + // DownloadStatsResponse is the response type for the /stats/downloads endpoint. + DownloadStatsResponse struct { + AvgDownloadSpeedMBPS float64 `json:"avgDownloadSpeedMBPS"` + AvgOverdrivePct float64 `json:"avgOverdrivePct"` + HealthyDownloaders uint64 `json:"healthyDownloaders"` + NumDownloaders uint64 `json:"numDownloaders"` + DownloadersStats []DownloaderStats `json:"downloadersStats"` + } + DownloaderStats struct { + AvgSectorDownloadSpeedMBPS float64 `json:"avgSectorDownloadSpeedMBPS"` + HostKey types.PublicKey `json:"hostKey"` + NumDownloads uint64 `json:"numDownloads"` } -} -// UploadWithContractSet sets the contract set that should be used for an upload -func UploadWithContractSet(set string) UploadOption { - return func(v url.Values) { - v.Set("contractset", set) + // UploadStatsResponse is the response type for the /stats/uploads endpoint. 
+ UploadStatsResponse struct { + AvgSlabUploadSpeedMBPS float64 `json:"avgSlabUploadSpeedMBPS"` + AvgOverdrivePct float64 `json:"avgOverdrivePct"` + HealthyUploaders uint64 `json:"healthyUploaders"` + NumUploaders uint64 `json:"numUploaders"` + UploadersStats []UploaderStats `json:"uploadersStats"` + } + UploaderStats struct { + HostKey types.PublicKey `json:"hostKey"` + AvgSectorUploadSpeedMBPS float64 `json:"avgSectorUploadSpeedMBPS"` } -} -// UploadWithBucket sets the bucket that should be used for an upload -func UploadWithBucket(bucket string) UploadOption { - return func(v url.Values) { - v.Set("bucket", bucket) + // WorkerStateResponse is the response type for the /worker/state endpoint. + WorkerStateResponse struct { + ID string `json:"id"` + StartTime time.Time `json:"startTime"` + BuildState } -} -// UploadWithDisablePreshardingEncryption disables presharding encryption for -// the upload -func UploadWithDisabledPreshardingEncryption() UploadOption { - return func(v url.Values) { - v.Set("disablepreshardingencryption", "true") + UploadObjectResponse struct { + ETag string `json:"etag"` } -} -// UploadWithMimeType sets the mime type that should be used for the upload -func UploadWithMimeType(mimeType string) UploadOption { - return func(v url.Values) { - v.Set("mimetype", mimeType) + UploadMultipartUploadPartResponse struct { + ETag string `json:"etag"` } -} -func UploadWithEncryptionOffset(offset int64) UploadOption { - return func(v url.Values) { - v.Set("offset", fmt.Sprint(offset)) + GetObjectResponse struct { + Content io.ReadCloser `json:"content"` + ContentType string `json:"contentType"` + ModTime time.Time `json:"modTime"` + Range *DownloadRange `json:"range,omitempty"` + Size int64 `json:"size"` } -} +) type DownloadRange struct { - Start int64 + Offset int64 Length int64 Size int64 } -type UploadObjectResponse struct { - ETag string `json:"etag"` -} - -type UploadMultipartUploadPartResponse struct { - ETag string `json:"etag"` -} - -type GetObjectResponse struct { - Content io.ReadCloser `json:"content"` - ContentType string `json:"contentType"` - ModTime time.Time `json:"modTime"` - Range *DownloadRange `json:"range,omitempty"` - Size int64 `json:"size"` -} - -type DownloadObjectOption func(http.Header) - -func DownloadWithRange(offset, length int64) DownloadObjectOption { - return func(h http.Header) { - if length == -1 { - h.Set("Range", fmt.Sprintf("bytes=%v-", offset)) - } else { - h.Set("Range", fmt.Sprintf("bytes=%v-%v", offset, offset+length-1)) - } - } -} - func ParseDownloadRange(contentRange string) (DownloadRange, error) { parts := strings.Split(contentRange, " ") if len(parts) != 2 || parts[0] != "bytes" { @@ -310,7 +256,7 @@ func ParseDownloadRange(contentRange string) (DownloadRange, error) { return DownloadRange{}, err } return DownloadRange{ - Start: start, + Offset: start, Length: end - start + 1, Size: size, }, nil diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index 06316aa6b..4603027cf 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -69,7 +69,7 @@ func TestNewTestCluster(t *testing.T) { tt.OK(err) // Try talking to the worker and request the object. - err = w.DeleteObject(context.Background(), "foo", false) + err = w.DeleteObject(context.Background(), api.DefaultBucketName, "foo", api.DeleteObjectOptions{}) tt.OK(err) // See if autopilot is running by triggering the loop. 
@@ -305,11 +305,11 @@ func TestObjectEntries(t *testing.T) { for _, upload := range uploads { if upload.size == 0 { - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(nil), upload.path)) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(nil), api.DefaultBucketName, upload.path, api.UploadObjectOptions{})) } else { data := make([]byte, upload.size) frand.Read(data) - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), upload.path)) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, upload.path, api.UploadObjectOptions{})) } } @@ -380,7 +380,7 @@ func TestObjectEntries(t *testing.T) { } // use the worker client - got, err := w.ObjectEntries(context.Background(), api.DefaultBucketName, test.path, test.prefix, 0, -1) + got, err := w.ObjectEntries(context.Background(), api.DefaultBucketName, test.path, api.ObjectEntriesOptions{Prefix: test.prefix}) if err != nil { t.Fatal(err) } @@ -391,7 +391,7 @@ func TestObjectEntries(t *testing.T) { } for _, entry := range got { if !strings.HasSuffix(entry.Name, "/") { - if err := w.DownloadObject(context.Background(), io.Discard, entry.Name); err != nil { + if err := w.DownloadObject(context.Background(), io.Discard, api.DefaultBucketName, entry.Name, api.DownloadObjectOptions{}); err != nil { t.Fatal(err) } } @@ -400,11 +400,11 @@ func TestObjectEntries(t *testing.T) { // delete all uploads for _, upload := range uploads { - tt.OK(w.DeleteObject(context.Background(), upload.path, false)) + tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, upload.path, api.DeleteObjectOptions{})) } // assert root dir is empty - if entries, err := w.ObjectEntries(context.Background(), api.DefaultBucketName, "/", "", 0, -1); err != nil { + if entries, err := w.ObjectEntries(context.Background(), api.DefaultBucketName, "/", api.ObjectEntriesOptions{}); err != nil { t.Fatal(err) } else if len(entries) != 0 { t.Fatal("there should be no entries left", entries) @@ -435,7 +435,7 @@ func TestObjectsRename(t *testing.T) { "/foo/baz/quuz", } for _, path := range uploads { - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(nil), path)) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(nil), api.DefaultBucketName, path, api.UploadObjectOptions{})) } // rename @@ -454,7 +454,7 @@ func TestObjectsRename(t *testing.T) { "/quuz", } { buf := bytes.NewBuffer(nil) - if err := w.DownloadObject(context.Background(), buf, path); err != nil { + if err := w.DownloadObject(context.Background(), buf, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err != nil { t.Fatal(err) } } @@ -477,11 +477,11 @@ func TestUploadDownloadEmpty(t *testing.T) { tt := cluster.tt // upload an empty file - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(nil), "empty")) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(nil), api.DefaultBucketName, "empty", api.UploadObjectOptions{})) // download the empty file var buffer bytes.Buffer - tt.OK(w.DownloadObject(context.Background(), &buffer, "empty")) + tt.OK(w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, "empty", api.DownloadObjectOptions{})) // assert it's empty if len(buffer.Bytes()) != 0 { @@ -515,12 +515,12 @@ func TestUploadDownloadBasic(t *testing.T) { tt.OKAll(frand.Read(data)) // upload the data - name := fmt.Sprintf("data_%v", len(data)) - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), name)) + path := fmt.Sprintf("data_%v", len(data)) + 
tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, path, api.UploadObjectOptions{})) // download data var buffer bytes.Buffer - tt.OK(w.DownloadObject(context.Background(), &buffer, name)) + tt.OK(w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{})) // assert it matches if !bytes.Equal(data, buffer.Bytes()) { @@ -531,7 +531,7 @@ func TestUploadDownloadBasic(t *testing.T) { for i := int64(0); i < 4; i++ { offset := i * 32 var buffer bytes.Buffer - tt.OK(w.DownloadObject(context.Background(), &buffer, name, api.DownloadWithRange(offset, 32))) + tt.OK(w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{Range: api.DownloadRange{Offset: offset, Length: 32}})) if !bytes.Equal(data[offset:offset+32], buffer.Bytes()) { fmt.Println(data[offset : offset+32]) fmt.Println(buffer.Bytes()) @@ -599,11 +599,11 @@ func TestUploadDownloadExtended(t *testing.T) { file2 := make([]byte, rhpv2.SectorSize/12) frand.Read(file1) frand.Read(file2) - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(file1), "fileś/file1")) - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(file2), "fileś/file2")) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(file1), api.DefaultBucketName, "fileś/file1", api.UploadObjectOptions{})) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(file2), api.DefaultBucketName, "fileś/file2", api.UploadObjectOptions{})) // fetch all entries from the worker - entries, err := cluster.Worker.ObjectEntries(context.Background(), api.DefaultBucketName, "", "", 0, -1) + entries, err := cluster.Worker.ObjectEntries(context.Background(), api.DefaultBucketName, "", api.ObjectEntriesOptions{}) tt.OK(err) if len(entries) != 1 { @@ -628,7 +628,7 @@ func TestUploadDownloadExtended(t *testing.T) { } // fetch entries from the worker for unexisting path - entries, err = cluster.Worker.ObjectEntries(context.Background(), api.DefaultBucketName, "bar/", "", 0, -1) + entries, err = cluster.Worker.ObjectEntries(context.Background(), api.DefaultBucketName, "bar/", api.ObjectEntriesOptions{}) tt.OK(err) if len(entries) != 0 { t.Fatal("expected no entries to be returned", len(entries)) @@ -641,8 +641,8 @@ func TestUploadDownloadExtended(t *testing.T) { // upload the data for _, data := range [][]byte{small, large} { tt.OKAll(frand.Read(data)) - name := fmt.Sprintf("data_%v", len(data)) - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), name)) + path := fmt.Sprintf("data_%v", len(data)) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, path, api.UploadObjectOptions{})) } // check objects stats. 
@@ -667,7 +667,7 @@ func TestUploadDownloadExtended(t *testing.T) { for _, data := range [][]byte{small, large} { name := fmt.Sprintf("data_%v", len(data)) var buffer bytes.Buffer - tt.OK(w.DownloadObject(context.Background(), &buffer, name)) + tt.OK(w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, name, api.DownloadObjectOptions{})) // assert it matches if !bytes.Equal(data, buffer.Bytes()) { @@ -690,10 +690,10 @@ func TestUploadDownloadExtended(t *testing.T) { // download the data again for _, data := range [][]byte{small, large} { - name := fmt.Sprintf("data_%v", len(data)) + path := fmt.Sprintf("data_%v", len(data)) var buffer bytes.Buffer - tt.OK(w.DownloadObject(context.Background(), &buffer, name)) + tt.OK(w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{})) // assert it matches if !bytes.Equal(data, buffer.Bytes()) { @@ -701,7 +701,7 @@ func TestUploadDownloadExtended(t *testing.T) { } // delete the object - tt.OK(w.DeleteObject(context.Background(), name, false)) + tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, path, api.DeleteObjectOptions{})) } } @@ -764,8 +764,8 @@ func TestUploadDownloadSpending(t *testing.T) { tt.OKAll(frand.Read(data)) // upload the data - name := fmt.Sprintf("data_%v", len(data)) - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), name)) + path := fmt.Sprintf("data_%v", len(data)) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, path, api.UploadObjectOptions{})) // Should be registered in bus. res, err := cluster.Bus.Object(context.Background(), api.DefaultBucketName, "", api.GetObjectOptions{}) @@ -773,7 +773,7 @@ func TestUploadDownloadSpending(t *testing.T) { var found bool for _, entry := range res.Entries { - if entry.Name == fmt.Sprintf("/%s", name) { + if entry.Name == fmt.Sprintf("/%s", path) { found = true break } @@ -784,7 +784,7 @@ func TestUploadDownloadSpending(t *testing.T) { // download the data var buffer bytes.Buffer - tt.OK(w.DownloadObject(context.Background(), &buffer, name)) + tt.OK(w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{})) // assert it matches if !bytes.Equal(data, buffer.Bytes()) { @@ -1033,8 +1033,8 @@ func TestParallelUpload(t *testing.T) { tt.OKAll(frand.Read(data)) // upload the data - name := fmt.Sprintf("/dir/data_%v", hex.EncodeToString(data[:16])) - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), name)) + path := fmt.Sprintf("/dir/data_%v", hex.EncodeToString(data[:16])) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, path, api.UploadObjectOptions{})) return nil } @@ -1060,7 +1060,7 @@ func TestParallelUpload(t *testing.T) { } // Upload one more object. 
- tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader([]byte("data")), "/foo")) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader([]byte("data")), api.DefaultBucketName, "/foo", api.UploadObjectOptions{})) objects, err = cluster.Bus.SearchObjects(context.Background(), api.DefaultBucketName, api.SearchObjectOptions{Key: "/", Limit: 100}) tt.OK(err) @@ -1106,12 +1106,12 @@ func TestParallelDownload(t *testing.T) { // upload the data data := frand.Bytes(rhpv2.SectorSize) - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), "foo")) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, "foo", api.UploadObjectOptions{})) download := func() error { t.Helper() buf := bytes.NewBuffer(nil) - err := w.DownloadObject(context.Background(), buf, "foo") + err := w.DownloadObject(context.Background(), buf, api.DefaultBucketName, "foo", api.DownloadObjectOptions{}) if err != nil { return err } @@ -1260,7 +1260,7 @@ func TestUploadDownloadSameHost(t *testing.T) { // upload a file data := frand.Bytes(5*rhpv2.SectorSize + 1) - tt.OKAll(cluster.Worker.UploadObject(context.Background(), bytes.NewReader(data), "foo")) + tt.OKAll(cluster.Worker.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, "foo", api.UploadObjectOptions{})) // Download the file multiple times. var wg sync.WaitGroup @@ -1270,7 +1270,7 @@ func TestUploadDownloadSameHost(t *testing.T) { defer wg.Done() for i := 0; i < 5; i++ { buf := &bytes.Buffer{} - if err := cluster.Worker.DownloadObject(context.Background(), buf, "foo"); err != nil { + if err := cluster.Worker.DownloadObject(context.Background(), buf, api.DefaultBucketName, "foo", api.DownloadObjectOptions{}); err != nil { t.Error(err) break } @@ -1444,11 +1444,16 @@ func TestUploadPacking(t *testing.T) { frand.Read(data3) // declare helpers - download := func(name string, data []byte, offset, length int64) { + download := func(path string, data []byte, offset, length int64) { t.Helper() var buffer bytes.Buffer - if err := w.DownloadObject(context.Background(), &buffer, name, - api.DownloadWithRange(offset, length)); err != nil { + if err := w.DownloadObject( + context.Background(), + &buffer, + api.DefaultBucketName, + path, + api.DownloadObjectOptions{Range: api.DownloadRange{Offset: offset, Length: length}}, + ); err != nil { t.Fatal(err) } if !bytes.Equal(data[offset:offset+length], buffer.Bytes()) { @@ -1457,7 +1462,7 @@ func TestUploadPacking(t *testing.T) { } uploadDownload := func(name string, data []byte) { t.Helper() - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), name)) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, name, api.UploadObjectOptions{})) download(name, data, 0, int64(len(data))) res, err := b.Object(context.Background(), api.DefaultBucketName, name, api.GetObjectOptions{}) if err != nil { @@ -1466,7 +1471,7 @@ func TestUploadPacking(t *testing.T) { if res.Object.Size != int64(len(data)) { t.Fatal("unexpected size after upload", res.Object.Size, len(data)) } - entries, err := w.ObjectEntries(context.Background(), api.DefaultBucketName, "/", "", 0, -1) + entries, err := w.ObjectEntries(context.Background(), api.DefaultBucketName, "/", api.ObjectEntriesOptions{}) if err != nil { t.Fatal(err) } @@ -1686,7 +1691,7 @@ func TestSlabBufferStats(t *testing.T) { frand.Read(data2) // upload the first file - buffer should still be incomplete after this - tt.OKAll(w.UploadObject(context.Background(), 
bytes.NewReader(data1), "1")) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data1), api.DefaultBucketName, "1", api.UploadObjectOptions{})) // assert number of objects os, err := b.ObjectsStats() @@ -1740,7 +1745,7 @@ func TestSlabBufferStats(t *testing.T) { } // upload the second file - this should fill the buffer - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data2), "2")) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data2), api.DefaultBucketName, "2", api.UploadObjectOptions{})) // assert number of objects os, err = b.ObjectsStats() @@ -1863,7 +1868,7 @@ func TestMultipartUploads(t *testing.T) { // correctly. putPart := func(partNum int, offset int, data []byte) string { t.Helper() - res, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(data), objPath, mpr.UploadID, partNum, api.UploadWithEncryptionOffset(int64(offset))) + res, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(data), api.DefaultBucketName, objPath, mpr.UploadID, partNum, api.UploadMultipartUploadPartOptions{EncryptionOffset: offset}) tt.OK(err) if res.ETag == "" { t.Fatal("expected non-empty ETag") @@ -1913,7 +1918,7 @@ func TestMultipartUploads(t *testing.T) { } // Download object - gor, err := w.GetObject(context.Background(), api.DefaultBucketName, objPath) + gor, err := w.GetObject(context.Background(), api.DefaultBucketName, objPath, api.DownloadObjectOptions{}) tt.OK(err) if gor.Range != nil { t.Fatal("unexpected range:", gor.Range) @@ -1926,9 +1931,9 @@ func TestMultipartUploads(t *testing.T) { } // Download a range of the object - gor, err = w.GetObject(context.Background(), api.DefaultBucketName, objPath, api.DownloadWithRange(0, 1)) + gor, err = w.GetObject(context.Background(), api.DefaultBucketName, objPath, api.DownloadObjectOptions{Range: api.DownloadRange{Offset: 0, Length: 1}}) tt.OK(err) - if gor.Range == nil || gor.Range.Start != 0 || gor.Range.Length != 1 { + if gor.Range == nil || gor.Range.Offset != 0 || gor.Range.Length != 1 { t.Fatal("unexpected range:", gor.Range) } else if gor.Size != size { t.Fatal("unexpected size:", gor.Size) diff --git a/internal/testing/gouging_test.go b/internal/testing/gouging_test.go index e7c05be29..ef390fb1b 100644 --- a/internal/testing/gouging_test.go +++ b/internal/testing/gouging_test.go @@ -9,6 +9,7 @@ import ( rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" + "go.sia.tech/renterd/api" "go.uber.org/zap/zapcore" "lukechampine.com/frand" ) @@ -41,12 +42,12 @@ func TestGouging(t *testing.T) { tt.OKAll(frand.Read(data)) // upload the data - name := fmt.Sprintf("data_%v", len(data)) - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), name)) + path := fmt.Sprintf("data_%v", len(data)) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, path, api.UploadObjectOptions{})) // download the data var buffer bytes.Buffer - tt.OK(w.DownloadObject(context.Background(), &buffer, name)) + tt.OK(w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{})) if !bytes.Equal(data, buffer.Bytes()) { t.Fatal("unexpected data") } @@ -69,7 +70,7 @@ func TestGouging(t *testing.T) { time.Sleep(defaultHostSettings.PriceTableValidity) // upload some data - should fail - tt.FailAll(w.UploadObject(context.Background(), bytes.NewReader(data), name)) + tt.FailAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, path, api.UploadObjectOptions{})) // 
update all host settings so they're gouging for _, h := range cluster.hosts { @@ -86,7 +87,7 @@ func TestGouging(t *testing.T) { // download the data - should fail buffer.Reset() - if err := w.DownloadObject(context.Background(), &buffer, name); err == nil { + if err := w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err == nil { t.Fatal("expected download to fail") } } diff --git a/internal/testing/migrations_test.go b/internal/testing/migrations_test.go index fc6f19b55..d7b1dd3d4 100644 --- a/internal/testing/migrations_test.go +++ b/internal/testing/migrations_test.go @@ -55,7 +55,7 @@ func TestMigrations(t *testing.T) { // add an object data := make([]byte, rhpv2.SectorSize) frand.Read(data) - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), "foo")) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, "foo", api.UploadObjectOptions{})) // assert amount of hosts used used := usedHosts("foo") diff --git a/internal/testing/pruning_test.go b/internal/testing/pruning_test.go index e72876a5c..0a4c9e096 100644 --- a/internal/testing/pruning_test.go +++ b/internal/testing/pruning_test.go @@ -171,7 +171,7 @@ func TestSectorPruning(t *testing.T) { // add several objects for i := 0; i < numObjects; i++ { filename := fmt.Sprintf("obj_%d", i) - tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader([]byte(filename)), filename)) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader([]byte(filename)), api.DefaultBucketName, filename, api.UploadObjectOptions{})) } // compare database against roots returned by the host diff --git a/internal/testing/uploads_test.go b/internal/testing/uploads_test.go index bc59da1cb..0ad328911 100644 --- a/internal/testing/uploads_test.go +++ b/internal/testing/uploads_test.go @@ -9,6 +9,7 @@ import ( rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" + "go.sia.tech/renterd/api" "lukechampine.com/frand" ) @@ -63,7 +64,7 @@ func TestUploadingSectorsCache(t *testing.T) { // upload an object using our custom reader br := newBlockedReader(data) go func() { - _, err := w.UploadObject(context.Background(), br, t.Name()) + _, err := w.UploadObject(context.Background(), br, api.DefaultBucketName, t.Name(), api.UploadObjectOptions{}) if err != nil { t.Error(err) } diff --git a/s3/backend.go b/s3/backend.go index ead95b5c5..9ca7340b4 100644 --- a/s3/backend.go +++ b/s3/backend.go @@ -233,15 +233,15 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range return nil, gofakes3.ErrorMessage(gofakes3.ErrNotImplemented, "range request from end not supported") } - var opts []api.DownloadObjectOption + opts := api.DownloadObjectOptions{} if rangeRequest != nil { length := int64(-1) if rangeRequest.End >= 0 { length = rangeRequest.End - rangeRequest.Start + 1 } - opts = append(opts, api.DownloadWithRange(rangeRequest.Start, length)) + opts.Range = api.DownloadRange{Offset: rangeRequest.Start, Length: length} } - res, err := s.w.GetObject(ctx, bucketName, objectName, opts...) 
+ res, err := s.w.GetObject(ctx, bucketName, objectName, opts) if err != nil && strings.Contains(err.Error(), api.ErrBucketNotFound.Error()) { return nil, gofakes3.BucketNotFound(bucketName) } else if err != nil && strings.Contains(err.Error(), api.ErrObjectNotFound.Error()) { @@ -252,7 +252,7 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range var objectRange *gofakes3.ObjectRange if res.Range != nil { objectRange = &gofakes3.ObjectRange{ - Start: res.Range.Start, + Start: res.Range.Offset, Length: res.Range.Length, } } @@ -337,11 +337,11 @@ func (s *s3) DeleteObject(ctx context.Context, bucketName, objectName string) (g // TODO: Metadata is currently ignored. The backend requires an update to // support it. func (s *s3) PutObject(ctx context.Context, bucketName, key string, meta map[string]string, input io.Reader, size int64) (gofakes3.PutObjectResult, error) { - opts := []api.UploadOption{api.UploadWithBucket(bucketName)} + opts := api.UploadObjectOptions{} if ct, ok := meta["Content-Type"]; ok { - opts = append(opts, api.UploadWithMimeType(ct)) + opts.MimeType = ct } - ur, err := s.w.UploadObject(ctx, input, key, opts...) + ur, err := s.w.UploadObject(ctx, input, bucketName, key, opts) if err != nil && strings.Contains(err.Error(), api.ErrBucketNotFound.Error()) { return gofakes3.PutObjectResult{}, gofakes3.BucketNotFound(bucketName) } else if err != nil { @@ -401,12 +401,9 @@ func (s *s3) CreateMultipartUpload(ctx context.Context, bucket, key string, meta } func (s *s3) UploadPart(ctx context.Context, bucket, object string, id gofakes3.UploadID, partNumber int, contentLength int64, input io.Reader) (*gofakes3.UploadPartResult, error) { - opts := []api.UploadOption{ - api.UploadWithBucket(bucket), - api.UploadWithDisabledPreshardingEncryption(), - } - - res, err := s.w.UploadMultipartUploadPart(ctx, input, object, string(id), partNumber, opts...) 
+ res, err := s.w.UploadMultipartUploadPart(ctx, input, bucket, object, string(id), partNumber, api.UploadMultipartUploadPartOptions{ + DisablePreshardingEncryption: true, + }) if err != nil { return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) } diff --git a/s3/s3.go b/s3/s3.go index e6d221bd4..11c0f477c 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -48,9 +48,9 @@ type bus interface { } type worker interface { - UploadObject(ctx context.Context, r io.Reader, path string, opts ...api.UploadOption) (*api.UploadObjectResponse, error) - GetObject(ctx context.Context, path, bucket string, opts ...api.DownloadObjectOption) (*api.GetObjectResponse, error) - UploadMultipartUploadPart(ctx context.Context, r io.Reader, path, uploadID string, partNumber int, opts ...api.UploadOption) (*api.UploadMultipartUploadPartResponse, error) + GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) + UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) + UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) } func (l *gofakes3Logger) Print(level gofakes3.LogLevel, v ...interface{}) { diff --git a/worker/client.go b/worker/client/client.go similarity index 51% rename from worker/client.go rename to worker/client/client.go index 12e588306..f16d1b90a 100644 --- a/worker/client.go +++ b/worker/client/client.go @@ -1,4 +1,4 @@ -package worker +package client import ( "context" @@ -11,12 +11,10 @@ import ( "strings" "time" - rhpv2 "go.sia.tech/core/rhp/v2" rhpv3 "go.sia.tech/core/rhp/v3" "go.sia.tech/core/types" "go.sia.tech/jape" "go.sia.tech/renterd/api" - "go.sia.tech/renterd/hostdb" "go.sia.tech/renterd/object" ) @@ -25,138 +23,108 @@ type Client struct { c jape.Client } -func (c *Client) ID(ctx context.Context) (id string, err error) { - err = c.c.WithContext(ctx).GET("/id", &id) - return -} - -// RHPBroadcast broadcasts the latest revision for a contract. -func (c *Client) RHPBroadcast(ctx context.Context, fcid types.FileContractID) (err error) { - err = c.c.WithContext(ctx).POST(fmt.Sprintf("/rhp/contract/%s/broadcast", fcid), nil, nil) +// Account returns the account id for a given host. +func (c *Client) Account(ctx context.Context, hostKey types.PublicKey) (account rhpv3.Account, err error) { + err = c.c.WithContext(ctx).GET(fmt.Sprintf("/account/%s", hostKey), &account) return } -// RHPPruneContract prunes deleted sectors from the contract with given id. -func (c *Client) RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (pruned, remaining uint64, err error) { - var res api.RHPPruneContractResponse - err = c.c.WithContext(ctx).POST(fmt.Sprintf("/rhp/contract/%s/prune", fcid), api.RHPPruneContractRequest{ - Timeout: api.DurationMS(timeout), - }, &res) - pruned = res.Pruned - remaining = res.Remaining - return -} +// DownloadObject downloads the object at the given path. +func (c *Client) DownloadObject(ctx context.Context, w io.Writer, bucket, path string, opts api.DownloadObjectOptions) (err error) { + if strings.HasSuffix(path, "/") { + return errors.New("the given path is a directory, use ObjectEntries instead") + } -// RHPContractRoots fetches the roots of the contract with given id. 
-func (c *Client) RHPContractRoots(ctx context.Context, fcid types.FileContractID) (roots []types.Hash256, err error) { - err = c.c.WithContext(ctx).GET(fmt.Sprintf("/rhp/contract/%s/roots", fcid), &roots) - return + path = api.ObjectPathEscape(path) + body, _, err := c.object(ctx, bucket, path, opts) + if err != nil { + return err + } + defer body.Close() + _, err = io.Copy(w, body) + return err } -// RHPScan scans a host, returning its current settings. -func (c *Client) RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (resp api.RHPScanResponse, err error) { - err = c.c.WithContext(ctx).POST("/rhp/scan", api.RHPScanRequest{ - HostKey: hostKey, - HostIP: hostIP, - Timeout: api.DurationMS(timeout), - }, &resp) +// DownloadStats returns download statistics. +func (c *Client) DownloadStats() (resp api.DownloadStatsResponse, err error) { + err = c.c.GET("/stats/downloads", &resp) return } -// RHPForm forms a contract with a host. -func (c *Client) RHPForm(ctx context.Context, endHeight uint64, hk types.PublicKey, hostIP string, renterAddress types.Address, renterFunds types.Currency, hostCollateral types.Currency) (rhpv2.ContractRevision, []types.Transaction, error) { - req := api.RHPFormRequest{ - EndHeight: endHeight, - HostCollateral: hostCollateral, - HostKey: hk, - HostIP: hostIP, - RenterFunds: renterFunds, - RenterAddress: renterAddress, +// GetObject returns the object at given path alongside its metadata. +func (c *Client) GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) { + if strings.HasSuffix(path, "/") { + return nil, errors.New("the given path is a directory, use ObjectEntries instead") } - var resp api.RHPFormResponse - err := c.c.WithContext(ctx).POST("/rhp/form", req, &resp) - return resp.Contract, resp.TransactionSet, err -} -// RHPRenew renews an existing contract with a host. -func (c *Client) RHPRenew(ctx context.Context, fcid types.FileContractID, endHeight uint64, hk types.PublicKey, siamuxAddr string, hostAddress, renterAddress types.Address, renterFunds, newCollateral types.Currency, windowSize uint64) (rhpv2.ContractRevision, []types.Transaction, error) { - req := api.RHPRenewRequest{ - ContractID: fcid, - EndHeight: endHeight, - HostAddress: hostAddress, - HostKey: hk, - NewCollateral: newCollateral, - RenterAddress: renterAddress, - RenterFunds: renterFunds, - SiamuxAddr: siamuxAddr, - WindowSize: windowSize, + // Start download. + path = api.ObjectPathEscape(path) + body, header, err := c.object(ctx, bucket, path, opts) + if err != nil { + return nil, err } + defer func() { + if err != nil { + _, _ = io.Copy(io.Discard, body) + _ = body.Close() + } + }() - var resp api.RHPRenewResponse - err := c.c.WithContext(ctx).POST("/rhp/renew", req, &resp) - return resp.Contract, resp.TransactionSet, err -} - -// RHPFund funds an ephemeral account using the supplied contract. -func (c *Client) RHPFund(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string, balance types.Currency) (err error) { - req := api.RHPFundRequest{ - ContractID: contractID, - HostKey: hostKey, - SiamuxAddr: siamuxAddr, - Balance: balance, + // Parse header. 
+ var size int64 + _, err = fmt.Sscan(header.Get("Content-Length"), &size) + if err != nil { + return nil, err } - err = c.c.WithContext(ctx).POST("/rhp/fund", req, nil) - return -} + var r *api.DownloadRange + if cr := header.Get("Content-Range"); cr != "" { + dr, err := api.ParseDownloadRange(cr) + if err != nil { + return nil, err + } + r = &dr -// RHPSync funds an ephemeral account using the supplied contract. -func (c *Client) RHPSync(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string) (err error) { - req := api.RHPSyncRequest{ - ContractID: contractID, - HostKey: hostKey, - SiamuxAddr: siamuxAddr, + // If a range is set, the size is the size extracted from the range + // since Content-Length will then only be the length of the returned + // range. + size = dr.Size } - err = c.c.WithContext(ctx).POST("/rhp/sync", req, nil) - return -} - -// RHPPriceTable fetches a price table for a host. -func (c *Client) RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (pt hostdb.HostPriceTable, err error) { - req := api.RHPPriceTableRequest{ - HostKey: hostKey, - SiamuxAddr: siamuxAddr, - Timeout: api.DurationMS(timeout), + // Parse Last-Modified + modTime, err := time.Parse(http.TimeFormat, header.Get("Last-Modified")) + if err != nil { + return nil, err } - err = c.c.WithContext(ctx).POST("/rhp/pricetable", req, &pt) - return + + return &api.GetObjectResponse{ + Content: body, + ContentType: header.Get("Content-Type"), + ModTime: modTime.UTC(), + Range: r, + Size: size, + }, nil } -// RHPReadRegistry reads a registry value. -func (c *Client) RHPReadRegistry(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, key rhpv3.RegistryKey, payment rhpv3.PayByEphemeralAccountRequest) (resp rhpv3.RegistryValue, err error) { - req := api.RHPRegistryReadRequest{ - HostKey: hostKey, - SiamuxAddr: siamuxAddr, - RegistryKey: key, - Payment: payment, - } - err = c.c.WithContext(ctx).POST("/rhp/registry/read", req, &resp) +// ID returns the id of the worker. +func (c *Client) ID(ctx context.Context) (id string, err error) { + err = c.c.WithContext(ctx).GET("/id", &id) return } -// RHPUpdateRegistry updates a registry value. -func (c *Client) RHPUpdateRegistry(ctx context.Context, hostKey types.PublicKey, key rhpv3.RegistryKey, value rhpv3.RegistryValue) (err error) { - req := api.RHPRegistryUpdateRequest{ - HostKey: hostKey, - RegistryKey: key, - RegistryValue: value, - } - err = c.c.WithContext(ctx).POST("/rhp/registry/update", req, nil) +// Contracts returns all contracts from the worker. These contracts decorate a +// bus contract with the contract's latest revision. +func (c *Client) Contracts(ctx context.Context, hostTimeout time.Duration) (resp api.ContractsResponse, err error) { + err = c.c.WithContext(ctx).GET(fmt.Sprintf("/rhp/contracts?hosttimeout=%s", api.DurationMS(hostTimeout)), &resp) return } -// State returns the current state of the worker. -func (c *Client) State() (state api.WorkerStateResponse, err error) { - err = c.c.GET("/state", &state) +// DeleteObject deletes the object at the given path. 
+func (c *Client) DeleteObject(ctx context.Context, bucket, path string, opts api.DeleteObjectOptions) (err error) { + values := url.Values{} + values.Set("bucket", bucket) + opts.Apply(values) + + path = api.ObjectPathEscape(path) + err = c.c.WithContext(ctx).DELETE(fmt.Sprintf("/objects/%s?"+values.Encode(), path)) return } @@ -168,28 +136,41 @@ func (c *Client) MigrateSlab(ctx context.Context, slab object.Slab, set string) return } -// DownloadStats returns the upload stats. -func (c *Client) DownloadStats() (resp api.DownloadStatsResponse, err error) { - err = c.c.GET("/stats/downloads", &resp) +// ObjectEntries returns the entries at the given path, which must end in /. +func (c *Client) ObjectEntries(ctx context.Context, bucket, path string, opts api.ObjectEntriesOptions) (entries []api.ObjectMetadata, err error) { + path = api.ObjectPathEscape(path) + body, _, err := c.object(ctx, bucket, path, api.DownloadObjectOptions{ + Prefix: opts.Prefix, + Offset: opts.Offset, + Limit: opts.Limit, + }) + if err != nil { + return nil, err + } + defer io.Copy(io.Discard, body) + defer body.Close() + err = json.NewDecoder(body).Decode(&entries) return } -// UploadStats returns the upload stats. -func (c *Client) UploadStats() (resp api.UploadStatsResponse, err error) { - err = c.c.GET("/stats/uploads", &resp) +// State returns the current state of the worker. +func (c *Client) State() (state api.WorkerStateResponse, err error) { + err = c.c.GET("/state", &state) return } -// UploadObject uploads the data in r, creating an object at the given path. -func (c *Client) UploadObject(ctx context.Context, r io.Reader, path string, opts ...api.UploadOption) (*api.UploadObjectResponse, error) { +// UploadMultipartUploadPart uploads part of the data for a multipart upload. +func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) { path = api.ObjectPathEscape(path) - c.c.Custom("PUT", fmt.Sprintf("/objects/%s", path), []byte{}, nil) + c.c.Custom("PUT", fmt.Sprintf("/multipart/%s", path), []byte{}, nil) values := make(url.Values) - for _, opt := range opts { - opt(values) - } - u, err := url.Parse(fmt.Sprintf("%v/objects/%v", c.c.BaseURL, path)) + values.Set("bucket", bucket) + values.Set("uploadid", uploadID) + values.Set("partnumber", fmt.Sprint(partNumber)) + opts.Apply(values) + + u, err := url.Parse(fmt.Sprintf("%v/multipart/%v", c.c.BaseURL, path)) if err != nil { panic(err) } @@ -209,21 +190,18 @@ func (c *Client) UploadObject(ctx context.Context, r io.Reader, path string, opt err, _ := io.ReadAll(resp.Body) return nil, errors.New(string(err)) } - return &api.UploadObjectResponse{ETag: resp.Header.Get("ETag")}, nil + return &api.UploadMultipartUploadPartResponse{ETag: resp.Header.Get("ETag")}, nil } -// UploadMultipartUploadPart uploads part of the data for a multipart upload. -func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, path, uploadID string, partNumber int, opts ...api.UploadOption) (*api.UploadMultipartUploadPartResponse, error) { +// UploadObject uploads the data in r, creating an object at the given path. 
+func (c *Client) UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) { path = api.ObjectPathEscape(path) - c.c.Custom("PUT", fmt.Sprintf("/multipart/%s", path), []byte{}, nil) + c.c.Custom("PUT", fmt.Sprintf("/objects/%s", path), []byte{}, nil) values := make(url.Values) - values.Set("uploadid", uploadID) - values.Set("partnumber", fmt.Sprint(partNumber)) - for _, opt := range opts { - opt(values) - } - u, err := url.Parse(fmt.Sprintf("%v/multipart/%v", c.c.BaseURL, path)) + values.Set("bucket", bucket) + opts.Apply(values) + u, err := url.Parse(fmt.Sprintf("%v/objects/%v", c.c.BaseURL, path)) if err != nil { panic(err) } @@ -243,15 +221,28 @@ func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, pat err, _ := io.ReadAll(resp.Body) return nil, errors.New(string(err)) } - return &api.UploadMultipartUploadPartResponse{ETag: resp.Header.Get("ETag")}, nil + return &api.UploadObjectResponse{ETag: resp.Header.Get("ETag")}, nil +} + +// UploadStats returns the upload stats. +func (c *Client) UploadStats() (resp api.UploadStatsResponse, err error) { + err = c.c.GET("/stats/uploads", &resp) + return } -func (c *Client) object(ctx context.Context, bucket, path, prefix string, offset, limit int, opts ...api.DownloadObjectOption) (_ io.ReadCloser, _ http.Header, err error) { +// New returns a client that communicates with a renterd worker server +// listening on the specified address. +func New(addr, password string) *Client { + return &Client{jape.Client{ + BaseURL: addr, + Password: password, + }} +} + +func (c *Client) object(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (_ io.ReadCloser, _ http.Header, err error) { values := url.Values{} values.Set("bucket", url.QueryEscape(bucket)) - values.Set("prefix", url.QueryEscape(prefix)) - values.Set("offset", fmt.Sprint(offset)) - values.Set("limit", fmt.Sprint(limit)) + opts.ApplyValues(values) path += "?" + values.Encode() c.c.Custom("GET", fmt.Sprintf("/objects/%s", path), nil, (*[]api.ObjectMetadata)(nil)) @@ -260,9 +251,8 @@ func (c *Client) object(ctx context.Context, bucket, path, prefix string, offset panic(err) } req.SetBasicAuth("", c.c.WithContext(ctx).Password) - for _, opt := range opts { - opt(req.Header) - } + opts.ApplyHeaders(req.Header) + resp, err := http.DefaultClient.Do(req) if err != nil { return nil, nil, err @@ -274,116 +264,3 @@ func (c *Client) object(ctx context.Context, bucket, path, prefix string, offset } return resp.Body, resp.Header, err } - -// ObjectEntries returns the entries at the given path, which must end in /. -func (c *Client) ObjectEntries(ctx context.Context, bucket, path, prefix string, offset, limit int) (entries []api.ObjectMetadata, err error) { - path = api.ObjectPathEscape(path) - body, _, err := c.object(ctx, bucket, path, prefix, offset, limit) - if err != nil { - return nil, err - } - defer io.Copy(io.Discard, body) - defer body.Close() - err = json.NewDecoder(body).Decode(&entries) - return -} - -// DownloadObject downloads the object at the given path, writing its data to -// w. -func (c *Client) DownloadObject(ctx context.Context, w io.Writer, path string, opts ...api.DownloadObjectOption) (err error) { - if strings.HasSuffix(path, "/") { - return errors.New("the given path is a directory, use ObjectEntries instead") - } - - path = api.ObjectPathEscape(path) - body, _, err := c.object(ctx, api.DefaultBucketName, path, "", 0, -1, opts...) 
- if err != nil { - return err - } - defer body.Close() - _, err = io.Copy(w, body) - return err -} - -func (c *Client) GetObject(ctx context.Context, bucket, path string, opts ...api.DownloadObjectOption) (*api.GetObjectResponse, error) { - if strings.HasSuffix(path, "/") { - return nil, errors.New("the given path is a directory, use ObjectEntries instead") - } - - // Start download. - path = api.ObjectPathEscape(path) - body, header, err := c.object(ctx, bucket, path, "", 0, -1, opts...) - if err != nil { - return nil, err - } - defer func() { - if err != nil { - _, _ = io.Copy(io.Discard, body) - _ = body.Close() - } - }() - - // Parse header. - var size int64 - _, err = fmt.Sscan(header.Get("Content-Length"), &size) - if err != nil { - return nil, err - } - var r *api.DownloadRange - if cr := header.Get("Content-Range"); cr != "" { - dr, err := api.ParseDownloadRange(cr) - if err != nil { - return nil, err - } - r = &dr - - // If a range is set, the size is the size extracted from the range - // since Content-Length will then only be the length of the returned - // range. - size = dr.Size - } - // Parse Last-Modified - modTime, err := time.Parse(http.TimeFormat, header.Get("Last-Modified")) - if err != nil { - return nil, err - } - - return &api.GetObjectResponse{ - Content: body, - ContentType: header.Get("Content-Type"), - ModTime: modTime.UTC(), - Range: r, - Size: size, - }, nil -} - -// DeleteObject deletes the object at the given path. -func (c *Client) DeleteObject(ctx context.Context, path string, batch bool) (err error) { - path = api.ObjectPathEscape(path) - values := url.Values{} - values.Set("batch", fmt.Sprint(batch)) - err = c.c.WithContext(ctx).DELETE(fmt.Sprintf("/objects/%s?"+values.Encode(), path)) - return -} - -// Contracts returns all contracts from the worker. These contracts decorate a -// bus contract with the contract's latest revision. -func (c *Client) Contracts(ctx context.Context, hostTimeout time.Duration) (resp api.ContractsResponse, err error) { - err = c.c.WithContext(ctx).GET(fmt.Sprintf("/rhp/contracts?hosttimeout=%s", api.DurationMS(hostTimeout)), &resp) - return -} - -// Account returns the account id for a given host. -func (c *Client) Account(ctx context.Context, hostKey types.PublicKey) (account rhpv3.Account, err error) { - err = c.c.WithContext(ctx).GET(fmt.Sprintf("/account/%s", hostKey), &account) - return -} - -// NewClient returns a client that communicates with a renterd worker server -// listening on the specified address. -func NewClient(addr, password string) *Client { - return &Client{jape.Client{ - BaseURL: addr, - Password: password, - }} -} diff --git a/worker/client/rhp.go b/worker/client/rhp.go new file mode 100644 index 000000000..175df47e3 --- /dev/null +++ b/worker/client/rhp.go @@ -0,0 +1,138 @@ +package client + +import ( + "context" + "fmt" + "time" + + "go.sia.tech/core/types" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/hostdb" + + rhpv2 "go.sia.tech/core/rhp/v2" + rhpv3 "go.sia.tech/core/rhp/v3" +) + +// RHPBroadcast broadcasts the latest revision for a contract. +func (c *Client) RHPBroadcast(ctx context.Context, fcid types.FileContractID) (err error) { + err = c.c.WithContext(ctx).POST(fmt.Sprintf("/rhp/contract/%s/broadcast", fcid), nil, nil) + return +} + +// RHPContractRoots fetches the roots of the contract with given id. 
+func (c *Client) RHPContractRoots(ctx context.Context, fcid types.FileContractID) (roots []types.Hash256, err error) { + err = c.c.WithContext(ctx).GET(fmt.Sprintf("/rhp/contract/%s/roots", fcid), &roots) + return +} + +// RHPForm forms a contract with a host. +func (c *Client) RHPForm(ctx context.Context, endHeight uint64, hk types.PublicKey, hostIP string, renterAddress types.Address, renterFunds types.Currency, hostCollateral types.Currency) (rhpv2.ContractRevision, []types.Transaction, error) { + req := api.RHPFormRequest{ + EndHeight: endHeight, + HostCollateral: hostCollateral, + HostKey: hk, + HostIP: hostIP, + RenterFunds: renterFunds, + RenterAddress: renterAddress, + } + var resp api.RHPFormResponse + err := c.c.WithContext(ctx).POST("/rhp/form", req, &resp) + return resp.Contract, resp.TransactionSet, err +} + +// RHPFund funds an ephemeral account using the supplied contract. +func (c *Client) RHPFund(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string, balance types.Currency) (err error) { + req := api.RHPFundRequest{ + ContractID: contractID, + HostKey: hostKey, + SiamuxAddr: siamuxAddr, + Balance: balance, + } + err = c.c.WithContext(ctx).POST("/rhp/fund", req, nil) + return +} + +// RHPPriceTable fetches a price table for a host. +func (c *Client) RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (pt hostdb.HostPriceTable, err error) { + req := api.RHPPriceTableRequest{ + HostKey: hostKey, + SiamuxAddr: siamuxAddr, + Timeout: api.DurationMS(timeout), + } + err = c.c.WithContext(ctx).POST("/rhp/pricetable", req, &pt) + return +} + +// RHPPruneContract prunes deleted sectors from the contract with given id. +func (c *Client) RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (pruned, remaining uint64, err error) { + var res api.RHPPruneContractResponse + err = c.c.WithContext(ctx).POST(fmt.Sprintf("/rhp/contract/%s/prune", fcid), api.RHPPruneContractRequest{ + Timeout: api.DurationMS(timeout), + }, &res) + pruned = res.Pruned + remaining = res.Remaining + return +} + +// RHPReadRegistry reads a registry value. +func (c *Client) RHPReadRegistry(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, key rhpv3.RegistryKey, payment rhpv3.PayByEphemeralAccountRequest) (resp rhpv3.RegistryValue, err error) { + req := api.RHPRegistryReadRequest{ + HostKey: hostKey, + SiamuxAddr: siamuxAddr, + RegistryKey: key, + Payment: payment, + } + err = c.c.WithContext(ctx).POST("/rhp/registry/read", req, &resp) + return +} + +// RHPRenew renews an existing contract with a host. +func (c *Client) RHPRenew(ctx context.Context, fcid types.FileContractID, endHeight uint64, hk types.PublicKey, siamuxAddr string, hostAddress, renterAddress types.Address, renterFunds, newCollateral types.Currency, windowSize uint64) (rhpv2.ContractRevision, []types.Transaction, error) { + req := api.RHPRenewRequest{ + ContractID: fcid, + EndHeight: endHeight, + HostAddress: hostAddress, + HostKey: hk, + NewCollateral: newCollateral, + RenterAddress: renterAddress, + RenterFunds: renterFunds, + SiamuxAddr: siamuxAddr, + WindowSize: windowSize, + } + + var resp api.RHPRenewResponse + err := c.c.WithContext(ctx).POST("/rhp/renew", req, &resp) + return resp.Contract, resp.TransactionSet, err +} + +// RHPScan scans a host, returning its current settings. 
+func (c *Client) RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (resp api.RHPScanResponse, err error) { + err = c.c.WithContext(ctx).POST("/rhp/scan", api.RHPScanRequest{ + HostKey: hostKey, + HostIP: hostIP, + Timeout: api.DurationMS(timeout), + }, &resp) + return +} + +// RHPSync funds an ephemeral account using the supplied contract. +func (c *Client) RHPSync(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string) (err error) { + req := api.RHPSyncRequest{ + ContractID: contractID, + HostKey: hostKey, + SiamuxAddr: siamuxAddr, + } + err = c.c.WithContext(ctx).POST("/rhp/sync", req, nil) + return +} + +// RHPUpdateRegistry updates a registry value. +func (c *Client) RHPUpdateRegistry(ctx context.Context, hostKey types.PublicKey, key rhpv3.RegistryKey, value rhpv3.RegistryValue) (err error) { + req := api.RHPRegistryUpdateRequest{ + HostKey: hostKey, + RegistryKey: key, + RegistryValue: value, + } + err = c.c.WithContext(ctx).POST("/rhp/registry/update", req, nil) + return +} diff --git a/worker/worker.go b/worker/worker.go index 370b5c7ac..7ca73736e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -29,6 +29,7 @@ import ( "go.sia.tech/renterd/object" "go.sia.tech/renterd/tracing" "go.sia.tech/renterd/webhooks" + "go.sia.tech/renterd/worker/client" "go.sia.tech/siad/modules" "go.uber.org/zap" "golang.org/x/crypto/blake2b" @@ -53,6 +54,17 @@ const ( lockingPriorityBackgroundUpload = 5 ) +// re-export the client +type Client struct { + *client.Client +} + +func NewClient(address, password string) *Client { + return &Client{ + Client: client.New(address, password), + } +} + var privateSubnets []*net.IPNet func init() { @@ -70,36 +82,6 @@ func init() { } } -// rangedResponseWriter is a wrapper around http.ResponseWriter. The difference -// to the standard http.ResponseWriter is that it allows for overriding the -// default status code that is sent upon the first call to Write with a custom -// one. -type rangedResponseWriter struct { - rw http.ResponseWriter - defaultStatusCode int - headerWritten bool -} - -func (rw *rangedResponseWriter) Write(p []byte) (int, error) { - if !rw.headerWritten { - contentType := rw.Header().Get("Content-Type") - if contentType == "" { - rw.Header().Set("Content-Type", http.DetectContentType(p)) - } - rw.WriteHeader(rw.defaultStatusCode) - } - return rw.rw.Write(p) -} - -func (rw *rangedResponseWriter) Header() http.Header { - return rw.rw.Header() -} - -func (rw *rangedResponseWriter) WriteHeader(statusCode int) { - rw.headerWritten = true - rw.rw.WriteHeader(statusCode) -} - type AccountStore interface { Accounts(ctx context.Context) ([]api.Account, error) AddBalance(ctx context.Context, id rhpv3.Account, hk types.PublicKey, amt *big.Int) error
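
The refactor above replaces the worker client's variadic option parameters with explicit option structs (api.UploadObjectOptions, api.ObjectEntriesOptions, api.DeleteObjectOptions, ...), makes the bucket an explicit argument, and moves the client into worker/client, re-exported through worker.NewClient. Below is a minimal usage sketch of the new call shapes; the worker address, password, object path, and payload are placeholder values for illustration and are not part of this patch, and it assumes a worker is reachable at that address.

package main

import (
	"bytes"
	"context"
	"fmt"
	"log"

	"go.sia.tech/renterd/api"
	"go.sia.tech/renterd/worker"
)

func main() {
	// Placeholder address and password; a real deployment supplies its own.
	w := worker.NewClient("http://localhost:9980/api/worker", "my-password")
	ctx := context.Background()

	// Upload: options are now passed as a struct instead of variadic api.UploadOption values.
	res, err := w.UploadObject(ctx, bytes.NewReader([]byte("hello world")), api.DefaultBucketName, "docs/hello.txt", api.UploadObjectOptions{
		MimeType: "text/plain",
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("uploaded, etag:", res.ETag)

	// List directory entries; the path must end in a slash.
	entries, err := w.ObjectEntries(ctx, api.DefaultBucketName, "docs/", api.ObjectEntriesOptions{Limit: 10})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("entries:", len(entries))

	// Delete the object again; the bucket is now an explicit argument.
	if err := w.DeleteObject(ctx, api.DefaultBucketName, "docs/hello.txt", api.DeleteObjectOptions{}); err != nil {
		log.Fatal(err)
	}
}

Compared with functional options, the option structs make the zero value the default, keep the set of supported parameters discoverable from the type, and let the same struct drive both query values (Apply/ApplyValues) and request headers (ApplyHeaders).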