Skip to content

Commit

Permalink
worker: introduce finishUpload helper
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Sep 13, 2023
1 parent 0e4e03e commit ba5eb28
Showing 1 changed file with 80 additions and 127 deletions.
207 changes: 80 additions & 127 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package worker

import (
"context"
"encoding/hex"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -1134,76 +1133,18 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) {
}

// upload the object
obj, partialSlabData, _, err := w.uploadManager.Upload(ctx, jc.Request.Body, rs, contracts, up.CurrentHeight, up.UploadPacking)
obj, partialSlabData, etag, err := w.uploadManager.Upload(ctx, jc.Request.Body, rs, contracts, up.CurrentHeight, up.UploadPacking)
if jc.Check("couldn't upload object", err) != nil {
return
}

// build used contracts map
h2c := make(map[types.PublicKey]types.FileContractID)
for _, c := range contracts {
h2c[c.HostKey] = c.ID
}
used := make(map[types.PublicKey]types.FileContractID)
for _, s := range obj.Slabs {
for _, ss := range s.Shards {
used[ss.Host] = h2c[ss.Host]
}
}

if len(partialSlabData) > 0 {
partialSlabs, err := w.bus.AddPartialSlab(jc.Request.Context(), partialSlabData, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet)
if jc.Check("couldn't add partial slabs to bus", err) != nil {
return
}
obj.PartialSlabs = partialSlabs
}

// persist the object
if jc.Check("couldn't add object", w.bus.AddObject(ctx, bucket, jc.PathParam("path"), up.ContractSet, obj, used)) != nil {
return
}

// if partial uploads are not enabled we are done.
if !up.UploadPacking {
return
}
// set etag in header response.
jc.ResponseWriter.Header().Set("ETag", api.FormatEtag(etag))

// if partial uploads are enabled, check whether we have a full slab now
packedSlabs, err := w.bus.PackedSlabsForUpload(jc.Request.Context(), 5*time.Minute, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet, 2)
if jc.Check("couldn't fetch packed slabs from bus", err) != nil {
// handle partial slabs
if jc.Check("failed to finish upload", w.finishUpload(jc, bucket, obj, partialSlabData, contracts, up)) != nil {
return
}

for _, ps := range packedSlabs {
// upload packed slab.
shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards))
sectors, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight)
if jc.Check("couldn't upload packed slab", err) != nil {
return
}

// build used contracts map
h2c := make(map[types.PublicKey]types.FileContractID)
for _, c := range contracts {
h2c[c.HostKey] = c.ID
}
used := make(map[types.PublicKey]types.FileContractID)
for _, s := range sectors {
used[s.Host] = h2c[s.Host]
}

// mark packed slab as uploaded.
err = w.bus.MarkPackedSlabsUploaded(jc.Request.Context(), []api.UploadedPackedSlab{
{
BufferID: ps.BufferID,
Shards: sectors,
},
}, used)
if jc.Check("couldn't mark packed slabs uploaded", err) != nil {
return
}
}
}

func (w *worker) multipartUploadHandlerPUT(jc jape.Context) {
Expand Down Expand Up @@ -1305,74 +1246,13 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) {
return
}

// build used contracts map
h2c := make(map[types.PublicKey]types.FileContractID)
for _, c := range contracts {
h2c[c.HostKey] = c.ID
}
used := make(map[types.PublicKey]types.FileContractID)
for _, s := range obj.Slabs {
for _, ss := range s.Shards {
used[ss.Host] = h2c[ss.Host]
}
}

if len(partialSlabData) > 0 {
partialSlabs, err := w.bus.AddPartialSlab(jc.Request.Context(), partialSlabData, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet)
if jc.Check("couldn't add partial slabs to bus", err) != nil {
return
}
obj.PartialSlabs = partialSlabs
}

// persist the part
if jc.Check("couldn't add part", w.bus.AddMultipartPart(ctx, bucket, jc.PathParam("path"), up.ContractSet, uploadID, partNumber, obj.Slabs, obj.PartialSlabs, hex.EncodeToString(etag), used)) != nil {
return
}

