Commit a4d7e01
Merge pull request #758 from SiaFoundation/chris/parallelise-packed-uploads

Start uploading next packed slab before previous one is finished
ChrisSchinnerl authored Nov 29, 2023
2 parents dc05019 + ff147e8 commit a4d7e01
Showing 10 changed files with 293 additions and 94 deletions.
9 changes: 9 additions & 0 deletions api/worker.go
@@ -51,6 +51,15 @@ type (
		Error string `json:"error,omitempty"`
	}

	// MemoryResponse is the response type for the /memory endpoint.
	MemoryResponse struct {
		Upload MemoryStatus `json:"upload"`
	}

	// MemoryStatus reports how much of the worker's memory budget is in use.
	MemoryStatus struct {
		Available uint64 `json:"available"`
		Total     uint64 `json:"total"`
	}

	// MigrateSlabResponse is the response type for the /slab/migrate endpoint.
	MigrateSlabResponse struct {
		NumShardsMigrated int `json:"numShardsMigrated"`
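The two new types map one-to-one onto the JSON served by the worker's /memory endpoint. A minimal sketch of the wire format, with illustrative values (512 MiB of a 1 GiB budget free):

package main

import (
	"encoding/json"
	"fmt"

	"go.sia.tech/renterd/api"
)

func main() {
	resp := api.MemoryResponse{
		Upload: api.MemoryStatus{Available: 1 << 29, Total: 1 << 30},
	}
	b, _ := json.Marshal(resp)
	fmt.Println(string(b)) // {"upload":{"available":536870912,"total":1073741824}}
}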
3 changes: 3 additions & 0 deletions cmd/renterd/main.go
@@ -94,6 +94,7 @@ var (
		DownloadMaxOverdrive:     5,
		DownloadOverdriveTimeout: 3 * time.Second,

		UploadMaxMemory:          1 << 30, // 1 GiB
		UploadMaxOverdrive:       5,
		UploadOverdriveTimeout:   3 * time.Second,
	},
@@ -250,6 +251,7 @@ func main() {
	flag.Uint64Var(&cfg.Worker.DownloadMaxOverdrive, "worker.downloadMaxOverdrive", cfg.Worker.DownloadMaxOverdrive, "maximum number of active overdrive workers when downloading a slab")
	flag.StringVar(&cfg.Worker.ID, "worker.id", cfg.Worker.ID, "unique identifier of worker used internally - can be overwritten using the RENTERD_WORKER_ID environment variable")
	flag.DurationVar(&cfg.Worker.DownloadOverdriveTimeout, "worker.downloadOverdriveTimeout", cfg.Worker.DownloadOverdriveTimeout, "timeout applied to slab downloads that decides when we start overdriving")
	flag.Uint64Var(&cfg.Worker.UploadMaxMemory, "worker.uploadMaxMemory", cfg.Worker.UploadMaxMemory, "maximum amount of RAM the worker allocates for slabs when uploading - can be overwritten using the RENTERD_WORKER_UPLOAD_MAX_MEMORY environment variable")
	flag.Uint64Var(&cfg.Worker.UploadMaxOverdrive, "worker.uploadMaxOverdrive", cfg.Worker.UploadMaxOverdrive, "maximum number of active overdrive workers when uploading a slab")
	flag.DurationVar(&cfg.Worker.UploadOverdriveTimeout, "worker.uploadOverdriveTimeout", cfg.Worker.UploadOverdriveTimeout, "timeout applied to slab uploads that decides when we start overdriving")
	flag.BoolVar(&cfg.Worker.Enabled, "worker.enabled", cfg.Worker.Enabled, "enable/disable creating a worker - can be overwritten using the RENTERD_WORKER_ENABLED environment variable")
@@ -315,6 +317,7 @@ func main() {
parseEnvVar("RENTERD_WORKER_ENABLED", &cfg.Worker.Enabled)
parseEnvVar("RENTERD_WORKER_ID", &cfg.Worker.ID)
parseEnvVar("RENTERD_WORKER_UNAUTHENTICATED_DOWNLOADS", &cfg.Worker.AllowUnauthenticatedDownloads)
parseEnvVar("RENTERD_WORKER_UPLOAD_MAX_MEMORY", &cfg.Worker.UploadMaxMemory)

parseEnvVar("RENTERD_AUTOPILOT_ENABLED", &cfg.Autopilot.Enabled)
parseEnvVar("RENTERD_AUTOPILOT_REVISION_BROADCAST_INTERVAL", &cfg.Autopilot.RevisionBroadcastInterval)
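Between the default above, the flag, and the environment variable, the limit can now be set three ways: for example renterd -worker.uploadMaxMemory=1073741824 on the command line, or RENTERD_WORKER_UPLOAD_MAX_MEMORY=1073741824 in the environment, with the built-in 1 GiB default applying otherwise.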
1 change: 1 addition & 0 deletions config/config.go
@@ -98,6 +98,7 @@ type (
		DownloadOverdriveTimeout      time.Duration `yaml:"downloadOverdriveTimeout"`
		UploadOverdriveTimeout        time.Duration `yaml:"uploadOverdriveTimeout"`
		DownloadMaxOverdrive          uint64        `yaml:"downloadMaxOverdrive"`
		UploadMaxMemory               uint64        `yaml:"uploadMaxMemory"`
		UploadMaxOverdrive            uint64        `yaml:"uploadMaxOverdrive"`
		AllowUnauthenticatedDownloads bool          `yaml:"allowUnauthenticatedDownloads"`
	}
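Because the field now carries a yaml tag, the limit can also come from the config file. A minimal sketch, assuming the worker options nest under a worker: key (the enclosing structure is not shown in this diff):

worker:
  uploadMaxMemory: 1073741824 # 1 GiB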
2 changes: 1 addition & 1 deletion internal/node/node.go
@@ -176,7 +176,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht

func NewWorker(cfg config.Worker, b worker.Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) {
	workerKey := blake2b.Sum256(append([]byte("worker"), seed...))
-	w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, cfg.AllowPrivateIPs, l)
+	w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxMemory, cfg.UploadMaxOverdrive, cfg.AllowPrivateIPs, l)
	if err != nil {
		return nil, nil, err
	}
1 change: 1 addition & 0 deletions internal/testing/cluster.go
@@ -952,6 +952,7 @@ func testWorkerCfg() config.Worker {
		BusFlushInterval:         testBusFlushInterval,
		DownloadOverdriveTimeout: 500 * time.Millisecond,
		UploadOverdriveTimeout:   500 * time.Millisecond,
		UploadMaxMemory:          1 << 28, // 256 MiB
		UploadMaxOverdrive:       5,
	}
}
6 changes: 6 additions & 0 deletions worker/client/client.go
@@ -136,6 +136,12 @@ func (c *Client) ID(ctx context.Context) (id string, err error) {
	return
}

// Memory requests the /memory endpoint.
func (c *Client) Memory(ctx context.Context) (resp api.MemoryResponse, err error) {
	err = c.c.WithContext(ctx).GET("/memory", &resp)
	return
}

// MigrateSlab migrates the specified slab.
func (c *Client) MigrateSlab(ctx context.Context, slab object.Slab, set string) (res api.MigrateSlabResponse, err error) {
	values := make(url.Values)
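A short sketch of querying the endpoint from Go. The constructor and API address are assumptions (they are not part of this diff); only the Memory method itself comes from the change above:

package main

import (
	"context"
	"fmt"
	"log"

	"go.sia.tech/renterd/worker/client"
)

func main() {
	// constructor and address assumed, not shown in this diff
	c := client.New("http://localhost:9980/api/worker", "apipassword")
	resp, err := c.Memory(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("upload memory: %d of %d bytes available\n", resp.Upload.Available, resp.Upload.Total)
}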
113 changes: 113 additions & 0 deletions worker/memory.go
@@ -0,0 +1,113 @@
package worker

import (
	"context"
	"fmt"
	"sync"

	"go.sia.tech/renterd/api"
	"go.uber.org/zap"
)

type (
	// memoryManager helps regulate processes that use a lot of memory, such
	// as uploads and downloads.
	memoryManager struct {
		totalAvailable uint64
		logger         *zap.SugaredLogger

		mu        sync.Mutex
		sigNewMem sync.Cond
		available uint64
	}

	// acquiredMemory tracks the unreleased portion of a successful
	// AcquireMemory call.
	acquiredMemory struct {
		mm *memoryManager

		mu        sync.Mutex
		remaining uint64
	}
)

func newMemoryManager(logger *zap.SugaredLogger, maxMemory uint64) (*memoryManager, error) {
	if maxMemory == 0 {
		return nil, fmt.Errorf("maxMemory cannot be 0")
	}
	mm := &memoryManager{
		logger:         logger,
		totalAvailable: maxMemory,
	}
	mm.available = mm.totalAvailable
	mm.sigNewMem = *sync.NewCond(&mm.mu)
	return mm, nil
}

func (mm *memoryManager) Status() api.MemoryStatus {
	mm.mu.Lock()
	defer mm.mu.Unlock()
	return api.MemoryStatus{
		Available: mm.available,
		Total:     mm.totalAvailable,
	}
}

func (mm *memoryManager) AcquireMemory(ctx context.Context, amt uint64) *acquiredMemory {
	if amt == 0 {
		mm.logger.Fatal("cannot acquire 0 memory")
	} else if mm.totalAvailable < amt {
		mm.logger.Errorf("cannot acquire %v memory with only %v available", amt, mm.totalAvailable)
		return nil
	}
	// block until enough memory is available
	mm.sigNewMem.L.Lock()
	for mm.available < amt {
		mm.sigNewMem.Wait()

		// check if the context was canceled in the meantime; note that this
		// check only runs after a wakeup, since a condition variable can't be
		// waited on and canceled at the same time
		select {
		case <-ctx.Done():
			mm.sigNewMem.L.Unlock()
			return nil
		default:
		}
	}
	mm.available -= amt
	mm.sigNewMem.L.Unlock()

	// wake the next goroutine, so a single release can serve several smaller
	// waiters in turn
	mm.sigNewMem.Signal()
	return &acquiredMemory{
		mm:        mm,
		remaining: amt,
	}
}

// Release returns all the remaining memory to the memory manager. It should
// always be called once on every acquiredMemory when done using it.
func (am *acquiredMemory) Release() {
	am.mm.sigNewMem.L.Lock()
	am.mm.available += am.remaining
	am.mm.sigNewMem.L.Unlock()

	am.mu.Lock()
	am.remaining = 0
	am.mu.Unlock()

	am.mm.sigNewMem.Signal() // wake next goroutine
}

// ReleaseSome releases some of the remaining memory to the memory manager.
// Panics if more memory is released than was acquired.
func (am *acquiredMemory) ReleaseSome(amt uint64) {
	am.mm.sigNewMem.L.Lock()
	if amt > am.remaining {
		panic("releasing more memory than remaining")
	}
	am.mm.available += amt
	am.mm.sigNewMem.L.Unlock()

	am.mu.Lock()
	am.remaining -= amt
	am.mu.Unlock()

	am.mm.sigNewMem.Signal() // wake next goroutine
}
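This file is the heart of the change: instead of uploading packed slabs strictly one after another, the worker can reserve memory for the next slab and launch its upload while the previous one is still in flight, with total usage capped by the manager. A sketch of that gating pattern under assumed names (uploadPackedSlab stands in for the real upload path, which is in the part of the diff not rendered on this page):

func uploadAllPacked(ctx context.Context, mm *memoryManager, slabs [][]byte) {
	var wg sync.WaitGroup
	for _, ps := range slabs {
		// blocks until enough of the budget is free, so at most ~maxMemory
		// worth of slab data is ever in flight
		mem := mm.AcquireMemory(ctx, uint64(len(ps)))
		if mem == nil {
			break // context canceled, or the slab exceeds the total budget
		}
		wg.Add(1)
		go func(ps []byte, mem *acquiredMemory) {
			defer wg.Done()
			defer mem.Release() // return the reservation even on failure
			uploadPackedSlab(ctx, ps)
		}(ps, mem)
	}
	wg.Wait()
}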
10 changes: 9 additions & 1 deletion worker/migrations.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"

rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
@@ -82,6 +83,13 @@ SHARDS:
return 0, false, fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-missingShards, int(s.MinShards))
}

// acquire memory for the migration
mem := u.mm.AcquireMemory(ctx, uint64(len(shardIndices))*rhpv2.SectorSize)
if mem == nil {
return 0, false, fmt.Errorf("failed to acquire memory for migration")
}
defer mem.Release()

// download the slab
shards, surchargeApplied, err := d.DownloadSlab(ctx, *s, dlContracts)
if err != nil {
@@ -105,7 +113,7 @@
	}

	// migrate the shards
-	uploaded, err := u.UploadShards(ctx, shards, allowed, bh, lockingPriorityUpload)
+	uploaded, err := u.UploadShards(ctx, shards, allowed, bh, lockingPriorityUpload, mem)
	if err != nil {
		return 0, surchargeApplied, fmt.Errorf("failed to upload slab for migration: %w", err)
	}
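The reservation is sized at one sector per shard that needs re-uploading: migrating, say, ten shards acquires 10 × rhpv2.SectorSize = 10 × 4 MiB = 40 MiB up front, and the deferred Release hands it back whether or not the upload succeeds.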