-
Notifications
You must be signed in to change notification settings - Fork 55
/
buffer_internal_test.go
123 lines (100 loc) · 2.59 KB
/
buffer_internal_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package relayer
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/omni-network/omni/lib/xchain"
fuzz "github.com/google/gofuzz"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// Test_activeBuffer_AddInput checks that AddInput returns without error, and
// before the 50ms context deadline expires, while a reader is receiving from
// the buffer's channel.
func Test_activeBuffer_AddInput(t *testing.T) {
	t.Parallel()
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	limit := int64(5)
	sender := &mockBufSender{}
	buffer := newActiveBuffer("test", limit, sender.Send)

	// Have a reader ready as we are unbuffered and blocking.
	// The reader exits once ctx is done (deadline or the deferred cancel) or the
	// channel is closed, so it does not outlive the test.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case _, ok := <-buffer.buffer:
				if !ok {
					return
				}
			}
		}
	}()

	err := buffer.AddInput(ctx, xchain.Submission{})
	require.NoError(t, err)

	// If the deadline has already passed, AddInput took too long to return.
	select {
	case <-ctx.Done():
		t.Errorf("AddInput is blocking and should not have")
	default:
	}
}
// mockBufSender is a test double for the buffer's send function. Its Send
// method pushes each submission onto sendChan from a background goroutine, and
// Next receives from sendChan so a test can pull submissions out one at a time.
// sendChan is only initialised by newMockSender; a zero-value mockBufSender has
// a nil channel.
type mockBufSender struct {
sendChan chan xchain.Submission
}
// newMockSender builds a mockBufSender whose sendChan is an unbuffered channel,
// so every send blocks until a matching receive happens.
func newMockSender() *mockBufSender {
	sender := new(mockBufSender)
	sender.sendChan = make(chan xchain.Submission)

	return sender
}
// Send simulates an asynchronous send of sub. It returns immediately with a
// response channel; a background goroutine blocks until the submission is
// received from sendChan (see Next) and then reports success (nil).
//
// If ctx is done before the submission is received, the goroutine stops
// waiting and reports ctx.Err() instead, so it cannot block forever when no
// one calls Next.
func (m *mockBufSender) Send(ctx context.Context, sub xchain.Submission) <-chan error {
	// Buffered with capacity 1 so the goroutine can always deliver its single
	// result, even if the caller never reads the response.
	resp := make(chan error, 1)
	go func() {
		select {
		case m.sendChan <- sub:
			resp <- nil
		case <-ctx.Done():
			resp <- ctx.Err()
		}
	}()

	return resp
}
// Next blocks until a submission arrives on sendChan and returns it, which
// unblocks the corresponding goroutine started by Send.
func (m *mockBufSender) Next() xchain.Submission {
	sub := <-m.sendChan

	return sub
}
// Test_activeBuffer_Run tests that the buffer is blocking when the number of submissions is greater
// than the mempoolLimit.
//
// A producer goroutine feeds size submissions through AddInput while Run is
// active. The test expects the producer to stall after memLimit+1 successful
// AddInput calls, then drains the mock sender and expects the remaining inputs
// to go through. Finally it cancels the context and verifies that Run returns
// context.Canceled.
func Test_activeBuffer_Run(t *testing.T) {
	t.Parallel()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const (
		memLimit = int64(5) // mempoolLimit
		size     = 10       // number of submissions fed into the buffer
	)

	sender := newMockSender()
	buffer := newActiveBuffer("test", memLimit, sender.Send)

	var input []xchain.Submission
	fuzz.New().NilChance(0).NumElements(size, size).Fuzz(&input)

	// Run the buffer in the background and hand its result back to the test
	// goroutine, so the assertion on it is evaluated before the test returns
	// (and never touches t after the test has completed).
	runErr := make(chan error, 1)
	go func() {
		runErr <- buffer.Run(ctx)
	}()

	// counter tracks how many AddInput calls have returned.
	counter := new(atomic.Int64)
	go func() {
		for _, sub := range input {
			err := buffer.AddInput(ctx, sub)
			assert.NoError(t, err)
			counter.Add(1)
		}
	}()

	require.Eventuallyf(t,
		func() bool {
			return counter.Load() == memLimit+1
		},
		time.Second, time.Millisecond, "expected %d", memLimit+1,
	)

	// assert again that buf is blocking
	require.Equal(t, memLimit+1, counter.Load())

	// Retrieve output submissions
	output := make([]xchain.Submission, 0, len(input))
	for len(input) != len(output) {
		output = append(output, sender.Next())
	}

	require.Eventuallyf(t,
		func() bool {
			return counter.Load() == int64(size)
		},
		time.Second, time.Millisecond, "expected %d", size,
	)

	// Assert that as many submissions were sent as were added.
	require.Len(t, input, len(output))

	// Stop Run and verify it exits with the cancellation error.
	cancel()
	select {
	case err := <-runErr:
		require.ErrorIs(t, err, context.Canceled)
	case <-time.After(time.Second):
		require.Fail(t, "Run did not return after context cancellation")
	}
}