Skip to content

Commit

Permalink
Extend the worker config with worker.ServiceAddress (#1324)
Browse files Browse the repository at this point in the history
The worker cache requires setting up the worker in a way we did not have
to do before, namely registering webhooks with the bus so the worker
gets notified when certain events take place on the bus. In single node
setups or when you're on localhost everything works fine but in a
distributed setup we are missing a way to define where the bus can reach
the worker API. I came up with `serviceAddress` for now but I've gone
back and forth quite a bit. I like `serviceAddress` because it ties in
closely with docker services, I think we should call it
`something-address` to be consistent with the rest of the config.

I verified it using the following `docker-compose` file:

```
version: "3.9"

services:
  bus:
    build:
      context: .
      dockerfile: ./docker/Dockerfile
    container_name: bus
    ports:
      - "9981:9980"
    environment:
      - RENTERD_AUTOPILOT_ENABLED=false
      - RENTERD_WORKER_ENABLED=false
      - RENTERD_WORKER_REMOTE_ADDRS=http://worker:9980/api/worker
      - RENTERD_WORKER_API_PASSWORD=test
      - RENTERD_API_PASSWORD=test
      - RENTERD_SEED=[seed here]

  worker:
    build:
      context: .
      dockerfile: ./docker/Dockerfile
    container_name: worker
    ports:
      - "9982:9980"
    environment:
      - RENTERD_AUTOPILOT_ENABLED=false
      - RENTERD_BUS_REMOTE_ADDR=http://bus:9980/api/bus
      - RENTERD_BUS_API_PASSWORD=test
      - RENTERD_WORKER_API_PASSWORD=test
      - RENTERD_WORKER_SERVICE_ADDR=http://worker:9980
      - RENTERD_API_PASSWORD=test
      - RENTERD_SEED=[seed here]
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:9980/api/worker || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 5

``` 

Fixes #1302

---------

Co-authored-by: Christopher Schinnerl <[email protected]>
  • Loading branch information
peterjan and ChrisSchinnerl authored Jun 24, 2024
1 parent 281f92f commit 5cd3cb3
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 6 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ overview of all settings configurable through the CLI.
| `Worker.UploadOverdriveTimeout` | Timeout for overdriving slab uploads | `3s` | `--worker.uploadOverdriveTimeout` | - | `worker.uploadOverdriveTimeout` |
| `Worker.Enabled` | Enables/disables worker | `true` | `--worker.enabled` | `RENTERD_WORKER_ENABLED` | `worker.enabled` |
| `Worker.AllowUnauthenticatedDownloads` | Allows unauthenticated downloads | - | `--worker.unauthenticatedDownloads` | `RENTERD_WORKER_UNAUTHENTICATED_DOWNLOADS` | `worker.allowUnauthenticatedDownloads` |
| `Worker.ExternalAddress` | Address of the worker on the network, only necessary when the bus is remote | - | - | `RENTERD_WORKER_EXTERNAL_ADDR` | `worker.externalAddress` |
| `Autopilot.AccountsRefillInterval` | Interval for refilling workers' account balances | `24h` | `--autopilot.accountRefillInterval` | - | `autopilot.accountsRefillInterval` |
| `Autopilot.Heartbeat` | Interval for autopilot loop execution | `30m` | `--autopilot.heartbeat` | - | `autopilot.heartbeat` |
| `Autopilot.MigrationHealthCutoff` | Threshold for migrating slabs based on health | `0.75` | `--autopilot.migrationHealthCutoff` | - | `autopilot.migrationHealthCutoff` |
Expand Down
18 changes: 15 additions & 3 deletions cmd/renterd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ func main() {
flag.DurationVar(&cfg.Worker.UploadOverdriveTimeout, "worker.uploadOverdriveTimeout", cfg.Worker.UploadOverdriveTimeout, "Timeout for overdriving slab uploads")
flag.BoolVar(&cfg.Worker.Enabled, "worker.enabled", cfg.Worker.Enabled, "Enables/disables worker (overrides with RENTERD_WORKER_ENABLED)")
flag.BoolVar(&cfg.Worker.AllowUnauthenticatedDownloads, "worker.unauthenticatedDownloads", cfg.Worker.AllowUnauthenticatedDownloads, "Allows unauthenticated downloads (overrides with RENTERD_WORKER_UNAUTHENTICATED_DOWNLOADS)")
flag.StringVar(&cfg.Worker.ExternalAddress, "worker.externalAddress", cfg.Worker.ExternalAddress, "Address of the worker on the network, only necessary when the bus is remote (overrides with RENTERD_WORKER_EXTERNAL_ADDR)")

// autopilot
flag.DurationVar(&cfg.Autopilot.AccountsRefillInterval, "autopilot.accountRefillInterval", cfg.Autopilot.AccountsRefillInterval, "Interval for refilling workers' account balances")
Expand Down Expand Up @@ -365,6 +366,7 @@ func main() {
parseEnvVar("RENTERD_WORKER_UNAUTHENTICATED_DOWNLOADS", &cfg.Worker.AllowUnauthenticatedDownloads)
parseEnvVar("RENTERD_WORKER_DOWNLOAD_MAX_MEMORY", &cfg.Worker.DownloadMaxMemory)
parseEnvVar("RENTERD_WORKER_UPLOAD_MAX_MEMORY", &cfg.Worker.UploadMaxMemory)
parseEnvVar("RENTERD_WORKER_EXTERNAL_ADDR", &cfg.Worker.ExternalAddress)

parseEnvVar("RENTERD_AUTOPILOT_ENABLED", &cfg.Autopilot.Enabled)
parseEnvVar("RENTERD_AUTOPILOT_REVISION_BROADCAST_INTERVAL", &cfg.Autopilot.RevisionBroadcastInterval)
Expand Down Expand Up @@ -462,8 +464,12 @@ func main() {
}
var shutdownFns []shutdownFnEntry

if cfg.Bus.RemoteAddr != "" && len(cfg.Worker.Remotes) != 0 && !cfg.Autopilot.Enabled {
logger.Fatal("remote bus, remote worker, and no autopilot -- nothing to do!")
if cfg.Bus.RemoteAddr != "" {
if len(cfg.Worker.Remotes) != 0 && !cfg.Autopilot.Enabled {
logger.Fatal("remote bus, remote worker, and no autopilot -- nothing to do!")
} else if cfg.Worker.ExternalAddress == "" {
logger.Fatal("if the bus is remote, the worker needs to be able to tell it where to find its API, this can be configured using worker.externalAddress")
}
}
if len(cfg.Worker.Remotes) == 0 && !cfg.Worker.Enabled && cfg.Autopilot.Enabled {
logger.Fatal("can't enable autopilot without providing either workers to connect to or creating a worker")
Expand Down Expand Up @@ -534,8 +540,14 @@ func main() {
if err != nil {
logger.Fatal("failed to create worker: " + err.Error())
}
var workerExternAddr string
if cfg.Bus.RemoteAddr != "" {
workerExternAddr = cfg.Worker.ExternalAddress + "/api/worker"
} else {
workerExternAddr = workerAddr
}
setupWorkerFn = func(ctx context.Context) error {
return setupFn(ctx, workerAddr, cfg.HTTP.Password)
return setupFn(ctx, workerExternAddr, cfg.HTTP.Password)
}
shutdownFns = append(shutdownFns, shutdownFnEntry{
name: "Worker",
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type (
UploadMaxMemory uint64 `yaml:"uploadMaxMemory,omitempty"`
UploadMaxOverdrive uint64 `yaml:"uploadMaxOverdrive,omitempty"`
AllowUnauthenticatedDownloads bool `yaml:"allowUnauthenticatedDownloads,omitempty"`
ExternalAddress string `yaml:"externalAddress,omitempty"`
}

// Autopilot contains the configuration for an autopilot.
Expand Down
9 changes: 6 additions & 3 deletions worker/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,14 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2
return err
}

var refund types.Currency
payment := rhpv3.PayByEphemeralAccount(h.acc.id, cost, pt.HostBlockHeight+defaultWithdrawalExpiryBlocks, h.accountKey)
cost, refund, err = RPCReadSector(ctx, t, w, hpt, &payment, offset, length, root)
cost, refund, err := RPCReadSector(ctx, t, w, hpt, &payment, offset, length, root)
if err != nil {
return err
}

amount = cost.Sub(refund)
return err
return nil
})
return
})
Expand Down

0 comments on commit 5cd3cb3

Please sign in to comment.