diff --git a/worker/worker.go b/worker/worker.go index c7f62c96c..eb2de583f 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -2,7 +2,6 @@ package worker import ( "context" - "encoding/hex" "errors" "fmt" "io" @@ -1134,76 +1133,18 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { } // upload the object - obj, partialSlabData, _, err := w.uploadManager.Upload(ctx, jc.Request.Body, rs, contracts, up.CurrentHeight, up.UploadPacking) + obj, partialSlabData, etag, err := w.uploadManager.Upload(ctx, jc.Request.Body, rs, contracts, up.CurrentHeight, up.UploadPacking) if jc.Check("couldn't upload object", err) != nil { return } - // build used contracts map - h2c := make(map[types.PublicKey]types.FileContractID) - for _, c := range contracts { - h2c[c.HostKey] = c.ID - } - used := make(map[types.PublicKey]types.FileContractID) - for _, s := range obj.Slabs { - for _, ss := range s.Shards { - used[ss.Host] = h2c[ss.Host] - } - } - - if len(partialSlabData) > 0 { - partialSlabs, err := w.bus.AddPartialSlab(jc.Request.Context(), partialSlabData, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet) - if jc.Check("couldn't add partial slabs to bus", err) != nil { - return - } - obj.PartialSlabs = partialSlabs - } - - // persist the object - if jc.Check("couldn't add object", w.bus.AddObject(ctx, bucket, jc.PathParam("path"), up.ContractSet, obj, used)) != nil { - return - } - - // if partial uploads are not enabled we are done. - if !up.UploadPacking { - return - } + // set etag in header response. + jc.ResponseWriter.Header().Set("ETag", api.FormatEtag(etag)) - // if partial uploads are enabled, check whether we have a full slab now - packedSlabs, err := w.bus.PackedSlabsForUpload(jc.Request.Context(), 5*time.Minute, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet, 2) - if jc.Check("couldn't fetch packed slabs from bus", err) != nil { + // handle partial slabs + if jc.Check("failed to finish upload", w.finishUpload(jc, bucket, obj, partialSlabData, contracts, up)) != nil { return } - - for _, ps := range packedSlabs { - // upload packed slab. - shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards)) - sectors, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight) - if jc.Check("couldn't upload packed slab", err) != nil { - return - } - - // build used contracts map - h2c := make(map[types.PublicKey]types.FileContractID) - for _, c := range contracts { - h2c[c.HostKey] = c.ID - } - used := make(map[types.PublicKey]types.FileContractID) - for _, s := range sectors { - used[s.Host] = h2c[s.Host] - } - - // mark packed slab as uploaded. - err = w.bus.MarkPackedSlabsUploaded(jc.Request.Context(), []api.UploadedPackedSlab{ - { - BufferID: ps.BufferID, - Shards: sectors, - }, - }, used) - if jc.Check("couldn't mark packed slabs uploaded", err) != nil { - return - } - } } func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { @@ -1305,74 +1246,13 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { return } - // build used contracts map - h2c := make(map[types.PublicKey]types.FileContractID) - for _, c := range contracts { - h2c[c.HostKey] = c.ID - } - used := make(map[types.PublicKey]types.FileContractID) - for _, s := range obj.Slabs { - for _, ss := range s.Shards { - used[ss.Host] = h2c[ss.Host] - } - } - - if len(partialSlabData) > 0 { - partialSlabs, err := w.bus.AddPartialSlab(jc.Request.Context(), partialSlabData, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet) - if jc.Check("couldn't add partial slabs to bus", err) != nil { - return - } - obj.PartialSlabs = partialSlabs - } - - // persist the part - if jc.Check("couldn't add part", w.bus.AddMultipartPart(ctx, bucket, jc.PathParam("path"), up.ContractSet, uploadID, partNumber, obj.Slabs, obj.PartialSlabs, hex.EncodeToString(etag), used)) != nil { - return - } - // set etag in header response. jc.ResponseWriter.Header().Set("ETag", api.FormatEtag(etag)) - // if partial uploads are not enabled we are done. - if !up.UploadPacking { + // handle partial slabs + if jc.Check("failed to finish upload", w.finishUpload(jc, bucket, obj, partialSlabData, contracts, up)) != nil { return } - - // if partial uploads are enabled, check whether we have a full slab now - packedSlabs, err := w.bus.PackedSlabsForUpload(jc.Request.Context(), 5*time.Minute, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet, 2) - if jc.Check("couldn't fetch packed slabs from bus", err) != nil { - return - } - - for _, ps := range packedSlabs { - // upload packed slab. - shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards)) - sectors, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight) - if jc.Check("couldn't upload packed slab", err) != nil { - return - } - - // build used contracts map - h2c := make(map[types.PublicKey]types.FileContractID) - for _, c := range contracts { - h2c[c.HostKey] = c.ID - } - used := make(map[types.PublicKey]types.FileContractID) - for _, s := range sectors { - used[s.Host] = h2c[s.Host] - } - - // mark packed slab as uploaded. - err = w.bus.MarkPackedSlabsUploaded(jc.Request.Context(), []api.UploadedPackedSlab{ - { - BufferID: ps.BufferID, - Shards: sectors, - }, - }, used) - if jc.Check("couldn't mark packed slabs uploaded", err) != nil { - return - } - } } func encryptPartialSlab(data []byte, key object.EncryptionKey, minShards, totalShards uint8) [][]byte { @@ -1729,3 +1609,76 @@ func isPrivateIP(addr net.IP) bool { } return false } + +func (w *worker) finishUpload(jc jape.Context, bucket string, obj object.Object, partialSlabData []byte, contracts []api.ContractMetadata, up api.UploadParams) error { + ctx := jc.Request.Context() + rs := up.RedundancySettings + + // build used contracts map + h2c := make(map[types.PublicKey]types.FileContractID) + for _, c := range contracts { + h2c[c.HostKey] = c.ID + } + used := make(map[types.PublicKey]types.FileContractID) + for _, s := range obj.Slabs { + for _, ss := range s.Shards { + used[ss.Host] = h2c[ss.Host] + } + } + + if len(partialSlabData) > 0 { + partialSlabs, err := w.bus.AddPartialSlab(jc.Request.Context(), partialSlabData, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet) + if err != nil { + return fmt.Errorf("couldn't add partial slabs to bus: %w", err) + } + obj.PartialSlabs = partialSlabs + } + + // persist the object + err := w.bus.AddObject(ctx, bucket, jc.PathParam("path"), up.ContractSet, obj, used) + if err != nil { + return fmt.Errorf("couldn't add object: %w", err) + } + + // if partial uploads are not enabled we are done. + if !up.UploadPacking { + return nil + } + + // if partial uploads are enabled, check whether we have a full slab now + packedSlabs, err := w.bus.PackedSlabsForUpload(jc.Request.Context(), 5*time.Minute, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet, 2) + if err != nil { + return fmt.Errorf("couldn't fetch packed slabs from bus: %w", err) + } + + for _, ps := range packedSlabs { + // upload packed slab. + shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards)) + sectors, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight) + if err != nil { + return fmt.Errorf("couldn't upload packed slab: %w", err) + } + + // build used contracts map + h2c := make(map[types.PublicKey]types.FileContractID) + for _, c := range contracts { + h2c[c.HostKey] = c.ID + } + used := make(map[types.PublicKey]types.FileContractID) + for _, s := range sectors { + used[s.Host] = h2c[s.Host] + } + + // mark packed slab as uploaded. + err = w.bus.MarkPackedSlabsUploaded(jc.Request.Context(), []api.UploadedPackedSlab{ + { + BufferID: ps.BufferID, + Shards: sectors, + }, + }, used) + if err != nil { + return fmt.Errorf("couldn't mark packed slabs uploaded: %w", err) + } + } + return nil +}