Skip to content

Commit

Permalink
worker: extend test worker with a way to block uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Feb 26, 2024
1 parent 8baca5b commit 145b505
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 14 deletions.
1 change: 0 additions & 1 deletion worker/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func (lmm *limitMemoryManager) AcquireMemory(ctx context.Context, amt uint64) Me
childMem.Release()
return nil
}

return &limitAcquiredMemory{
child: childMem,
parent: parentMem,
Expand Down
12 changes: 8 additions & 4 deletions worker/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,19 +322,23 @@ type (
memoryManagerMock struct{ memBlockChan chan struct{} }
)

func newMemoryManagerMock() *memoryManagerMock {
mm := &memoryManagerMock{memBlockChan: make(chan struct{})}
close(mm.memBlockChan)
return mm
}

func (m *memoryMock) Release() {}
func (m *memoryMock) ReleaseSome(uint64) {}

func (mm *memoryManagerMock) Limit(amt uint64) (MemoryManager, error) {
return &memoryManagerMock{}, nil
return mm, nil
}

func (mm *memoryManagerMock) Status() api.MemoryStatus { return api.MemoryStatus{} }

func (mm *memoryManagerMock) AcquireMemory(ctx context.Context, amt uint64) Memory {
if mm.memBlockChan != nil {
<-mm.memBlockChan
}
<-mm.memBlockChan
return &memoryMock{}
}

Expand Down
10 changes: 4 additions & 6 deletions worker/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,6 @@ func TestUploadRegression(t *testing.T) {

// convenience variables
os := w.os
mm := w.ulmm
ul := w.uploadManager
dl := w.downloadManager

// create test data
Expand All @@ -455,21 +453,21 @@ func TestUploadRegression(t *testing.T) {
params := testParameters(t.Name())

// make sure the memory manager blocks
mm.memBlockChan = make(chan struct{})
unblock := w.blockUploads()

// upload data
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, _, err := ul.Upload(ctx, bytes.NewReader(data), w.contracts(), params, lockingPriorityUpload)
_, err := w.upload(ctx, bytes.NewReader(data), w.contracts(), params)
if !errors.Is(err, errUploadInterrupted) {
t.Fatal(err)
}

// unblock the memory manager
close(mm.memBlockChan)
unblock()

// upload data
_, _, err = ul.Upload(context.Background(), bytes.NewReader(data), w.contracts(), params, lockingPriorityUpload)
_, err = w.upload(context.Background(), bytes.NewReader(data), w.contracts(), params)
if err != nil {
t.Fatal(err)
}
Expand Down
18 changes: 15 additions & 3 deletions worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func newTestWorker(t *testing.T) *testWorker {

// create worker dependencies
b := newBusMock(cs, hs, os)
dlmm := &memoryManagerMock{}
ulmm := &memoryManagerMock{}
dlmm := newMemoryManagerMock()
ulmm := newMemoryManagerMock()

// create worker
w, err := New(blake2b.Sum256([]byte("testwork")), "test", b, time.Second, time.Second, time.Second, time.Second, 0, 0, 1, 1, false, zap.NewNop())
Expand All @@ -60,8 +60,8 @@ func newTestWorker(t *testing.T) *testWorker {
cs,
os,
hs,
ulmm,
dlmm,
ulmm,
hm,
}
}
Expand All @@ -81,6 +81,18 @@ func (w *testWorker) addHost() *testHost {
return host
}

func (w *testWorker) blockUploads() func() {
select {
case <-w.ulmm.memBlockChan:
case <-time.After(time.Second):
w.t.Fatal("already blocking")
}

blockChan := make(chan struct{})
w.ulmm.memBlockChan = blockChan
return func() { close(blockChan) }
}

func (w *testWorker) contracts() []api.ContractMetadata {
metadatas, err := w.cs.Contracts(context.Background(), api.ContractsOpts{})
if err != nil {
Expand Down

0 comments on commit 145b505

Please sign in to comment.