From cb689609bc6c4e25091b7174a36344d2c3e7c13e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Elek?= Date: Wed, 21 Feb 2024 14:18:51 +0100 Subject: [PATCH] bigquery: implement destination with BatchQueue Change-Id: I7b40054d25bead1727e4a02edc113ff6811047ca --- eventkitd-bigquery/bigquery/batch.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/eventkitd-bigquery/bigquery/batch.go b/eventkitd-bigquery/bigquery/batch.go index 0b72f8e..c6a7a1b 100644 --- a/eventkitd-bigquery/bigquery/batch.go +++ b/eventkitd-bigquery/bigquery/batch.go @@ -3,7 +3,6 @@ package bigquery import ( "context" "sync" - "sync/atomic" "time" "golang.org/x/sync/errgroup" @@ -20,9 +19,10 @@ type BatchQueue struct { target eventkit.Destination mu sync.Mutex events []*eventkit.Event - droppedEvents atomic.Int64 } +var _ eventkit.Destination = &BatchQueue{} + // NewBatchQueue creates a new batchQueue. It sends out the received events in batch. Either after the flushInterval is // expired or when there are more than batchSize element in the queue. func NewBatchQueue(target eventkit.Destination, queueSize int, batchSize int, flushInterval time.Duration) *BatchQueue { @@ -60,10 +60,6 @@ func (c *BatchQueue) Run(ctx context.Context) { } for { - if drops := c.droppedEvents.Load(); drops > 0 { - mon.Counter("dropped_events").Inc(drops) - c.droppedEvents.Add(-drops) - } select { case em := <-c.submitQueue: @@ -97,11 +93,13 @@ func (c *BatchQueue) addEvent(ev *eventkit.Event) (full bool) { } // Submit implements Destination. -func (c *BatchQueue) Submit(event *eventkit.Event) { - select { - case c.submitQueue <- event: - return - default: - c.droppedEvents.Add(1) +func (c *BatchQueue) Submit(events ...*eventkit.Event) { + defer mon.Task()(nil)(nil) + for _, e := range events { + select { + case c.submitQueue <- e: + default: + mon.Counter("dropped_events").Inc(1) + } } }