Wait for ongoing uploads and downloads when shutting down the worker #882

Closed · wants to merge 1 commit
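This PR makes the worker wait for in-flight uploads and downloads before completing its shutdown, rejecting new requests with worker.ErrShuttingDown once shutdown has begun and giving up with worker.ErrShutdownTimedOut when the shutdown context expires. The worker-side change itself is not among the hunks below; the following is only a rough sketch of the idea, using a hypothetical requestTracker type rather than the actual renterd implementation:

package shutdownsketch

import (
	"context"
	"errors"
	"sync"
)

var (
	errShuttingDown     = errors.New("worker is shutting down")   // stand-in for worker.ErrShuttingDown
	errShutdownTimedOut = errors.New("worker shutdown timed out") // stand-in for worker.ErrShutdownTimedOut
)

// requestTracker counts in-flight uploads/downloads and rejects new ones once
// shutdown has begun.
type requestTracker struct {
	mu           sync.Mutex
	shuttingDown bool
	wg           sync.WaitGroup
}

// track registers a request; the caller must invoke the returned function when
// the upload or download finishes.
func (t *requestTracker) track() (done func(), _ error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.shuttingDown {
		return nil, errShuttingDown
	}
	t.wg.Add(1)
	return t.wg.Done, nil
}

// shutdown blocks new requests and waits for in-flight ones, giving up when
// the shutdown context expires.
func (t *requestTracker) shutdown(ctx context.Context) error {
	t.mu.Lock()
	t.shuttingDown = true
	t.mu.Unlock()

	idle := make(chan struct{})
	go func() {
		t.wg.Wait()
		close(idle)
	}()

	select {
	case <-idle:
		return nil
	case <-ctx.Done():
		return errShutdownTimedOut
	}
}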
11 changes: 9 additions & 2 deletions autopilot/autopilot.go
@@ -202,6 +202,13 @@ func (ap *Autopilot) Run() error {
var forceScan bool
var launchAccountRefillsOnce sync.Once
for {
// check for shutdown right before starting a new iteration
select {
case <-ap.shutdownCtx.Done():
return nil
default:
}

ap.logger.Info("autopilot iteration starting")
tickerFired := make(chan struct{})
ap.workers.withWorker(func(w Worker) {
@@ -220,7 +227,7 @@
close(tickerFired)
return
}
ap.logger.Error("autopilot stopped before consensus was synced")
ap.logger.Info("autopilot stopped before consensus was synced")
return
} else if blocked {
if scanning, _ := ap.s.Status(); !scanning {
@@ -234,7 +241,7 @@
close(tickerFired)
return
}
ap.logger.Error("autopilot stopped before it was able to confirm it was configured in the bus")
ap.logger.Info("autopilot stopped before it was able to confirm it was configured in the bus")
return
}

52 changes: 38 additions & 14 deletions cmd/renterd/main.go
@@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
@@ -626,25 +627,48 @@ func main() {
logger.Fatal("Fatal autopilot error: " + err.Error())
}

// Give each service a fraction of the total shutdown timeout. One service
// timing out shouldn't prevent the others from attempting a shutdown.
timeout := cfg.ShutdownTimeout / time.Duration(len(shutdownFns))
shutdown := func(fn func(ctx context.Context) error) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return fn(ctx)
}

// Shut down the autopilot first, then the rest of the services in reverse order and then
// Define a shutdown function that updates the exit code after shutting down
// a service and logs the outcome
exitCode := 0
for i := len(shutdownFns) - 1; i >= 0; i-- {
if err := shutdown(shutdownFns[i].fn); err != nil {
logger.Sugar().Errorf("Failed to shut down %v: %v", shutdownFns[i].name, err)
shutdown := func(ctx context.Context, fn shutdownFn) {
logger.Sugar().Infof("Shutting down %v...", fn.name)
start := time.Now()
if err := fn.fn(ctx); errors.Is(err, worker.ErrShutdownTimedOut) {
logger.Sugar().Errorf("%v shutdown timed out after %v", fn.name, time.Since(start))
exitCode = 1
} else if err != nil {
logger.Sugar().Errorf("%v shutdown failed after %v with err: %v", fn.name, time.Since(start), err)
exitCode = 1
} else {
logger.Sugar().Infof("%v shut down successfully", shutdownFns[i].name)
logger.Sugar().Infof("%v shutdown successful after %v", fn.name, time.Since(start))
}
}

// Reserve a portion of the shutdown timeout to allow graceful shutdown of
// services after a potential timeout from a prior service that took too
// long to shut down. This way we allow all services to shut down
// gracefully.
reserved := (cfg.ShutdownTimeout / 5).Round(5 * time.Second)
ctx, cancel := context.WithTimeoutCause(context.Background(), cfg.ShutdownTimeout-reserved, worker.ErrShuttingDown)
defer cancel()

// Shut down the services in reverse order
for i := len(shutdownFns) - 1; i >= 0; i-- {
// use reserve context if necessary
select {
case <-ctx.Done():
if reserved == 0 {
logger.Sugar().Errorf("%v shutdown skipped, node shutdown exceeded %v", shutdownFns[i].name, cfg.ShutdownTimeout)
exitCode = 1
continue
}
ctx, _ = context.WithTimeoutCause(context.Background(), reserved, worker.ErrShuttingDown)
reserved = 0
default:
}
shutdown(ctx, shutdownFns[i])
}

logger.Info("Shutdown complete")
os.Exit(exitCode)
}
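The shutdown path above relies on context.WithTimeoutCause (added in Go 1.21): when the timeout fires, ctx.Err() still reports context.DeadlineExceeded, but context.Cause(ctx) returns the cause that was passed in, which is how worker.ErrShuttingDown can be surfaced to the services being shut down. A minimal standalone illustration, separate from the renterd code:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errShuttingDown = errors.New("worker is shutting down")

func main() {
	// The third argument becomes the context's cause once the timeout fires.
	ctx, cancel := context.WithTimeoutCause(context.Background(), 10*time.Millisecond, errShuttingDown)
	defer cancel()

	<-ctx.Done()
	fmt.Println(ctx.Err())          // context deadline exceeded
	fmt.Println(context.Cause(ctx)) // worker is shutting down
	fmt.Println(errors.Is(context.Cause(ctx), errShuttingDown)) // true
}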
148 changes: 148 additions & 0 deletions internal/test/e2e/cluster_test.go
@@ -27,6 +27,7 @@ import (
"go.sia.tech/renterd/internal/test"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/wallet"
"go.sia.tech/renterd/worker"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"lukechampine.com/frand"
@@ -2362,6 +2363,11 @@ func TestBusRecordedMetrics(t *testing.T) {
t.Fatal("expected zero ListSpending")
}

// Shut down everything but the bus to avoid an NDF in the following assertion.
cluster.ShutdownS3(context.Background())
cluster.ShutdownWorker(context.Background())
cluster.ShutdownAutopilot(context.Background())

// Prune one of the metrics
if err := cluster.Bus.PruneMetrics(context.Background(), api.MetricContract, time.Now()); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -2448,3 +2454,145 @@ func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) {
t.Fatal("unexpected data")
}
}

type blockedWriter struct {
blockChan chan struct{}
writingChan chan struct{}

once sync.Once

mu sync.Mutex
buffer *bytes.Buffer
}

func newBlockedWriter() *blockedWriter {
return &blockedWriter{
buffer: new(bytes.Buffer),
blockChan: make(chan struct{}),
writingChan: make(chan struct{}),
}
}

func (r *blockedWriter) Bytes() []byte {
r.mu.Lock()
defer r.mu.Unlock()
return r.buffer.Bytes()
}

func (r *blockedWriter) Write(p []byte) (n int, err error) {
r.once.Do(func() { close(r.writingChan) })

<-r.blockChan

r.mu.Lock()
defer r.mu.Unlock()
return r.buffer.Write(p)
}

func (r *blockedWriter) waitForWriteStart() {
<-r.writingChan
}

func (r *blockedWriter) unblock() {
close(r.blockChan)
}

func TestGracefulShutdown(t *testing.T) {
// create cluster
cluster := newTestCluster(t, testClusterOptions{hosts: test.RedundancySettings.TotalShards})
defer cluster.Shutdown()

// convenience variables
b := cluster.Bus
w := cluster.Worker
tt := cluster.tt

// shut down the autopilot, we don't need it
cluster.ShutdownAutopilot(context.Background())

// prepare an object to download
tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader([]byte(t.Name())), api.DefaultBucketName, t.Name(), api.UploadObjectOptions{}))

// prepare both a reader and a writer that blocks until we unblock them
data := make([]byte, 128)
frand.Read(data)
br := newBlockedReader(data)
bw := newBlockedWriter()

// upload in separate thread
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if _, err := w.UploadObject(context.Background(), br, api.DefaultBucketName, t.Name()+"blocked", api.UploadObjectOptions{}); err != nil {
t.Error(err)
}
}()

// download in separate thread
wg.Add(1)
go func() {
defer wg.Done()
if err := w.DownloadObject(context.Background(), bw, api.DefaultBucketName, t.Name(), api.DownloadObjectOptions{}); err != nil {
t.Error(err)
}
}()

// wait until we are sure both requests are blocked
br.waitForReadStart()
bw.waitForWriteStart()

// shut the worker down
shutdownDone := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
cluster.ShutdownWorker(ctx)
close(shutdownDone)
cancel()
}()

// assert shutdown is blocked
select {
case <-shutdownDone:
tt.Fatal("shut down")
case <-time.After(time.Second):
}

// unblock the download and upload separately, this allows us to check
// for ErrShuttingDown explicitly, unblocking both at the same time
// would shut the worker down along with the server resulting in a race
// between either ErrShuttingDown or 'connection refused' errors
br.unblock()

// assert uploads after shutdown fail
_, err := w.UploadObject(context.Background(), br, api.DefaultBucketName, t.Name()+"blocked", api.UploadObjectOptions{})
if err == nil || !strings.Contains(err.Error(), worker.ErrShuttingDown.Error()) {
t.Error("new uploads should fail, err:", err)
}

// assert downloads after shutdown fail
var buf bytes.Buffer
err = w.DownloadObject(context.Background(), &buf, api.DefaultBucketName, t.Name(), api.DownloadObjectOptions{})
if err == nil || !strings.Contains(err.Error(), worker.ErrShuttingDown.Error()) {
t.Error("new downloads should fail, err:", err)
}

// unblock the upload
bw.unblock()

// wait for all goroutines to finish
wg.Wait()

// check the download succeeded
if string(bw.Bytes()) != t.Name() {
t.Fatal("data mismatch")
}

// check the upload succeeded, we can use the bus for that
_, err = b.Object(context.Background(), api.DefaultBucketName, t.Name()+"blocked", api.GetObjectOptions{})
if err != nil {
t.Fatal(err)
}
}
25 changes: 21 additions & 4 deletions internal/test/e2e/uploads_test.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"sync"
"testing"
"time"

@@ -17,20 +18,28 @@
type blockedReader struct {
remaining int
data *bytes.Buffer
readChan chan struct{}
blockChan chan struct{}

blockChan chan struct{}
readChan chan struct{}
readingChan chan struct{}

once sync.Once
}

func newBlockedReader(data []byte) *blockedReader {
return &blockedReader{
remaining: len(data),
data: bytes.NewBuffer(data),
blockChan: make(chan struct{}),
readChan: make(chan struct{}),

blockChan: make(chan struct{}),
readChan: make(chan struct{}),
readingChan: make(chan struct{}),
}
}

func (r *blockedReader) Read(buf []byte) (n int, err error) {
r.once.Do(func() { close(r.readingChan) })

select {
case <-r.readChan:
<-r.blockChan
@@ -44,6 +53,14 @@ func (r *blockedReader) Read(buf []byte) (n int, err error) {
return
}

func (r *blockedReader) waitForReadStart() {
<-r.readingChan
}

func (r *blockedReader) unblock() {
close(r.blockChan)
}

func TestUploadingSectorsCache(t *testing.T) {
if testing.Short() {
t.SkipNow()