diff --git a/worker/memory.go b/worker/memory.go index 8b1c7cb5e..1dbd680ec 100644 --- a/worker/memory.go +++ b/worker/memory.go @@ -151,7 +151,6 @@ func (lmm *limitMemoryManager) AcquireMemory(ctx context.Context, amt uint64) Me childMem.Release() return nil } - return &limitAcquiredMemory{ child: childMem, parent: parentMem, diff --git a/worker/mocks_test.go b/worker/mocks_test.go index 80c1d5443..0720565db 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -322,19 +322,23 @@ type ( memoryManagerMock struct{ memBlockChan chan struct{} } ) +func newMemoryManagerMock() *memoryManagerMock { + mm := &memoryManagerMock{memBlockChan: make(chan struct{})} + close(mm.memBlockChan) + return mm +} + func (m *memoryMock) Release() {} func (m *memoryMock) ReleaseSome(uint64) {} func (mm *memoryManagerMock) Limit(amt uint64) (MemoryManager, error) { - return &memoryManagerMock{}, nil + return mm, nil } func (mm *memoryManagerMock) Status() api.MemoryStatus { return api.MemoryStatus{} } func (mm *memoryManagerMock) AcquireMemory(ctx context.Context, amt uint64) Memory { - if mm.memBlockChan != nil { - <-mm.memBlockChan - } + <-mm.memBlockChan return &memoryMock{} } diff --git a/worker/upload_test.go b/worker/upload_test.go index e8953db37..cc0996519 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -441,8 +441,6 @@ func TestUploadRegression(t *testing.T) { // convenience variables os := w.os - mm := w.ulmm - ul := w.uploadManager dl := w.downloadManager // create test data @@ -455,21 +453,21 @@ func TestUploadRegression(t *testing.T) { params := testParameters(t.Name()) // make sure the memory manager blocks - mm.memBlockChan = make(chan struct{}) + unblock := w.blockUploads() // upload data ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - _, _, err := ul.Upload(ctx, bytes.NewReader(data), w.contracts(), params, lockingPriorityUpload) + _, err := w.upload(ctx, bytes.NewReader(data), w.contracts(), params) if !errors.Is(err, errUploadInterrupted) { t.Fatal(err) } // unblock the memory manager - close(mm.memBlockChan) + unblock() // upload data - _, _, err = ul.Upload(context.Background(), bytes.NewReader(data), w.contracts(), params, lockingPriorityUpload) + _, err = w.upload(context.Background(), bytes.NewReader(data), w.contracts(), params) if err != nil { t.Fatal(err) } diff --git a/worker/worker_test.go b/worker/worker_test.go index baa47db5b..c1a19aed1 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -37,8 +37,8 @@ func newTestWorker(t *testing.T) *testWorker { // create worker dependencies b := newBusMock(cs, hs, os) - dlmm := &memoryManagerMock{} - ulmm := &memoryManagerMock{} + dlmm := newMemoryManagerMock() + ulmm := newMemoryManagerMock() // create worker w, err := New(blake2b.Sum256([]byte("testwork")), "test", b, time.Second, time.Second, time.Second, time.Second, 0, 0, 1, 1, false, zap.NewNop()) @@ -60,8 +60,8 @@ func newTestWorker(t *testing.T) *testWorker { cs, os, hs, - ulmm, dlmm, + ulmm, hm, } } @@ -81,6 +81,18 @@ func (w *testWorker) addHost() *testHost { return host } +func (w *testWorker) blockUploads() func() { + select { + case <-w.ulmm.memBlockChan: + case <-time.After(time.Second): + w.t.Fatal("already blocking") + } + + blockChan := make(chan struct{}) + w.ulmm.memBlockChan = blockChan + return func() { close(blockChan) } +} + func (w *testWorker) contracts() []api.ContractMetadata { metadatas, err := w.cs.Contracts(context.Background(), api.ContractsOpts{}) if err != nil {