From bbb0590237c9f1393d953c4724051c29000578f6 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Tue, 13 Feb 2024 11:44:56 +0100 Subject: [PATCH 1/3] worker: fail upload/download request if uploader/downloader was stopped --- worker/downloader.go | 15 +++++++++++++-- worker/uploader.go | 16 ++++++++++++++-- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/worker/downloader.go b/worker/downloader.go index 30c855d80..4a50e75f5 100644 --- a/worker/downloader.go +++ b/worker/downloader.go @@ -20,7 +20,6 @@ const ( type ( downloader struct { - hk types.PublicKey host Host statsDownloadSpeedBytesPerMS *stats.DataPoints // keep track of this separately for stats (no decay is applied) @@ -33,6 +32,7 @@ type ( consecutiveFailures uint64 numDownloads uint64 queue []*sectorDownloadReq + stopped bool } ) @@ -55,6 +55,10 @@ func (d *downloader) PublicKey() types.PublicKey { } func (d *downloader) Stop() { + d.mu.Lock() + d.stopped = true + d.mu.Unlock() + for { download := d.pop() if download == nil { @@ -80,8 +84,15 @@ func (d *downloader) fillBatch() (batch []*sectorDownloadReq) { } func (d *downloader) enqueue(download *sectorDownloadReq) { - // enqueue the job d.mu.Lock() + // check for stopped + if d.stopped { + d.mu.Unlock() + go download.fail(errors.New("downloader stopped")) // don't block the caller + return + } + + // enqueue the job d.queue = append(d.queue, download) d.mu.Unlock() diff --git a/worker/uploader.go b/worker/uploader.go index 3ec88c6fa..e20c4dee4 100644 --- a/worker/uploader.go +++ b/worker/uploader.go @@ -36,6 +36,7 @@ type ( fcid types.FileContractID host Host queue []*sectorUploadReq + stopped bool // stats related field consecutiveFailures uint64 @@ -136,6 +137,10 @@ outer: } func (u *uploader) Stop(err error) { + u.mu.Lock() + u.stopped = true + u.mu.Unlock() + for { upload := u.pop() if upload == nil { @@ -148,12 +153,19 @@ func (u *uploader) Stop(err error) { } func (u *uploader) enqueue(req *sectorUploadReq) { + u.mu.Lock() + // check for stopped + if u.stopped { + u.mu.Unlock() + go req.fail(errors.New("uploader stopped")) // don't block the caller + return + } + // decorate the request - req.fcid = u.ContractID() + req.fcid = u.fcid req.hk = u.hk // enqueue the request - u.mu.Lock() u.queue = append(u.queue, req) u.mu.Unlock() From 49475fa38b57ce4d2df702c9731acc07264b79dd Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Tue, 13 Feb 2024 12:01:08 +0100 Subject: [PATCH 2/3] worker: add tests --- worker/downloader.go | 8 ++++++-- worker/downloader_test.go | 32 ++++++++++++++++++++++++++++++++ worker/uploader.go | 6 +++++- worker/uploader_test.go | 32 ++++++++++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 worker/downloader_test.go create mode 100644 worker/uploader_test.go diff --git a/worker/downloader.go b/worker/downloader.go index 4a50e75f5..24be245fc 100644 --- a/worker/downloader.go +++ b/worker/downloader.go @@ -18,6 +18,10 @@ const ( maxConcurrentSectorsPerHost = 3 ) +var ( + errDownloaderStopped = errors.New("downloader was stopped") +) + type ( downloader struct { host Host @@ -65,7 +69,7 @@ func (d *downloader) Stop() { break } if !download.done() { - download.fail(errors.New("downloader stopped")) + download.fail(errDownloaderStopped) } } } @@ -88,7 +92,7 @@ func (d *downloader) enqueue(download *sectorDownloadReq) { // check for stopped if d.stopped { d.mu.Unlock() - go download.fail(errors.New("downloader stopped")) // don't block the caller + go download.fail(errDownloaderStopped) // don't block the caller return } diff --git a/worker/downloader_test.go b/worker/downloader_test.go new file mode 100644 index 000000000..0be4bc701 --- /dev/null +++ b/worker/downloader_test.go @@ -0,0 +1,32 @@ +package worker + +import ( + "errors" + "testing" + "time" +) + +func TestDownloaderStopped(t *testing.T) { + w := newMockWorker() + h := w.addHost() + w.dl.refreshDownloaders(w.contracts()) + + dl := w.dl.downloaders[h.PublicKey()] + dl.Stop() + + req := sectorDownloadReq{ + resps: §orResponses{ + c: make(chan struct{}), + }, + } + dl.enqueue(&req) + + select { + case <-req.resps.c: + if err := req.resps.responses[0].err; !errors.Is(err, errDownloaderStopped) { + t.Fatal("unexpected error response", err) + } + case <-time.After(time.Second): + t.Fatal("no response") + } +} diff --git a/worker/uploader.go b/worker/uploader.go index e20c4dee4..dcff27eaf 100644 --- a/worker/uploader.go +++ b/worker/uploader.go @@ -19,6 +19,10 @@ const ( sectorUploadTimeout = 60 * time.Second ) +var ( + errUploaderStopped = errors.New("uploader was stopped") +) + type ( uploader struct { os ObjectStore @@ -157,7 +161,7 @@ func (u *uploader) enqueue(req *sectorUploadReq) { // check for stopped if u.stopped { u.mu.Unlock() - go req.fail(errors.New("uploader stopped")) // don't block the caller + go req.fail(errUploaderStopped) // don't block the caller return } diff --git a/worker/uploader_test.go b/worker/uploader_test.go new file mode 100644 index 000000000..3afba85cb --- /dev/null +++ b/worker/uploader_test.go @@ -0,0 +1,32 @@ +package worker + +import ( + "context" + "errors" + "testing" + "time" +) + +func TestUploaderStopped(t *testing.T) { + w := newMockWorker() + w.addHost() + w.ul.refreshUploaders(w.contracts(), 1) + + ul := w.ul.uploaders[0] + ul.Stop(errors.New("test")) + + req := sectorUploadReq{ + responseChan: make(chan sectorUploadResp), + sector: §orUpload{ctx: context.Background()}, + } + ul.enqueue(&req) + + select { + case res := <-req.responseChan: + if !errors.Is(res.err, errUploaderStopped) { + t.Fatal("expected error response") + } + case <-time.After(time.Second): + t.Fatal("no response") + } +} From 645f93a186645ef8562da5efb1d443b961d49e5b Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Tue, 13 Feb 2024 12:11:10 +0100 Subject: [PATCH 3/3] worker: increase timeout in tests for CI --- worker/downloader_test.go | 2 +- worker/uploader_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/downloader_test.go b/worker/downloader_test.go index 0be4bc701..357fc2ee8 100644 --- a/worker/downloader_test.go +++ b/worker/downloader_test.go @@ -26,7 +26,7 @@ func TestDownloaderStopped(t *testing.T) { if err := req.resps.responses[0].err; !errors.Is(err, errDownloaderStopped) { t.Fatal("unexpected error response", err) } - case <-time.After(time.Second): + case <-time.After(10 * time.Second): t.Fatal("no response") } } diff --git a/worker/uploader_test.go b/worker/uploader_test.go index 3afba85cb..7217cbaab 100644 --- a/worker/uploader_test.go +++ b/worker/uploader_test.go @@ -26,7 +26,7 @@ func TestUploaderStopped(t *testing.T) { if !errors.Is(res.err, errUploaderStopped) { t.Fatal("expected error response") } - case <-time.After(time.Second): + case <-time.After(10 * time.Second): t.Fatal("no response") } }