From ddec937c1a40f6f63b2db41ec064807035dc80f7 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Tue, 5 Nov 2024 15:27:40 +0100 Subject: [PATCH] test(models): Cleanup tests, unexport stuff and unify naming in tests (#16116) --- models/buffer.go | 4 + models/buffer_disk.go | 4 + models/buffer_disk_test.go | 84 +-- models/buffer_mem.go | 64 +- models/buffer_mem_test.go | 34 +- models/buffer_suite_test.go | 1009 +++++++++++++++-------------- models/filter_test.go | 38 +- models/running_aggregator_test.go | 38 +- models/running_input_test.go | 80 +-- models/running_output.go | 4 + models/running_output_test.go | 159 ++--- models/running_processor_test.go | 117 ++-- 12 files changed, 841 insertions(+), 794 deletions(-) diff --git a/models/buffer.go b/models/buffer.go index 7f4d679f96fa9..d6f0949f346a3 100644 --- a/models/buffer.go +++ b/models/buffer.go @@ -35,7 +35,11 @@ type Buffer interface { // as unsent. Reject([]telegraf.Metric) + // Stats returns the buffer statistics such as rejected, dropped and accepred metrics Stats() BufferStats + + // Close finalizes the buffer and closes all open resources + Close() error } // BufferStats holds common metrics used for buffer implementations. diff --git a/models/buffer_disk.go b/models/buffer_disk.go index 2031e57399eab..09e338224762f 100644 --- a/models/buffer_disk.go +++ b/models/buffer_disk.go @@ -215,6 +215,10 @@ func (b *DiskBuffer) Stats() BufferStats { return b.BufferStats } +func (b *DiskBuffer) Close() error { + return b.file.Close() +} + func (b *DiskBuffer) resetBatch() { b.batchFirst = 0 b.batchSize = 0 diff --git a/models/buffer_disk_test.go b/models/buffer_disk_test.go index d7d97be812bb1..3f04ef86d6246 100644 --- a/models/buffer_disk_test.go +++ b/models/buffer_disk_test.go @@ -1,7 +1,6 @@ package models import ( - "os" "path/filepath" "testing" "time" @@ -14,66 +13,37 @@ import ( "github.com/influxdata/telegraf/testutil" ) -func newTestDiskBuffer(t testing.TB) Buffer { - path, err := os.MkdirTemp("", "*-buffer-test") - require.NoError(t, err) - return newTestDiskBufferWithPath(t, "test", path) -} +func TestDiskBufferRetainsTrackingInformation(t *testing.T) { + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + + var delivered int + mm, _ := metric.WithTracking(m, func(telegraf.DeliveryInfo) { delivered++ }) -func newTestDiskBufferWithPath(t testing.TB, name string, path string) Buffer { - t.Helper() - buf, err := NewBuffer(name, "123", "", 0, "disk", path) + buf, err := NewBuffer("test", "123", "", 0, "disk", t.TempDir()) require.NoError(t, err) buf.Stats().MetricsAdded.Set(0) buf.Stats().MetricsWritten.Set(0) buf.Stats().MetricsDropped.Set(0) - return buf -} + defer buf.Close() -func TestBuffer_RetainsTrackingInformation(t *testing.T) { - var delivered int - mm, _ := metric.WithTracking(Metric(), func(_ telegraf.DeliveryInfo) { - delivered++ - }) - b := newTestDiskBuffer(t) - b.Add(mm) - batch := b.Batch(1) - b.Accept(batch) + buf.Add(mm) + + batch := buf.Batch(1) + buf.Accept(batch) require.Equal(t, 1, delivered) } -func TestBuffer_TrackingDroppedFromOldWal(t *testing.T) { - path, err := os.MkdirTemp("", "*-buffer-test") - require.NoError(t, err) - path = filepath.Join(path, "123") - walfile, err := wal.Open(path, nil) - require.NoError(t, err) - - tm, _ := metric.WithTracking(Metric(), func(_ telegraf.DeliveryInfo) {}) +func TestDiskBufferTrackingDroppedFromOldWal(t *testing.T) { + m := metric.New("cpu", map[string]string{}, 
map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + tm, _ := metric.WithTracking(m, func(telegraf.DeliveryInfo) {}) metrics := []telegraf.Metric{ // Basic metric with 1 field, 0 timestamp - Metric(), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)), // Basic metric with 1 field, different timestamp - metric.New( - "cpu", - map[string]string{}, - map[string]interface{}{ - "value": 20.0, - }, - time.Now(), - ), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 20.0}, time.Now()), // Metric with a field - metric.New( - "cpu", - map[string]string{ - "x": "y", - }, - map[string]interface{}{ - "value": 18.0, - }, - time.Now(), - ), + metric.New("cpu", map[string]string{"x": "y"}, map[string]interface{}{"value": 18.0}, time.Now()), // Tracking metric tm, // Metric with lots of tag types @@ -95,15 +65,29 @@ func TestBuffer_TrackingDroppedFromOldWal(t *testing.T) { // call manually so that we can properly use metric.ToBytes() without having initialized a buffer registerGob() + // Prefill the WAL file + path := t.TempDir() + walfile, err := wal.Open(filepath.Join(path, "123"), nil) + require.NoError(t, err) + defer walfile.Close() for i, m := range metrics { data, err := metric.ToBytes(m) require.NoError(t, err) require.NoError(t, walfile.Write(uint64(i+1), data)) } + walfile.Close() + + // Create a buffer + buf, err := NewBuffer("123", "123", "", 0, "disk", path) + require.NoError(t, err) + buf.Stats().MetricsAdded.Set(0) + buf.Stats().MetricsWritten.Set(0) + buf.Stats().MetricsDropped.Set(0) + defer buf.Close() + + batch := buf.Batch(4) - b := newTestDiskBufferWithPath(t, filepath.Base(path), filepath.Dir(path)) - batch := b.Batch(4) - // expected skips the tracking metric + // Check that the tracking metric is skipped expected := []telegraf.Metric{ metrics[0], metrics[1], metrics[2], metrics[4], } diff --git a/models/buffer_mem.go b/models/buffer_mem.go index 88216c9f81ea7..7bba4744f4e07 100644 --- a/models/buffer_mem.go +++ b/models/buffer_mem.go @@ -36,36 +36,6 @@ func (b *MemoryBuffer) Len() int { return b.length() } -func (b *MemoryBuffer) length() int { - return min(b.size+b.batchSize, b.cap) -} - -func (b *MemoryBuffer) addMetric(m telegraf.Metric) int { - dropped := 0 - // Check if Buffer is full - if b.size == b.cap { - b.metricDropped(b.buf[b.last]) - dropped++ - - if b.batchSize > 0 { - b.batchSize-- - b.batchFirst = b.next(b.batchFirst) - } - } - - b.metricAdded() - - b.buf[b.last] = m - b.last = b.next(b.last) - - if b.size == b.cap { - b.first = b.next(b.first) - } - - b.size = min(b.size+1, b.cap) - return dropped -} - func (b *MemoryBuffer) Add(metrics ...telegraf.Metric) int { b.Lock() defer b.Unlock() @@ -149,10 +119,44 @@ func (b *MemoryBuffer) Reject(batch []telegraf.Metric) { b.BufferSize.Set(int64(b.length())) } +func (b *MemoryBuffer) Close() error { + return nil +} + func (b *MemoryBuffer) Stats() BufferStats { return b.BufferStats } +func (b *MemoryBuffer) length() int { + return min(b.size+b.batchSize, b.cap) +} + +func (b *MemoryBuffer) addMetric(m telegraf.Metric) int { + dropped := 0 + // Check if Buffer is full + if b.size == b.cap { + b.metricDropped(b.buf[b.last]) + dropped++ + + if b.batchSize > 0 { + b.batchSize-- + b.batchFirst = b.next(b.batchFirst) + } + } + + b.metricAdded() + + b.buf[b.last] = m + b.last = b.next(b.last) + + if b.size == b.cap { + b.first = b.next(b.first) + } + + b.size = min(b.size+1, b.cap) + return dropped +} + // next returns the next index with wrapping. 
func (b *MemoryBuffer) next(index int) int { index++ diff --git a/models/buffer_mem_test.go b/models/buffer_mem_test.go index 014f626315924..650bd3bf65c93 100644 --- a/models/buffer_mem_test.go +++ b/models/buffer_mem_test.go @@ -2,38 +2,42 @@ package models import ( "testing" + "time" + "github.com/influxdata/telegraf/metric" "github.com/stretchr/testify/require" ) -func newTestMemoryBuffer(t testing.TB, capacity int) Buffer { - t.Helper() - buf, err := NewBuffer("test", "123", "", capacity, "memory", "") +func TestMemoryBufferAcceptCallsMetricAccept(t *testing.T) { + buf, err := NewBuffer("test", "123", "", 5, "memory", "") require.NoError(t, err) buf.Stats().MetricsAdded.Set(0) buf.Stats().MetricsWritten.Set(0) buf.Stats().MetricsDropped.Set(0) - return buf -} + defer buf.Close() -func TestBuffer_AcceptCallsMetricAccept(t *testing.T) { var accept int - mm := &MockMetric{ - Metric: Metric(), + mm := &mockMetric{ + Metric: metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)), AcceptF: func() { accept++ }, } - b := newTestMemoryBuffer(t, 5) - b.Add(mm, mm, mm) - batch := b.Batch(2) - b.Accept(batch) + buf.Add(mm, mm, mm) + batch := buf.Batch(2) + buf.Accept(batch) require.Equal(t, 2, accept) } -func BenchmarkAddMetrics(b *testing.B) { - buf := newTestMemoryBuffer(b, 10000) - m := Metric() +func BenchmarkMemoryBufferAddMetrics(b *testing.B) { + buf, err := NewBuffer("test", "123", "", 10000, "memory", "") + require.NoError(b, err) + buf.Stats().MetricsAdded.Set(0) + buf.Stats().MetricsWritten.Set(0) + buf.Stats().MetricsDropped.Set(0) + defer buf.Close() + + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) for n := 0; n < b.N; n++ { buf.Add(m) } diff --git a/models/buffer_suite_test.go b/models/buffer_suite_test.go index 39814d657825c..99d008096373a 100644 --- a/models/buffer_suite_test.go +++ b/models/buffer_suite_test.go @@ -12,25 +12,6 @@ import ( "github.com/influxdata/telegraf/testutil" ) -type MockMetric struct { - telegraf.Metric - AcceptF func() - RejectF func() - DropF func() -} - -func (m *MockMetric) Accept() { - m.AcceptF() -} - -func (m *MockMetric) Reject() { - m.RejectF() -} - -func (m *MockMetric) Drop() { - m.DropF() -} - type BufferSuiteTest struct { suite.Suite bufferType string @@ -53,7 +34,7 @@ func (s *BufferSuiteTest) SetupTest() { func (s *BufferSuiteTest) TearDownTest() { if s.bufferPath != "" { - os.RemoveAll(s.bufferPath) + s.NoError(os.RemoveAll(s.bufferPath)) s.bufferPath = "" } } @@ -66,22 +47,6 @@ func TestDiskBufferSuite(t *testing.T) { suite.Run(t, &BufferSuiteTest{bufferType: "disk"}) } -func Metric() telegraf.Metric { - return MetricTime(0) -} - -func MetricTime(sec int64) telegraf.Metric { - m := metric.New( - "cpu", - map[string]string{}, - map[string]interface{}{ - "value": 42.0, - }, - time.Unix(sec, 0), - ) - return m -} - func (s *BufferSuiteTest) newTestBuffer(capacity int) Buffer { s.T().Helper() buf, err := NewBuffer("test", "123", "", capacity, s.bufferType, s.bufferPath) @@ -92,687 +57,751 @@ func (s *BufferSuiteTest) newTestBuffer(capacity int) Buffer { return buf } -func (s *BufferSuiteTest) TestBuffer_LenEmpty() { - b := s.newTestBuffer(5) +func (s *BufferSuiteTest) TestBufferLenEmpty() { + buf := s.newTestBuffer(5) + defer buf.Close() - s.Equal(0, b.Len()) + s.Equal(0, buf.Len()) } -func (s *BufferSuiteTest) TestBuffer_LenOne() { - m := Metric() - b := s.newTestBuffer(5) - b.Add(m) +func (s *BufferSuiteTest) TestBufferLenOne() { + buf := 
s.newTestBuffer(5) + defer buf.Close() - s.Equal(1, b.Len()) + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m) + s.Equal(1, buf.Len()) } -func (s *BufferSuiteTest) TestBuffer_LenFull() { - m := Metric() - b := s.newTestBuffer(5) - b.Add(m, m, m, m, m) +func (s *BufferSuiteTest) TestBufferLenFull() { + buf := s.newTestBuffer(5) + defer buf.Close() - s.Equal(5, b.Len()) + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m, m, m) + s.Equal(5, buf.Len()) } -func (s *BufferSuiteTest) TestBuffer_LenOverfill() { +func (s *BufferSuiteTest) TestBufferLenOverfill() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } - m := Metric() - b := s.newTestBuffer(5) - b.Add(m, m, m, m, m, m) + buf := s.newTestBuffer(5) + defer buf.Close() - s.Equal(5, b.Len()) + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m, m, m, m) + s.Equal(5, buf.Len()) } -func (s *BufferSuiteTest) TestBuffer_BatchLenZero() { - b := s.newTestBuffer(5) - batch := b.Batch(0) +func (s *BufferSuiteTest) TestBufferBatchLenZero() { + buf := s.newTestBuffer(5) + defer buf.Close() + batch := buf.Batch(0) s.Empty(batch) } -func (s *BufferSuiteTest) TestBuffer_BatchLenBufferEmpty() { - b := s.newTestBuffer(5) - batch := b.Batch(2) +func (s *BufferSuiteTest) TestBufferBatchLenBufferEmpty() { + buf := s.newTestBuffer(5) + defer buf.Close() + batch := buf.Batch(2) s.Empty(batch) } -func (s *BufferSuiteTest) TestBuffer_BatchLenUnderfill() { - m := Metric() - b := s.newTestBuffer(5) - b.Add(m) - batch := b.Batch(2) +func (s *BufferSuiteTest) TestBufferBatchLenUnderfill() { + buf := s.newTestBuffer(5) + defer buf.Close() + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m) + batch := buf.Batch(2) s.Len(batch, 1) } -func (s *BufferSuiteTest) TestBuffer_BatchLenFill() { - m := Metric() - b := s.newTestBuffer(5) - b.Add(m, m, m) - batch := b.Batch(2) +func (s *BufferSuiteTest) TestBufferBatchLenFill() { + buf := s.newTestBuffer(5) + defer buf.Close() + + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m) + batch := buf.Batch(2) s.Len(batch, 2) } -func (s *BufferSuiteTest) TestBuffer_BatchLenExact() { - m := Metric() - b := s.newTestBuffer(5) - b.Add(m, m) - batch := b.Batch(2) +func (s *BufferSuiteTest) TestBufferBatchLenExact() { + buf := s.newTestBuffer(5) + defer buf.Close() + + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m) + batch := buf.Batch(2) s.Len(batch, 2) } -func (s *BufferSuiteTest) TestBuffer_BatchLenLargerThanBuffer() { - m := Metric() - b := s.newTestBuffer(5) - b.Add(m, m, m, m, m) - batch := b.Batch(6) +func (s *BufferSuiteTest) TestBufferBatchLenLargerThanBuffer() { + buf := s.newTestBuffer(5) + defer buf.Close() + + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m, m, m) + batch := buf.Batch(6) s.Len(batch, 5) } -func (s *BufferSuiteTest) TestBuffer_BatchWrap() { - m := Metric() - b := s.newTestBuffer(5) - b.Add(m, m, m, m, m) - batch := b.Batch(2) - b.Accept(batch) - b.Add(m, m) - batch = b.Batch(5) +func (s *BufferSuiteTest) TestBufferBatchWrap() { + buf := s.newTestBuffer(5) + defer buf.Close() + + m := metric.New("cpu", 
map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m, m, m) + batch := buf.Batch(2) + buf.Accept(batch) + buf.Add(m, m) + batch = buf.Batch(5) s.Len(batch, 5) } -func (s *BufferSuiteTest) TestBuffer_BatchLatest() { - b := s.newTestBuffer(4) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) +func (s *BufferSuiteTest) TestBufferBatchLatest() { + buf := s.newTestBuffer(4) + defer buf.Close() + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) + batch := buf.Batch(2) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ - MetricTime(1), - MetricTime(2), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)), }, batch) } -func (s *BufferSuiteTest) TestBuffer_BatchLatestWrap() { +func (s *BufferSuiteTest) TestBufferBatchLatestWrap() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } - b := s.newTestBuffer(4) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - batch := b.Batch(2) + buf := s.newTestBuffer(4) + defer buf.Close() + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) + batch := buf.Batch(2) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ - MetricTime(2), - MetricTime(3), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)), }, batch) } -func (s *BufferSuiteTest) TestBuffer_MultipleBatch() { - b := s.newTestBuffer(10) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) - batch := b.Batch(5) +func (s *BufferSuiteTest) TestBufferMultipleBatch() { + buf := s.newTestBuffer(10) + defer buf.Close() + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0))) + batch := buf.Batch(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), + metric.New("cpu", 
map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)), }, batch) - b.Accept(batch) - batch = b.Batch(5) + buf.Accept(batch) + batch = buf.Batch(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ - MetricTime(6), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)), }, batch) - b.Accept(batch) + buf.Accept(batch) } -func (s *BufferSuiteTest) TestBuffer_RejectWithRoom() { - b := s.newTestBuffer(5) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Reject(batch) +func (s *BufferSuiteTest) TestBufferRejectWithRoom() { + buf := s.newTestBuffer(5) + defer buf.Close() - s.Equal(int64(0), b.Stats().MetricsDropped.Get()) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) + batch := buf.Batch(2) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) + buf.Reject(batch) - batch = b.Batch(5) + s.Equal(int64(0), buf.Stats().MetricsDropped.Get()) + + batch = buf.Batch(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)), }, batch) } -func (s *BufferSuiteTest) TestBuffer_RejectNothingNewFull() { - b := s.newTestBuffer(5) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - batch := b.Batch(2) - b.Reject(batch) +func (s *BufferSuiteTest) TestBufferRejectNothingNewFull() { + buf := s.newTestBuffer(5) + defer buf.Close() + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) + batch := buf.Batch(2) + buf.Reject(batch) - s.Equal(int64(0), b.Stats().MetricsDropped.Get()) + s.Equal(int64(0), buf.Stats().MetricsDropped.Get()) - batch = b.Batch(5) + 
batch = buf.Batch(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)), }, batch) } -func (s *BufferSuiteTest) TestBuffer_RejectNoRoom() { +func (s *BufferSuiteTest) TestBufferRejectNoRoom() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } - b := s.newTestBuffer(5) - b.Add(MetricTime(1)) + buf := s.newTestBuffer(5) + defer buf.Close() - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) + batch := buf.Batch(2) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) - b.Add(MetricTime(7)) - b.Add(MetricTime(8)) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0))) + buf.Reject(batch) - b.Reject(batch) + s.Equal(int64(3), buf.Stats().MetricsDropped.Get()) - s.Equal(int64(3), b.Stats().MetricsDropped.Get()) - - batch = b.Batch(5) + batch = buf.Batch(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ - MetricTime(4), - MetricTime(5), - MetricTime(6), - MetricTime(7), - MetricTime(8), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0)), }, batch) } -func (s *BufferSuiteTest) TestBuffer_RejectRoomExact() { - b := s.newTestBuffer(5) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - batch := b.Batch(2) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) +func (s *BufferSuiteTest) TestBufferRejectRoomExact() { + buf := s.newTestBuffer(5) + defer buf.Close() + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + batch := buf.Batch(2) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) + buf.Add(metric.New("cpu", 
map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) - b.Reject(batch) + buf.Reject(batch) - s.Equal(int64(0), b.Stats().MetricsDropped.Get()) + s.Equal(int64(0), buf.Stats().MetricsDropped.Get()) - batch = b.Batch(5) + batch = buf.Batch(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)), }, batch) } -func (s *BufferSuiteTest) TestBuffer_RejectRoomOverwriteOld() { +func (s *BufferSuiteTest) TestBufferRejectRoomOverwriteOld() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } - b := s.newTestBuffer(5) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(1) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) + buf := s.newTestBuffer(5) + defer buf.Close() - b.Reject(batch) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) + batch := buf.Batch(1) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0))) - s.Equal(int64(1), b.Stats().MetricsDropped.Get()) + buf.Reject(batch) - batch = b.Batch(5) + s.Equal(int64(1), buf.Stats().MetricsDropped.Get()) + + batch = buf.Batch(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), - MetricTime(6), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)), }, batch) } -func (s *BufferSuiteTest) TestBuffer_RejectPartialRoom() { +func (s *BufferSuiteTest) TestBufferRejectPartialRoom() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } - b := s.newTestBuffer(5) - b.Add(MetricTime(1)) - - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) + buf := s.newTestBuffer(5) + defer buf.Close() - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) - b.Add(MetricTime(7)) - b.Reject(batch) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + 
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) + batch := buf.Batch(2) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0))) + buf.Reject(batch) - s.Equal(int64(2), b.Stats().MetricsDropped.Get()) + s.Equal(int64(2), buf.Stats().MetricsDropped.Get()) - batch = b.Batch(5) + batch = buf.Batch(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ - MetricTime(3), - MetricTime(4), - MetricTime(5), - MetricTime(6), - MetricTime(7), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0)), }, batch) } -func (s *BufferSuiteTest) TestBuffer_RejectNewMetricsWrapped() { +func (s *BufferSuiteTest) TestBufferRejectNewMetricsWrapped() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } - b := s.newTestBuffer(5) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) + buf := s.newTestBuffer(5) + defer buf.Close() + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) + batch := buf.Batch(2) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) // buffer: 1, 4, 5; batch: 2, 3 - s.Equal(int64(0), b.Stats().MetricsDropped.Get()) + s.Equal(int64(0), buf.Stats().MetricsDropped.Get()) - b.Add(MetricTime(6)) - b.Add(MetricTime(7)) - b.Add(MetricTime(8)) - b.Add(MetricTime(9)) - b.Add(MetricTime(10)) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(9, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(10, 0))) // buffer: 8, 9, 10, 6, 7; batch: 2, 3 - s.Equal(int64(3), b.Stats().MetricsDropped.Get()) + s.Equal(int64(3), buf.Stats().MetricsDropped.Get()) - b.Add(MetricTime(11)) - b.Add(MetricTime(12)) - b.Add(MetricTime(13)) - b.Add(MetricTime(14)) - b.Add(MetricTime(15)) + buf.Add(metric.New("cpu", map[string]string{}, 
map[string]interface{}{"value": 42.0}, time.Unix(11, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(12, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(13, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(14, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(15, 0))) // buffer: 13, 14, 15, 11, 12; batch: 2, 3 - s.Equal(int64(8), b.Stats().MetricsDropped.Get()) - b.Reject(batch) + s.Equal(int64(8), buf.Stats().MetricsDropped.Get()) + buf.Reject(batch) - s.Equal(int64(10), b.Stats().MetricsDropped.Get()) + s.Equal(int64(10), buf.Stats().MetricsDropped.Get()) - batch = b.Batch(5) + batch = buf.Batch(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ - MetricTime(11), - MetricTime(12), - MetricTime(13), - MetricTime(14), - MetricTime(15), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(11, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(12, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(13, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(14, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(15, 0)), }, batch) } -func (s *BufferSuiteTest) TestBuffer_RejectWrapped() { +func (s *BufferSuiteTest) TestBufferRejectWrapped() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } - b := s.newTestBuffer(5) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) + buf := s.newTestBuffer(5) + defer buf.Close() + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) - b.Add(MetricTime(6)) - b.Add(MetricTime(7)) - b.Add(MetricTime(8)) - batch := b.Batch(3) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0))) + batch := buf.Batch(3) - b.Add(MetricTime(9)) - b.Add(MetricTime(10)) - b.Add(MetricTime(11)) - b.Add(MetricTime(12)) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(9, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(10, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(11, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(12, 0))) - b.Reject(batch) + buf.Reject(batch) - batch = b.Batch(5) + batch = buf.Batch(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ - MetricTime(8), - MetricTime(9), - MetricTime(10), - 
MetricTime(11), - MetricTime(12), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(9, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(10, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(11, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(12, 0)), }, batch) } -func (s *BufferSuiteTest) TestBuffer_RejectAdjustFirst() { +func (s *BufferSuiteTest) TestBufferRejectAdjustFirst() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } - b := s.newTestBuffer(10) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(3) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) - b.Reject(batch) - - b.Add(MetricTime(7)) - b.Add(MetricTime(8)) - b.Add(MetricTime(9)) - batch = b.Batch(3) - b.Add(MetricTime(10)) - b.Add(MetricTime(11)) - b.Add(MetricTime(12)) - b.Reject(batch) - - b.Add(MetricTime(13)) - b.Add(MetricTime(14)) - b.Add(MetricTime(15)) - batch = b.Batch(3) - b.Add(MetricTime(16)) - b.Add(MetricTime(17)) - b.Add(MetricTime(18)) - b.Reject(batch) - - b.Add(MetricTime(19)) - - batch = b.Batch(10) + buf := s.newTestBuffer(10) + defer buf.Close() + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) + batch := buf.Batch(3) + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0))) + buf.Reject(batch) + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(9, 0))) + batch = buf.Batch(3) + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(10, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(11, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(12, 0))) + buf.Reject(batch) + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(13, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(14, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(15, 0))) + batch = buf.Batch(3) + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(16, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(17, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(18, 0))) + buf.Reject(batch) + + buf.Add(metric.New("cpu", map[string]string{}, 
map[string]interface{}{"value": 42.0}, time.Unix(19, 0))) + + batch = buf.Batch(10) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ - MetricTime(10), - MetricTime(11), - MetricTime(12), - MetricTime(13), - MetricTime(14), - MetricTime(15), - MetricTime(16), - MetricTime(17), - MetricTime(18), - MetricTime(19), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(10, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(11, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(12, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(13, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(14, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(15, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(16, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(17, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(18, 0)), + metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(19, 0)), }, batch) } -func (s *BufferSuiteTest) TestBuffer_AddDropsOverwrittenMetrics() { +func (s *BufferSuiteTest) TestBufferAddDropsOverwrittenMetrics() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } - m := Metric() - b := s.newTestBuffer(5) + buf := s.newTestBuffer(5) + defer buf.Close() - b.Add(m, m, m, m, m) - b.Add(m, m, m, m, m) + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m, m, m) + buf.Add(m, m, m, m, m) - s.Equal(int64(5), b.Stats().MetricsDropped.Get()) - s.Equal(int64(0), b.Stats().MetricsWritten.Get()) + s.Equal(int64(5), buf.Stats().MetricsDropped.Get()) + s.Equal(int64(0), buf.Stats().MetricsWritten.Get()) } -func (s *BufferSuiteTest) TestBuffer_AcceptRemovesBatch() { - m := Metric() - b := s.newTestBuffer(5) - b.Add(m, m, m) - batch := b.Batch(2) - b.Accept(batch) - s.Equal(1, b.Len()) +func (s *BufferSuiteTest) TestBufferAcceptRemovesBatch() { + buf := s.newTestBuffer(5) + defer buf.Close() + + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m) + batch := buf.Batch(2) + buf.Accept(batch) + s.Equal(1, buf.Len()) } -func (s *BufferSuiteTest) TestBuffer_RejectLeavesBatch() { - m := Metric() - b := s.newTestBuffer(5) - b.Add(m, m, m) - batch := b.Batch(2) - b.Reject(batch) - s.Equal(3, b.Len()) +func (s *BufferSuiteTest) TestBufferRejectLeavesBatch() { + buf := s.newTestBuffer(5) + defer buf.Close() + + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m) + batch := buf.Batch(2) + buf.Reject(batch) + s.Equal(3, buf.Len()) } -func (s *BufferSuiteTest) TestBuffer_AcceptWritesOverwrittenBatch() { - m := Metric() - b := s.newTestBuffer(5) +func (s *BufferSuiteTest) TestBufferAcceptWritesOverwrittenBatch() { + buf := s.newTestBuffer(5) + defer buf.Close() - b.Add(m, m, m, m, m) - batch := b.Batch(5) - b.Add(m, m, m, m, m) - b.Accept(batch) + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m, m, m) + batch := buf.Batch(5) + buf.Add(m, m, m, m, m) + buf.Accept(batch) - s.Equal(int64(0), 
b.Stats().MetricsDropped.Get()) - s.Equal(int64(5), b.Stats().MetricsWritten.Get()) + s.Equal(int64(0), buf.Stats().MetricsDropped.Get()) + s.Equal(int64(5), buf.Stats().MetricsWritten.Get()) } -func (s *BufferSuiteTest) TestBuffer_BatchRejectDropsOverwrittenBatch() { +func (s *BufferSuiteTest) TestBufferBatchRejectDropsOverwrittenBatch() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } - m := Metric() - b := s.newTestBuffer(5) + buf := s.newTestBuffer(5) + defer buf.Close() - b.Add(m, m, m, m, m) - batch := b.Batch(5) - b.Add(m, m, m, m, m) - b.Reject(batch) + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m, m, m) + batch := buf.Batch(5) + buf.Add(m, m, m, m, m) + buf.Reject(batch) - s.Equal(int64(5), b.Stats().MetricsDropped.Get()) - s.Equal(int64(0), b.Stats().MetricsWritten.Get()) + s.Equal(int64(5), buf.Stats().MetricsDropped.Get()) + s.Equal(int64(0), buf.Stats().MetricsWritten.Get()) } -func (s *BufferSuiteTest) TestBuffer_MetricsOverwriteBatchAccept() { - m := Metric() - b := s.newTestBuffer(5) +func (s *BufferSuiteTest) TestBufferMetricsOverwriteBatchAccept() { + buf := s.newTestBuffer(5) + defer buf.Close() - b.Add(m, m, m, m, m) - batch := b.Batch(3) - b.Add(m, m, m) - b.Accept(batch) - s.Equal(int64(0), b.Stats().MetricsDropped.Get(), "dropped") - s.Equal(int64(3), b.Stats().MetricsWritten.Get(), "written") + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m, m, m) + batch := buf.Batch(3) + buf.Add(m, m, m) + buf.Accept(batch) + s.Equal(int64(0), buf.Stats().MetricsDropped.Get(), "dropped") + s.Equal(int64(3), buf.Stats().MetricsWritten.Get(), "written") } -func (s *BufferSuiteTest) TestBuffer_MetricsOverwriteBatchReject() { +func (s *BufferSuiteTest) TestBufferMetricsOverwriteBatchReject() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } - m := Metric() - b := s.newTestBuffer(5) + buf := s.newTestBuffer(5) + defer buf.Close() - b.Add(m, m, m, m, m) - batch := b.Batch(3) - b.Add(m, m, m) - b.Reject(batch) - s.Equal(int64(3), b.Stats().MetricsDropped.Get()) - s.Equal(int64(0), b.Stats().MetricsWritten.Get()) + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m, m, m) + batch := buf.Batch(3) + buf.Add(m, m, m) + buf.Reject(batch) + s.Equal(int64(3), buf.Stats().MetricsDropped.Get()) + s.Equal(int64(0), buf.Stats().MetricsWritten.Get()) } -func (s *BufferSuiteTest) TestBuffer_MetricsBatchAcceptRemoved() { +func (s *BufferSuiteTest) TestBufferMetricsBatchAcceptRemoved() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } - m := Metric() - b := s.newTestBuffer(5) + buf := s.newTestBuffer(5) + defer buf.Close() - b.Add(m, m, m, m, m) - batch := b.Batch(3) - b.Add(m, m, m, m, m) - b.Accept(batch) - s.Equal(int64(2), b.Stats().MetricsDropped.Get()) - s.Equal(int64(3), b.Stats().MetricsWritten.Get()) + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m, m, m) + batch := buf.Batch(3) + buf.Add(m, m, m, m, m) + buf.Accept(batch) + s.Equal(int64(2), buf.Stats().MetricsDropped.Get()) + s.Equal(int64(3), buf.Stats().MetricsWritten.Get()) } -func (s *BufferSuiteTest) TestBuffer_WrapWithBatch() { +func (s *BufferSuiteTest) TestBufferWrapWithBatch() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a 
maximum capacity") } - m := Metric() - b := s.newTestBuffer(5) + buf := s.newTestBuffer(5) + defer buf.Close() - b.Add(m, m, m) - b.Batch(3) - b.Add(m, m, m, m, m, m) + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m) + buf.Batch(3) + buf.Add(m, m, m, m, m, m) - s.Equal(int64(1), b.Stats().MetricsDropped.Get()) + s.Equal(int64(1), buf.Stats().MetricsDropped.Get()) } -func (s *BufferSuiteTest) TestBuffer_BatchNotRemoved() { - m := Metric() - b := s.newTestBuffer(5) - b.Add(m, m, m, m, m) - b.Batch(2) - s.Equal(5, b.Len()) +func (s *BufferSuiteTest) TestBufferBatchNotRemoved() { + buf := s.newTestBuffer(5) + defer buf.Close() + + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m, m, m) + buf.Batch(2) + s.Equal(5, buf.Len()) } -func (s *BufferSuiteTest) TestBuffer_BatchRejectAcceptNoop() { - m := Metric() - b := s.newTestBuffer(5) - b.Add(m, m, m, m, m) - batch := b.Batch(2) - b.Reject(batch) - b.Accept(batch) - s.Equal(5, b.Len()) +func (s *BufferSuiteTest) TestBufferBatchRejectAcceptNoop() { + buf := s.newTestBuffer(5) + defer buf.Close() + + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m, m, m) + batch := buf.Batch(2) + buf.Reject(batch) + buf.Accept(batch) + s.Equal(5, buf.Len()) } -func (s *BufferSuiteTest) TestBuffer_AddCallsMetricRejectWhenNoBatch() { +func (s *BufferSuiteTest) TestBufferAddCallsMetricRejectWhenNoBatch() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } + buf := s.newTestBuffer(5) + defer buf.Close() + var reject int - mm := &MockMetric{ - Metric: Metric(), + mm := &mockMetric{ + Metric: metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)), RejectF: func() { reject++ }, } - b := s.newTestBuffer(5) - b.Add(mm, mm, mm, mm, mm) - b.Add(mm, mm) + buf.Add(mm, mm, mm, mm, mm) + buf.Add(mm, mm) s.Equal(2, reject) } -func (s *BufferSuiteTest) TestBuffer_AddCallsMetricRejectWhenNotInBatch() { +func (s *BufferSuiteTest) TestBufferAddCallsMetricRejectWhenNotInBatch() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } + buf := s.newTestBuffer(5) + defer buf.Close() + var reject int - mm := &MockMetric{ - Metric: Metric(), + mm := &mockMetric{ + Metric: metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)), RejectF: func() { reject++ }, } - b := s.newTestBuffer(5) - b.Add(mm, mm, mm, mm, mm) - batch := b.Batch(2) - b.Add(mm, mm, mm, mm) + buf.Add(mm, mm, mm, mm, mm) + batch := buf.Batch(2) + buf.Add(mm, mm, mm, mm) s.Equal(2, reject) - b.Reject(batch) + buf.Reject(batch) s.Equal(4, reject) } -func (s *BufferSuiteTest) TestBuffer_RejectCallsMetricRejectWithOverwritten() { +func (s *BufferSuiteTest) TestBufferRejectCallsMetricRejectWithOverwritten() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } + buf := s.newTestBuffer(5) + defer buf.Close() + var reject int - mm := &MockMetric{ - Metric: Metric(), + mm := &mockMetric{ + Metric: metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)), RejectF: func() { reject++ }, } - b := s.newTestBuffer(5) - b.Add(mm, mm, mm, mm, mm) - batch := b.Batch(5) - b.Add(mm, mm) + buf.Add(mm, mm, mm, mm, mm) + batch := buf.Batch(5) + buf.Add(mm, mm) s.Equal(0, reject) - b.Reject(batch) + buf.Reject(batch) s.Equal(2, 
reject) } -func (s *BufferSuiteTest) TestBuffer_AddOverwriteAndReject() { +func (s *BufferSuiteTest) TestBufferAddOverwriteAndReject() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } + buf := s.newTestBuffer(5) + defer buf.Close() + var reject int - mm := &MockMetric{ - Metric: Metric(), + mm := &mockMetric{ + Metric: metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)), RejectF: func() { reject++ }, } - b := s.newTestBuffer(5) - b.Add(mm, mm, mm, mm, mm) - batch := b.Batch(5) - b.Add(mm, mm, mm, mm, mm) - b.Add(mm, mm, mm, mm, mm) - b.Add(mm, mm, mm, mm, mm) - b.Add(mm, mm, mm, mm, mm) + buf.Add(mm, mm, mm, mm, mm) + batch := buf.Batch(5) + buf.Add(mm, mm, mm, mm, mm) + buf.Add(mm, mm, mm, mm, mm) + buf.Add(mm, mm, mm, mm, mm) + buf.Add(mm, mm, mm, mm, mm) s.Equal(15, reject) - b.Reject(batch) + buf.Reject(batch) s.Equal(20, reject) } -func (s *BufferSuiteTest) TestBuffer_AddOverwriteAndRejectOffset() { +func (s *BufferSuiteTest) TestBufferAddOverwriteAndRejectOffset() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") } + buf := s.newTestBuffer(5) + defer buf.Close() + var reject int var accept int - mm := &MockMetric{ - Metric: Metric(), + mm := &mockMetric{ + Metric: metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)), RejectF: func() { reject++ }, @@ -780,55 +809,85 @@ func (s *BufferSuiteTest) TestBuffer_AddOverwriteAndRejectOffset() { accept++ }, } - b := s.newTestBuffer(5) - b.Add(mm, mm, mm) - b.Add(mm, mm, mm, mm) + buf.Add(mm, mm, mm) + buf.Add(mm, mm, mm, mm) s.Equal(2, reject) - batch := b.Batch(5) - b.Add(mm, mm, mm, mm) + batch := buf.Batch(5) + buf.Add(mm, mm, mm, mm) s.Equal(2, reject) - b.Add(mm, mm, mm, mm) + buf.Add(mm, mm, mm, mm) s.Equal(5, reject) - b.Add(mm, mm, mm, mm) + buf.Add(mm, mm, mm, mm) s.Equal(9, reject) - b.Add(mm, mm, mm, mm) + buf.Add(mm, mm, mm, mm) s.Equal(13, reject) - b.Accept(batch) + buf.Accept(batch) s.Equal(13, reject) s.Equal(5, accept) } -func (s *BufferSuiteTest) TestBuffer_RejectEmptyBatch() { - b := s.newTestBuffer(5) - batch := b.Batch(2) - b.Add(MetricTime(1)) - b.Reject(batch) - b.Add(MetricTime(2)) - batch = b.Batch(2) +func (s *BufferSuiteTest) TestBufferRejectEmptyBatch() { + buf := s.newTestBuffer(5) + defer buf.Close() + + batch := buf.Batch(2) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + buf.Reject(batch) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + batch = buf.Batch(2) for _, m := range batch { s.NotNil(m) } } -func (s *BufferSuiteTest) TestBuffer_FlushedPartial() { - b := s.newTestBuffer(5) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) +func (s *BufferSuiteTest) TestBufferFlushedPartial() { + buf := s.newTestBuffer(5) + defer buf.Close() + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) + batch := buf.Batch(2) s.Len(batch, 2) - b.Accept(batch) - s.Equal(1, b.Len()) + buf.Accept(batch) + s.Equal(1, buf.Len()) } -func (s *BufferSuiteTest) TestBuffer_FlushedFull() { - b := s.newTestBuffer(5) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - 
batch := b.Batch(2) +func (s *BufferSuiteTest) TestBufferFlushedFull() { + buf := s.newTestBuffer(5) + defer buf.Close() + + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) + buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) + batch := buf.Batch(2) s.Len(batch, 2) - b.Accept(batch) - s.Equal(0, b.Len()) + buf.Accept(batch) + s.Equal(0, buf.Len()) +} + +type mockMetric struct { + telegraf.Metric + AcceptF func() + RejectF func() + DropF func() +} + +func (m *mockMetric) Accept() { + if m.AcceptF != nil { + m.AcceptF() + } +} + +func (m *mockMetric) Reject() { + if m.RejectF != nil { + m.RejectF() + } +} + +func (m *mockMetric) Drop() { + if m.DropF != nil { + m.DropF() + } } diff --git a/models/filter_test.go b/models/filter_test.go index b37d7b6dea1f2..c3e68c58cf2ad 100644 --- a/models/filter_test.go +++ b/models/filter_test.go @@ -11,7 +11,7 @@ import ( "github.com/influxdata/telegraf/testutil" ) -func TestFilter_ApplyEmpty(t *testing.T) { +func TestFilterApplyEmpty(t *testing.T) { f := Filter{} require.NoError(t, f.Compile()) require.False(t, f.IsActive()) @@ -25,7 +25,7 @@ func TestFilter_ApplyEmpty(t *testing.T) { require.True(t, selected) } -func TestFilter_ApplyTagsDontPass(t *testing.T) { +func TestFilterApplyTagsDontPass(t *testing.T) { filters := []TagFilter{ { Name: "cpu", @@ -48,7 +48,7 @@ func TestFilter_ApplyTagsDontPass(t *testing.T) { require.False(t, selected) } -func TestFilter_ApplyDeleteFields(t *testing.T) { +func TestFilterApplyDeleteFields(t *testing.T) { f := Filter{ FieldExclude: []string{"value"}, } @@ -70,7 +70,7 @@ func TestFilter_ApplyDeleteFields(t *testing.T) { require.Equal(t, map[string]interface{}{"value2": int64(2)}, m.Fields()) } -func TestFilter_ApplyDeleteAllFields(t *testing.T) { +func TestFilterApplyDeleteAllFields(t *testing.T) { f := Filter{ FieldExclude: []string{"value*"}, } @@ -92,7 +92,7 @@ func TestFilter_ApplyDeleteAllFields(t *testing.T) { require.Empty(t, m.FieldList()) } -func TestFilter_Empty(t *testing.T) { +func TestFilterEmpty(t *testing.T) { f := Filter{} measurements := []string{ @@ -112,7 +112,7 @@ func TestFilter_Empty(t *testing.T) { } } -func TestFilter_NamePass(t *testing.T) { +func TestFilterNamePass(t *testing.T) { f := Filter{ NamePass: []string{"foo*", "cpu_usage_idle"}, } @@ -146,7 +146,7 @@ func TestFilter_NamePass(t *testing.T) { } } -func TestFilter_NamePass_WithSeparator(t *testing.T) { +func TestFilterNamePass_WithSeparator(t *testing.T) { f := Filter{ NamePass: []string{"foo.*.bar", "foo.*.abc.*.bar"}, NamePassSeparators: ".,", @@ -182,7 +182,7 @@ func TestFilter_NamePass_WithSeparator(t *testing.T) { } } -func TestFilter_NameDrop(t *testing.T) { +func TestFilterNameDrop(t *testing.T) { f := Filter{ NameDrop: []string{"foo*", "cpu_usage_idle"}, } @@ -216,7 +216,7 @@ func TestFilter_NameDrop(t *testing.T) { } } -func TestFilter_NameDrop_WithSeparator(t *testing.T) { +func TestFilterNameDrop_WithSeparator(t *testing.T) { f := Filter{ NameDrop: []string{"foo.*.bar", "foo.*.abc.*.bar"}, NameDropSeparators: ".,", @@ -252,7 +252,7 @@ func TestFilter_NameDrop_WithSeparator(t *testing.T) { } } -func TestFilter_FieldInclude(t *testing.T) { +func TestFilterFieldInclude(t *testing.T) { f := Filter{ FieldInclude: []string{"foo*", "cpu_usage_idle"}, } @@ -282,7 +282,7 @@ func TestFilter_FieldInclude(t *testing.T) { } } -func TestFilter_FieldExclude(t *testing.T) { +func TestFilterFieldExclude(t *testing.T) { f := Filter{ 
FieldExclude: []string{"foo*", "cpu_usage_idle"}, } @@ -312,7 +312,7 @@ func TestFilter_FieldExclude(t *testing.T) { } } -func TestFilter_TagPass(t *testing.T) { +func TestFilterTagPass(t *testing.T) { filters := []TagFilter{ { Name: "cpu", @@ -356,7 +356,7 @@ func TestFilter_TagPass(t *testing.T) { } } -func TestFilter_TagDrop(t *testing.T) { +func TestFilterTagDrop(t *testing.T) { filters := []TagFilter{ { Name: "cpu", @@ -400,7 +400,7 @@ func TestFilter_TagDrop(t *testing.T) { } } -func TestFilter_FilterTagsNoMatches(t *testing.T) { +func TestFilterTagsNoMatches(t *testing.T) { m := metric.New("m", map[string]string{ "host": "localhost", @@ -428,7 +428,7 @@ func TestFilter_FilterTagsNoMatches(t *testing.T) { require.Equal(t, map[string]string{}, m.Tags()) } -func TestFilter_FilterTagsMatches(t *testing.T) { +func TestFilterTagsMatches(t *testing.T) { m := metric.New("m", map[string]string{ "host": "localhost", @@ -467,7 +467,7 @@ func TestFilter_FilterTagsMatches(t *testing.T) { // TestFilter_FilterNamePassAndDrop used for check case when // both parameters were defined // see: https://github.com/influxdata/telegraf/issues/2860 -func TestFilter_FilterNamePassAndDrop(t *testing.T) { +func TestFilterNamePassAndDrop(t *testing.T) { inputData := []string{"name1", "name2", "name3", "name4"} expectedResult := []bool{false, true, false, false} @@ -486,7 +486,7 @@ func TestFilter_FilterNamePassAndDrop(t *testing.T) { // TestFilter_FieldIncludeAndExclude used for check case when // both parameters were defined // see: https://github.com/influxdata/telegraf/issues/2860 -func TestFilter_FieldIncludeAndExclude(t *testing.T) { +func TestFilterFieldIncludeAndExclude(t *testing.T) { inputData := []string{"field1", "field2", "field3", "field4"} expectedResult := []bool{false, true, false, false} @@ -505,7 +505,7 @@ func TestFilter_FieldIncludeAndExclude(t *testing.T) { // TestFilter_FilterTagsPassAndDrop used for check case when // both parameters were defined // see: https://github.com/influxdata/telegraf/issues/2860 -func TestFilter_FilterTagsPassAndDrop(t *testing.T) { +func TestFilterTagsPassAndDrop(t *testing.T) { inputData := [][]*telegraf.Tag{ {{Key: "tag1", Value: "1"}, {Key: "tag2", Value: "3"}}, {{Key: "tag1", Value: "1"}, {Key: "tag2", Value: "2"}}, @@ -544,7 +544,7 @@ func TestFilter_FilterTagsPassAndDrop(t *testing.T) { } } -func TestFilter_MetricPass(t *testing.T) { +func TestFilterMetricPass(t *testing.T) { m := testutil.MustMetric("cpu", map[string]string{ "host": "Hugin", diff --git a/models/running_aggregator_test.go b/models/running_aggregator_test.go index d21ff54def8dc..76b5095864397 100644 --- a/models/running_aggregator_test.go +++ b/models/running_aggregator_test.go @@ -10,8 +10,8 @@ import ( "github.com/influxdata/telegraf/testutil" ) -func TestAdd(t *testing.T) { - a := &TestAggregator{} +func TestRunningAggregatorAdd(t *testing.T) { + a := &mockAggregator{} ra := NewRunningAggregator(a, &AggregatorConfig{ Name: "TestRunningAggregator", Filter: Filter{ @@ -39,8 +39,8 @@ func TestAdd(t *testing.T) { require.Equal(t, int64(101), acc.Metrics[0].Fields["sum"]) } -func TestAddMetricsOutsideCurrentPeriod(t *testing.T) { - a := &TestAggregator{} +func TestRunningAggregatorAddMetricsOutsideCurrentPeriod(t *testing.T) { + a := &mockAggregator{} ra := NewRunningAggregator(a, &AggregatorConfig{ Name: "TestRunningAggregator", Filter: Filter{ @@ -89,8 +89,8 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) { require.Equal(t, int64(101), acc.Metrics[0].Fields["sum"]) } -func 
TestAddMetricsOutsideCurrentPeriodWithGrace(t *testing.T) { - a := &TestAggregator{} +func TestRunningAggregatorAddMetricsOutsideCurrentPeriodWithGrace(t *testing.T) { + a := &mockAggregator{} ra := NewRunningAggregator(a, &AggregatorConfig{ Name: "TestRunningAggregator", Filter: Filter{ @@ -151,8 +151,8 @@ func TestAddMetricsOutsideCurrentPeriodWithGrace(t *testing.T) { require.Equal(t, int64(203), acc.Metrics[0].Fields["sum"]) } -func TestAddAndPushOnePeriod(t *testing.T) { - a := &TestAggregator{} +func TestRunningAggregatorAddAndPushOnePeriod(t *testing.T) { + a := &mockAggregator{} ra := NewRunningAggregator(a, &AggregatorConfig{ Name: "TestRunningAggregator", Filter: Filter{ @@ -180,8 +180,8 @@ func TestAddAndPushOnePeriod(t *testing.T) { acc.AssertContainsFields(t, "TestMetric", map[string]interface{}{"sum": int64(101)}) } -func TestAddDropOriginal(t *testing.T) { - ra := NewRunningAggregator(&TestAggregator{}, &AggregatorConfig{ +func TestRunningAggregatorAddDropOriginal(t *testing.T) { + ra := NewRunningAggregator(&mockAggregator{}, &AggregatorConfig{ Name: "TestRunningAggregator", Filter: Filter{ NamePass: []string{"RI*"}, @@ -213,8 +213,8 @@ func TestAddDropOriginal(t *testing.T) { require.False(t, ra.Add(m2)) } -func TestAddDoesNotModifyMetric(t *testing.T) { - ra := NewRunningAggregator(&TestAggregator{}, &AggregatorConfig{ +func TestRunningAggregatorAddDoesNotModifyMetric(t *testing.T) { + ra := NewRunningAggregator(&mockAggregator{}, &AggregatorConfig{ Name: "TestRunningAggregator", Filter: Filter{ FieldInclude: []string{"a"}, @@ -239,24 +239,26 @@ func TestAddDoesNotModifyMetric(t *testing.T) { testutil.RequireMetricEqual(t, expected, m) } -type TestAggregator struct { +type mockAggregator struct { sum int64 } -func (t *TestAggregator) Description() string { return "" } -func (t *TestAggregator) SampleConfig() string { return "" } -func (t *TestAggregator) Reset() { +func (t *mockAggregator) SampleConfig() string { + return "" +} + +func (t *mockAggregator) Reset() { t.sum = 0 } -func (t *TestAggregator) Push(acc telegraf.Accumulator) { +func (t *mockAggregator) Push(acc telegraf.Accumulator) { acc.AddFields("TestMetric", map[string]interface{}{"sum": t.sum}, map[string]string{}, ) } -func (t *TestAggregator) Add(in telegraf.Metric) { +func (t *mockAggregator) Add(in telegraf.Metric) { for _, v := range in.Fields() { if vi, ok := v.(int64); ok { t.sum += vi diff --git a/models/running_input_test.go b/models/running_input_test.go index 9bf7d6d9eb855..c3381096a893f 100644 --- a/models/running_input_test.go +++ b/models/running_input_test.go @@ -12,9 +12,9 @@ import ( "github.com/influxdata/telegraf/testutil" ) -func TestMakeMetricFilterAfterApplyingGlobalTags(t *testing.T) { +func TestRunningInputMakeMetricFilterAfterApplyingGlobalTags(t *testing.T) { now := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Filter: Filter{ TagInclude: []string{"b"}, }, @@ -43,9 +43,9 @@ func TestMakeMetricFilterAfterApplyingGlobalTags(t *testing.T) { testutil.RequireMetricEqual(t, expected, actual) } -func TestMakeMetricNoFields(t *testing.T) { +func TestRunningInputMakeMetricNoFields(t *testing.T) { now := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", }) @@ -59,9 +59,9 @@ func TestMakeMetricNoFields(t *testing.T) { } // nil fields should get dropped -func TestMakeMetricNilFields(t *testing.T) { +func 
TestRunningInputMakeMetricNilFields(t *testing.T) { now := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", }) @@ -86,9 +86,9 @@ func TestMakeMetricNilFields(t *testing.T) { require.Equal(t, expected, actual) } -func TestMakeMetricWithPluginTags(t *testing.T) { +func TestRunningInputMakeMetricWithPluginTags(t *testing.T) { now := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", Tags: map[string]string{ "foo": "bar", @@ -116,9 +116,9 @@ func TestMakeMetricWithPluginTags(t *testing.T) { require.Equal(t, expected, actual) } -func TestMakeMetricFilteredOut(t *testing.T) { +func TestRunningInputMakeMetricFilteredOut(t *testing.T) { now := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", Tags: map[string]string{ "foo": "bar", @@ -139,9 +139,9 @@ func TestMakeMetricFilteredOut(t *testing.T) { require.Nil(t, actual) } -func TestMakeMetricWithDaemonTags(t *testing.T) { +func TestRunningInputMakeMetricWithDaemonTags(t *testing.T) { now := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", }) ri.SetDefaultTags(map[string]string{ @@ -168,9 +168,9 @@ func TestMakeMetricWithDaemonTags(t *testing.T) { require.Equal(t, expected, actual) } -func TestMakeMetricNameOverride(t *testing.T) { +func TestRunningInputMakeMetricNameOverride(t *testing.T) { now := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", NameOverride: "foobar", }) @@ -193,9 +193,9 @@ func TestMakeMetricNameOverride(t *testing.T) { require.Equal(t, expected, actual) } -func TestMakeMetricNamePrefix(t *testing.T) { +func TestRunningInputMakeMetricNamePrefix(t *testing.T) { now := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", MeasurementPrefix: "foobar_", }) @@ -218,9 +218,9 @@ func TestMakeMetricNamePrefix(t *testing.T) { require.Equal(t, expected, actual) } -func TestMakeMetricNameSuffix(t *testing.T) { +func TestRunningInputMakeMetricNameSuffix(t *testing.T) { now := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", MeasurementSuffix: "_foobar", }) @@ -243,8 +243,8 @@ func TestMakeMetricNameSuffix(t *testing.T) { require.Equal(t, expected, actual) } -func TestMetricErrorCounters(t *testing.T) { - ri := NewRunningInput(&testInput{}, &InputConfig{ +func TestRunningInputMetricErrorCounters(t *testing.T) { + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestMetricErrorCounters", }) @@ -272,9 +272,9 @@ func TestMetricErrorCounters(t *testing.T) { require.GreaterOrEqual(t, int64(1), GlobalGatherErrors.Get()) } -func TestMakeMetricWithAlwaysKeepingPluginTagsDisabled(t *testing.T) { +func TestRunningInputMakeMetricWithAlwaysKeepingPluginTagsDisabled(t *testing.T) { now := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", Tags: map[string]string{ "foo": "bar", @@ -309,9 +309,9 @@ func TestMakeMetricWithAlwaysKeepingPluginTagsDisabled(t *testing.T) { require.Equal(t, expected, actual) } -func 
TestMakeMetricWithAlwaysKeepingLocalPluginTagsEnabled(t *testing.T) { +func TestRunningInputMakeMetricWithAlwaysKeepingLocalPluginTagsEnabled(t *testing.T) { now := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", Tags: map[string]string{ "foo": "bar", @@ -348,9 +348,9 @@ func TestMakeMetricWithAlwaysKeepingLocalPluginTagsEnabled(t *testing.T) { require.Equal(t, expected, actual) } -func TestMakeMetricWithAlwaysKeepingGlobalPluginTagsEnabled(t *testing.T) { +func TestRunningInputMakeMetricWithAlwaysKeepingGlobalPluginTagsEnabled(t *testing.T) { now := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", Tags: map[string]string{ "foo": "bar", @@ -387,9 +387,9 @@ func TestMakeMetricWithAlwaysKeepingGlobalPluginTagsEnabled(t *testing.T) { require.Equal(t, expected, actual) } -func TestMakeMetricWithAlwaysKeepingPluginTagsEnabled(t *testing.T) { +func TestRunningInputMakeMetricWithAlwaysKeepingPluginTagsEnabled(t *testing.T) { now := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", Tags: map[string]string{ "foo": "bar", @@ -428,8 +428,8 @@ func TestMakeMetricWithAlwaysKeepingPluginTagsEnabled(t *testing.T) { require.Equal(t, expected, actual) } -func TestMakeMetricWithGatherMetricTimeSource(t *testing.T) { - ri := NewRunningInput(&testInput{}, &InputConfig{ +func TestRunningInputMakeMetricWithGatherMetricTimeSource(t *testing.T) { + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", Tags: make(map[string]string), Filter: Filter{}, @@ -449,9 +449,9 @@ func TestMakeMetricWithGatherMetricTimeSource(t *testing.T) { require.Equal(t, expected, actual) } -func TestMakeMetricWithGatherStartTimeSource(t *testing.T) { +func TestRunningInputMakeMetricWithGatherStartTimeSource(t *testing.T) { start := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", Tags: make(map[string]string), Filter: Filter{}, @@ -470,9 +470,9 @@ func TestMakeMetricWithGatherStartTimeSource(t *testing.T) { require.Equal(t, expected, actual) } -func TestMakeMetricWithGatherEndTimeSource(t *testing.T) { +func TestRunningInputMakeMetricWithGatherEndTimeSource(t *testing.T) { end := time.Now() - ri := NewRunningInput(&testInput{}, &InputConfig{ + ri := NewRunningInput(&mockInput{}, &InputConfig{ Name: "TestRunningInput", TimeSource: "collection_end", }) @@ -487,8 +487,12 @@ func TestMakeMetricWithGatherEndTimeSource(t *testing.T) { require.Equal(t, expected, actual) } -type testInput struct{} +type mockInput struct{} -func (t *testInput) Description() string { return "" } -func (t *testInput) SampleConfig() string { return "" } -func (t *testInput) Gather(_ telegraf.Accumulator) error { return nil } +func (t *mockInput) SampleConfig() string { + return "" +} + +func (t *mockInput) Gather(_ telegraf.Accumulator) error { + return nil +} diff --git a/models/running_output.go b/models/running_output.go index 420af4d764092..d86eafccfd7f2 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -204,6 +204,10 @@ func (r *RunningOutput) Close() { if err := r.Output.Close(); err != nil { r.log.Errorf("Error closing output: %v", err) } + + if err := r.buffer.Close(); err != nil { + r.log.Errorf("Error closing output buffer: %v", err) + } } // 
AddMetric adds a metric to the output. diff --git a/models/running_output_test.go b/models/running_output_test.go index 626b5fa68968a..6054cf359a5fb 100644 --- a/models/running_output_test.go +++ b/models/running_output_test.go @@ -2,7 +2,6 @@ package models import ( "errors" - "fmt" "sync" "testing" "time" @@ -31,55 +30,8 @@ var next5 = []telegraf.Metric{ testutil.TestMetric(101, "metric10"), } -// Benchmark adding metrics. -func BenchmarkRunningOutputAddWrite(b *testing.B) { - conf := &OutputConfig{ - Filter: Filter{}, - } - - m := &perfOutput{} - ro := NewRunningOutput(m, conf, 1000, 10000) - - for n := 0; n < b.N; n++ { - ro.AddMetric(testutil.TestMetric(101, "metric1")) - ro.Write() //nolint:errcheck // skip checking err for benchmark tests - } -} - -// Benchmark adding metrics. -func BenchmarkRunningOutputAddWriteEvery100(b *testing.B) { - conf := &OutputConfig{ - Filter: Filter{}, - } - - m := &perfOutput{} - ro := NewRunningOutput(m, conf, 1000, 10000) - - for n := 0; n < b.N; n++ { - ro.AddMetric(testutil.TestMetric(101, "metric1")) - if n%100 == 0 { - ro.Write() //nolint:errcheck // skip checking err for benchmark tests - } - } -} - -// Benchmark adding metrics. -func BenchmarkRunningOutputAddFailWrites(b *testing.B) { - conf := &OutputConfig{ - Filter: Filter{}, - } - - m := &perfOutput{} - m.failWrite = true - ro := NewRunningOutput(m, conf, 1000, 10000) - - for n := 0; n < b.N; n++ { - ro.AddMetric(testutil.TestMetric(101, "metric1")) - } -} - // Test that NameDrop filters ger properly applied. -func TestRunningOutput_DropFilter(t *testing.T) { +func TestRunningOutputDropFilter(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ NameDrop: []string{"metric1", "metric2"}, @@ -104,7 +56,7 @@ func TestRunningOutput_DropFilter(t *testing.T) { } // Test that NameDrop filters without a match do nothing. 
-func TestRunningOutput_PassFilter(t *testing.T) { +func TestRunningOutputPassFilter(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ NameDrop: []string{"metric1000", "foo*"}, @@ -129,7 +81,7 @@ func TestRunningOutput_PassFilter(t *testing.T) { } // Test that tags are properly included -func TestRunningOutput_TagIncludeNoMatch(t *testing.T) { +func TestRunningOutputTagIncludeNoMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ TagInclude: []string{"nothing*"}, @@ -150,7 +102,7 @@ func TestRunningOutput_TagIncludeNoMatch(t *testing.T) { } // Test that tags are properly excluded -func TestRunningOutput_TagExcludeMatch(t *testing.T) { +func TestRunningOutputTagExcludeMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ TagExclude: []string{"tag*"}, @@ -171,7 +123,7 @@ func TestRunningOutput_TagExcludeMatch(t *testing.T) { } // Test that tags are properly Excluded -func TestRunningOutput_TagExcludeNoMatch(t *testing.T) { +func TestRunningOutputTagExcludeNoMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ TagExclude: []string{"nothing*"}, @@ -192,7 +144,7 @@ func TestRunningOutput_TagExcludeNoMatch(t *testing.T) { } // Test that tags are properly included -func TestRunningOutput_TagIncludeMatch(t *testing.T) { +func TestRunningOutputTagIncludeMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ TagInclude: []string{"tag*"}, @@ -213,7 +165,7 @@ func TestRunningOutput_TagIncludeMatch(t *testing.T) { } // Test that measurement name overriding correctly -func TestRunningOutput_NameOverride(t *testing.T) { +func TestRunningOutputNameOverride(t *testing.T) { conf := &OutputConfig{ NameOverride: "new_metric_name", } @@ -231,7 +183,7 @@ func TestRunningOutput_NameOverride(t *testing.T) { } // Test that measurement name prefix is added correctly -func TestRunningOutput_NamePrefix(t *testing.T) { +func TestRunningOutputNamePrefix(t *testing.T) { conf := &OutputConfig{ NamePrefix: "prefix_", } @@ -249,7 +201,7 @@ func TestRunningOutput_NamePrefix(t *testing.T) { } // Test that measurement name suffix is added correctly -func TestRunningOutput_NameSuffix(t *testing.T) { +func TestRunningOutputNameSuffix(t *testing.T) { conf := &OutputConfig{ NameSuffix: "_suffix", } @@ -293,8 +245,7 @@ func TestRunningOutputWriteFail(t *testing.T) { Filter: Filter{}, } - m := &mockOutput{} - m.failWrite = true + m := &mockOutput{failWrite: true} ro := NewRunningOutput(m, conf, 4, 12) // Fill buffer to limit twice @@ -326,8 +277,7 @@ func TestRunningOutputWriteFailOrder(t *testing.T) { Filter: Filter{}, } - m := &mockOutput{} - m.failWrite = true + m := &mockOutput{failWrite: true} ro := NewRunningOutput(m, conf, 100, 1000) // add 5 metrics @@ -364,8 +314,7 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) { Filter: Filter{}, } - m := &mockOutput{} - m.failWrite = true + m := &mockOutput{failWrite: true} ro := NewRunningOutput(m, conf, 5, 100) // add 5 metrics @@ -428,8 +377,7 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) { Filter: Filter{}, } - m := &mockOutput{} - m.failWrite = true + m := &mockOutput{failWrite: true} ro := NewRunningOutput(m, conf, 5, 1000) // add 5 metrics @@ -462,7 +410,7 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) { require.Equal(t, expected, m.Metrics()) } -func TestInternalMetrics(t *testing.T) { +func TestRunningOutputInternalMetrics(t *testing.T) { _ = NewRunningOutput( &mockOutput{}, &OutputConfig{ @@ -506,7 +454,7 @@ func TestInternalMetrics(t *testing.T) { testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) 
 }
 
-func TestStartupBehaviorInvalid(t *testing.T) {
+func TestRunningOutputStartupBehaviorInvalid(t *testing.T) {
 	ro := NewRunningOutput(
 		&mockOutput{},
 		&OutputConfig{
@@ -520,7 +468,7 @@ func TestStartupBehaviorInvalid(t *testing.T) {
 	require.ErrorContains(t, ro.Init(), "invalid 'startup_error_behavior'")
 }
 
-func TestRetryableStartupBehaviorDefault(t *testing.T) {
+func TestRunningOutputRetryableStartupBehaviorDefault(t *testing.T) {
 	serr := &internal.StartupError{
 		Err:   errors.New("retryable err"),
 		Retry: true,
@@ -544,7 +492,7 @@ func TestRetryableStartupBehaviorDefault(t *testing.T) {
 	require.False(t, ro.started)
 }
 
-func TestRetryableStartupBehaviorError(t *testing.T) {
+func TestRunningOutputRetryableStartupBehaviorError(t *testing.T) {
 	serr := &internal.StartupError{
 		Err:   errors.New("retryable err"),
 		Retry: true,
@@ -569,7 +517,7 @@ func TestRetryableStartupBehaviorError(t *testing.T) {
 	require.False(t, ro.started)
 }
 
-func TestRetryableStartupBehaviorRetry(t *testing.T) {
+func TestRunningOutputRetryableStartupBehaviorRetry(t *testing.T) {
 	serr := &internal.StartupError{
 		Err:   errors.New("retryable err"),
 		Retry: true,
@@ -610,7 +558,7 @@ func TestRetryableStartupBehaviorRetry(t *testing.T) {
 	require.Equal(t, 2, mo.writes)
 }
 
-func TestRetryableStartupBehaviorIgnore(t *testing.T) {
+func TestRunningOutputRetryableStartupBehaviorIgnore(t *testing.T) {
 	serr := &internal.StartupError{
 		Err:   errors.New("retryable err"),
 		Retry: true,
@@ -639,7 +587,7 @@ func TestRetryableStartupBehaviorIgnore(t *testing.T) {
 	require.False(t, ro.started)
 }
 
-func TestNonRetryableStartupBehaviorDefault(t *testing.T) {
+func TestRunningOutputNonRetryableStartupBehaviorDefault(t *testing.T) {
 	serr := &internal.StartupError{
 		Err:   errors.New("non-retryable err"),
 		Retry: false,
@@ -671,7 +619,7 @@ func TestNonRetryableStartupBehaviorDefault(t *testing.T) {
 	}
 }
 
-func TestUntypedtartupBehaviorIgnore(t *testing.T) {
+func TestRunningOutputUntypedStartupBehaviorIgnore(t *testing.T) {
 	serr := errors.New("untyped err")
 
 	for _, behavior := range []string{"", "error", "retry", "ignore"} {
@@ -700,7 +648,7 @@ func TestUntypedtartupBehaviorIgnore(t *testing.T) {
 	}
 }
 
-func TestPartiallyStarted(t *testing.T) {
+func TestRunningOutputPartiallyStarted(t *testing.T) {
 	serr := &internal.StartupError{
 		Err:   errors.New("partial err"),
 		Retry: true,
@@ -743,6 +691,52 @@ func TestPartiallyStarted(t *testing.T) {
 	require.Equal(t, 3, mo.writes)
 }
 
+// Benchmark adding metrics.
+func BenchmarkRunningOutputAddWrite(b *testing.B) {
+	conf := &OutputConfig{
+		Filter: Filter{},
+	}
+
+	m := &perfOutput{}
+	ro := NewRunningOutput(m, conf, 1000, 10000)
+
+	for n := 0; n < b.N; n++ {
+		ro.AddMetric(testutil.TestMetric(101, "metric1"))
+		ro.Write() //nolint:errcheck // skip checking err for benchmark tests
+	}
+}
+
+// Benchmark adding metrics.
+func BenchmarkRunningOutputAddWriteEvery100(b *testing.B) {
+	conf := &OutputConfig{
+		Filter: Filter{},
+	}
+
+	m := &perfOutput{}
+	ro := NewRunningOutput(m, conf, 1000, 10000)
+
+	for n := 0; n < b.N; n++ {
+		ro.AddMetric(testutil.TestMetric(101, "metric1"))
+		if n%100 == 0 {
+			ro.Write() //nolint:errcheck // skip checking err for benchmark tests
+		}
+	}
+}
+
+// Benchmark adding metrics.
+func BenchmarkRunningOutputAddFailWrites(b *testing.B) { + conf := &OutputConfig{ + Filter: Filter{}, + } + + m := &perfOutput{failWrite: true} + ro := NewRunningOutput(m, conf, 1000, 10000) + + for n := 0; n < b.N; n++ { + ro.AddMetric(testutil.TestMetric(101, "metric1")) + } +} + type mockOutput struct { sync.Mutex @@ -770,16 +764,11 @@ func (m *mockOutput) Close() error { return nil } -func (m *mockOutput) Description() string { - return "" -} - func (m *mockOutput) SampleConfig() string { return "" } func (m *mockOutput) Write(metrics []telegraf.Metric) error { - fmt.Println("writing") m.writes++ m.Lock() @@ -807,23 +796,19 @@ type perfOutput struct { failWrite bool } -func (m *perfOutput) Connect() error { +func (*perfOutput) Connect() error { return nil } -func (m *perfOutput) Close() error { +func (*perfOutput) Close() error { return nil } -func (m *perfOutput) Description() string { - return "" -} - -func (m *perfOutput) SampleConfig() string { +func (*perfOutput) SampleConfig() string { return "" } -func (m *perfOutput) Write(_ []telegraf.Metric) error { +func (m *perfOutput) Write([]telegraf.Metric) error { if m.failWrite { return errors.New("failed write") } diff --git a/models/running_processor_test.go b/models/running_processor_test.go index 26a99b16decd4..e6a6932692b68 100644 --- a/models/running_processor_test.go +++ b/models/running_processor_test.go @@ -13,69 +13,16 @@ import ( "github.com/influxdata/telegraf/testutil" ) -// MockProcessor is a Processor with an overridable Apply implementation. -type MockProcessor struct { - ApplyF func(in ...telegraf.Metric) []telegraf.Metric -} - -func (p *MockProcessor) SampleConfig() string { - return "" -} - -func (p *MockProcessor) Description() string { - return "" -} - -func (p *MockProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { - return p.ApplyF(in...) -} - -// MockProcessorToInit is a Processor that needs to be initialized. -type MockProcessorToInit struct { - HasBeenInit bool -} - -func (p *MockProcessorToInit) SampleConfig() string { - return "" -} - -func (p *MockProcessorToInit) Description() string { - return "" -} - -func (p *MockProcessorToInit) Apply(in ...telegraf.Metric) []telegraf.Metric { - return in -} - -func (p *MockProcessorToInit) Init() error { - p.HasBeenInit = true - return nil -} - -func TestRunningProcessor_Init(t *testing.T) { - mock := MockProcessorToInit{} +func TestRunningProcessorInit(t *testing.T) { + mock := mockProcessor{} rp := &models.RunningProcessor{ Processor: processors.NewStreamingProcessorFromProcessor(&mock), } - err := rp.Init() - require.NoError(t, err) - require.True(t, mock.HasBeenInit) -} - -// TagProcessor returns a Processor whose Apply function adds the tag and -// value. 
-func TagProcessor(key, value string) *MockProcessor { - return &MockProcessor{ - ApplyF: func(in ...telegraf.Metric) []telegraf.Metric { - for _, m := range in { - m.AddTag(key, value) - } - return in - }, - } + require.NoError(t, rp.Init()) + require.True(t, mock.hasBeenInit) } -func TestRunningProcessor_Apply(t *testing.T) { +func TestRunningProcessorApply(t *testing.T) { type args struct { Processor telegraf.StreamingProcessor Config *models.ProcessorConfig @@ -90,7 +37,16 @@ func TestRunningProcessor_Apply(t *testing.T) { { name: "inactive filter applies metrics", args: args{ - Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")), + Processor: processors.NewStreamingProcessorFromProcessor( + &mockProcessor{ + applyF: func(in ...telegraf.Metric) []telegraf.Metric { + for _, m := range in { + m.AddTag("apply", "true") + } + return in + }, + }, + ), Config: &models.ProcessorConfig{ Filter: models.Filter{}, }, @@ -121,7 +77,16 @@ func TestRunningProcessor_Apply(t *testing.T) { { name: "filter applies", args: args{ - Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")), + Processor: processors.NewStreamingProcessorFromProcessor( + &mockProcessor{ + applyF: func(in ...telegraf.Metric) []telegraf.Metric { + for _, m := range in { + m.AddTag("apply", "true") + } + return in + }, + }, + ), Config: &models.ProcessorConfig{ Filter: models.Filter{ NamePass: []string{"cpu"}, @@ -154,7 +119,16 @@ func TestRunningProcessor_Apply(t *testing.T) { { name: "filter doesn't apply", args: args{ - Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")), + Processor: processors.NewStreamingProcessorFromProcessor( + &mockProcessor{ + applyF: func(in ...telegraf.Metric) []telegraf.Metric { + for _, m := range in { + m.AddTag("apply", "true") + } + return in + }, + }, + ), Config: &models.ProcessorConfig{ Filter: models.Filter{ NameDrop: []string{"cpu"}, @@ -208,7 +182,7 @@ func TestRunningProcessor_Apply(t *testing.T) { } } -func TestRunningProcessor_Order(t *testing.T) { +func TestRunningProcessorOrder(t *testing.T) { rp1 := &models.RunningProcessor{ Config: &models.ProcessorConfig{ Order: 1, @@ -231,3 +205,22 @@ func TestRunningProcessor_Order(t *testing.T) { models.RunningProcessors{rp1, rp2, rp3}, procs) } + +// mockProcessor is a processor with an overridable apply implementation. +type mockProcessor struct { + applyF func(in ...telegraf.Metric) []telegraf.Metric + hasBeenInit bool +} + +func (p *mockProcessor) SampleConfig() string { + return "" +} + +func (p *mockProcessor) Init() error { + p.hasBeenInit = true + return nil +} + +func (p *mockProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { + return p.applyF(in...) +}
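
Note on the pattern exercised above: the renamed output tests and benchmarks all share one lifecycle: construct a RunningOutput around an unexported mock, add metrics, write them to the wrapped output, then close the output, which now also closes its buffer (see the running_output.go hunk). The sketch below is illustrative only and not part of the patch; it reuses identifiers that already appear in this diff (NewRunningOutput, OutputConfig, mockOutput, testutil.TestMetric, require) and is assumed to sit in the models package tests where those are already imported. The helper name exampleOutputLifecycle is invented for the example.

// exampleOutputLifecycle is a minimal sketch of the create/add/write/close
// flow used by the tests in models/running_output_test.go.
func exampleOutputLifecycle(t *testing.T) {
	conf := &OutputConfig{Filter: Filter{}}

	// Wrap a mock output; the numeric arguments mirror the batch and buffer
	// sizes used in the tests above.
	m := &mockOutput{}
	ro := NewRunningOutput(m, conf, 4, 12)

	// Metrics are filtered and buffered by AddMetric ...
	ro.AddMetric(testutil.TestMetric(101, "metric1"))

	// ... flushed to the wrapped output by Write ...
	require.NoError(t, ro.Write())

	// ... and released by Close, which now also closes the underlying buffer.
	ro.Close()
}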