// set etag in header response.
jc.ResponseWriter.Header().Set("ETag", api.FormatEtag(etag))

// if partial uploads are not enabled we are done.
if !up.UploadPacking {
// handle partial slabs
if jc.Check("failed to finish upload", w.finishUpload(jc, bucket, obj, partialSlabData, contracts, up)) != nil {
return
}

// if partial uploads are enabled, check whether we have a full slab now
packedSlabs, err := w.bus.PackedSlabsForUpload(jc.Request.Context(), 5*time.Minute, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet, 2)
if jc.Check("couldn't fetch packed slabs from bus", err) != nil {
return
}

for _, ps := range packedSlabs {
// upload packed slab.
shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards))
sectors, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight)
if jc.Check("couldn't upload packed slab", err) != nil {
return
}

// build used contracts map
h2c := make(map[types.PublicKey]types.FileContractID)
for _, c := range contracts {
h2c[c.HostKey] = c.ID
}
used := make(map[types.PublicKey]types.FileContractID)
for _, s := range sectors {
used[s.Host] = h2c[s.Host]
}

// mark packed slab as uploaded.
err = w.bus.MarkPackedSlabsUploaded(jc.Request.Context(), []api.UploadedPackedSlab{
{
BufferID: ps.BufferID,
Shards: sectors,
},
}, used)
if jc.Check("couldn't mark packed slabs uploaded", err) != nil {
return
}
}
}

func encryptPartialSlab(data []byte, key object.EncryptionKey, minShards, totalShards uint8) [][]byte {
Expand Down Expand Up @@ -1729,3 +1609,76 @@ func isPrivateIP(addr net.IP) bool {
}
return false
}

func (w *worker) finishUpload(jc jape.Context, bucket string, obj object.Object, partialSlabData []byte, contracts []api.ContractMetadata, up api.UploadParams) error {
ctx := jc.Request.Context()
rs := up.RedundancySettings

// build used contracts map
h2c := make(map[types.PublicKey]types.FileContractID)
for _, c := range contracts {
h2c[c.HostKey] = c.ID
}
used := make(map[types.PublicKey]types.FileContractID)
for _, s := range obj.Slabs {
for _, ss := range s.Shards {
used[ss.Host] = h2c[ss.Host]
}
}

if len(partialSlabData) > 0 {
partialSlabs, err := w.bus.AddPartialSlab(jc.Request.Context(), partialSlabData, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet)
if err != nil {
return fmt.Errorf("couldn't add partial slabs to bus: %w", err)
}
obj.PartialSlabs = partialSlabs
}

// persist the object
err := w.bus.AddObject(ctx, bucket, jc.PathParam("path"), up.ContractSet, obj, used)
if err != nil {
return fmt.Errorf("couldn't add object: %w", err)
}

// if partial uploads are not enabled we are done.
if !up.UploadPacking {
return nil
}

// if partial uploads are enabled, check whether we have a full slab now
packedSlabs, err := w.bus.PackedSlabsForUpload(jc.Request.Context(), 5*time.Minute, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet, 2)
if err != nil {
return fmt.Errorf("couldn't fetch packed slabs from bus: %w", err)
}

for _, ps := range packedSlabs {
// upload packed slab.
shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards))
sectors, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight)
if err != nil {
return fmt.Errorf("couldn't upload packed slab: %w", err)
}

// build used contracts map
h2c := make(map[types.PublicKey]types.FileContractID)
for _, c := range contracts {
h2c[c.HostKey] = c.ID
}
used := make(map[types.PublicKey]types.FileContractID)
for _, s := range sectors {
used[s.Host] = h2c[s.Host]
}

// mark packed slab as uploaded.
err = w.bus.MarkPackedSlabsUploaded(jc.Request.Context(), []api.UploadedPackedSlab{
{
BufferID: ps.BufferID,
Shards: sectors,
},
}, used)
if err != nil {
return fmt.Errorf("couldn't mark packed slabs uploaded: %w", err)
}
}
return nil
}

0 comments on commit ba5eb28

Please sign in to comment.