diff --git a/eventkitd-bigquery/bigquery/batch_test.go b/eventkitd-bigquery/bigquery/batch_test.go index 0468889..9f340ee 100644 --- a/eventkitd-bigquery/bigquery/batch_test.go +++ b/eventkitd-bigquery/bigquery/batch_test.go @@ -1,7 +1,11 @@ +// Copyright (C) 2024 Storj Labs, Inc. +// See LICENSE for copying information. + package bigquery import ( "context" + "sync" "testing" "time" @@ -24,21 +28,31 @@ func TestBatchQueue(t *testing.T) { }) } require.Eventually(t, func() bool { - return len(m.events) == 2 + return m.Len() == 2 }, 5*time.Second, 10*time.Millisecond) require.Len(t, m.events[0], 10) require.Len(t, m.events[1], 10) } type mockDestination struct { + mu sync.Mutex events [][]*eventkit.Event } func (m *mockDestination) Submit(event ...*eventkit.Event) { + m.mu.Lock() + defer m.mu.Unlock() m.events = append(m.events, event) } +func (m *mockDestination) Len() int { + m.mu.Lock() + defer m.mu.Unlock() + return len(m.events) +} + func (m *mockDestination) Run(ctx context.Context) { + } var _ eventkit.Destination = &mockDestination{} diff --git a/eventkitd-bigquery/bigquery/parallel.go b/eventkitd-bigquery/bigquery/parallel.go new file mode 100644 index 0000000..7a11522 --- /dev/null +++ b/eventkitd-bigquery/bigquery/parallel.go @@ -0,0 +1,70 @@ +package bigquery + +import ( + "context" + "fmt" + "os" + + "golang.org/x/sync/errgroup" + + "storj.io/eventkit" +) + +// Parallel sends messages parallel from multiple goroutines. +type Parallel struct { + queue chan []*eventkit.Event + target func() (eventkit.Destination, error) + workers int + teardown chan struct{} +} + +// NewParallel creates a destination. It requires a way to create the worker destinations and the number of goroutines. +func NewParallel(target func() (eventkit.Destination, error), workers int) *Parallel { + return &Parallel{ + queue: make(chan []*eventkit.Event, workers), + teardown: make(chan struct{}), + target: target, + workers: workers, + } +} + +// Submit implements eventkit.Destination. +func (p *Parallel) Submit(events ...*eventkit.Event) { + select { + case p.queue <- events: + case <-p.teardown: + } + +} + +// Run implements eventkit.Destination. +func (p *Parallel) Run(ctx context.Context) { + w := errgroup.Group{} + for i := 0; i < p.workers; i++ { + dest, err := p.target() + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "WARNING: eventkit destination couldn't be created: %v", err) + continue + } + w.Go(func() error { + dest.Run(ctx) + return nil + }) + w.Go(func() error { + for { + select { + case events := <-p.queue: + dest.Submit(events...) + case <-ctx.Done(): + return nil + } + } + }) + } + _ = w.Wait() + close(p.teardown) + close(p.queue) + +} + +var _ eventkit.Destination = &Parallel{} diff --git a/eventkitd-bigquery/bigquery/parallel_test.go b/eventkitd-bigquery/bigquery/parallel_test.go new file mode 100644 index 0000000..5871d25 --- /dev/null +++ b/eventkitd-bigquery/bigquery/parallel_test.go @@ -0,0 +1,34 @@ +package bigquery + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "storj.io/eventkit" +) + +func TestParallel(t *testing.T) { + m := &mockDestination{} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + queue := NewParallel(func() (eventkit.Destination, error) { + return m, nil + }, 10) + go func() { + queue.Run(ctx) + }() + for i := 0; i < 10000; i++ { + queue.Submit(&eventkit.Event{ + Name: "foobar", + }) + } + require.Eventually(t, func() bool { + return m.Len() == 10000 + }, 5*time.Second, 10*time.Millisecond) + require.Len(t, m.events[0], 1) + require.Len(t, m.events[1], 1) + +}