From e5a0ce998fa2c001ebb64c0481b26c60fe4b25a2 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Wed, 6 Nov 2024 18:25:08 +0100 Subject: [PATCH] chore(outputs.influxdb_v2): Cleanup code and tests (#16147) (cherry picked from commit 18b2d3cdc3b5e9710949484ff317165846eb1cb4) --- plugins/outputs/influxdb_v2/http.go | 236 ++++------ .../outputs/influxdb_v2/http_internal_test.go | 173 ------- plugins/outputs/influxdb_v2/http_test.go | 434 ++++++++---------- plugins/outputs/influxdb_v2/influxdb_v2.go | 154 ++++--- .../outputs/influxdb_v2/influxdb_v2_test.go | 352 +++++++++++--- 5 files changed, 654 insertions(+), 695 deletions(-) delete mode 100644 plugins/outputs/influxdb_v2/http_internal_test.go diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index ec99af4ec3234..34e698dd75c94 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -1,6 +1,7 @@ package influxdb_v2 import ( + "bytes" "context" "crypto/tls" "encoding/json" @@ -16,11 +17,12 @@ import ( "strings" "time" + "golang.org/x/net/http2" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/serializers/influx" - "golang.org/x/net/http2" ) type APIError struct { @@ -37,155 +39,102 @@ func (e APIError) Error() string { } const ( - defaultRequestTimeout = time.Second * 5 defaultMaxWaitSeconds = 60 defaultMaxWaitRetryAfterSeconds = 10 * 60 ) -type HTTPConfig struct { - URL *url.URL - LocalAddr *net.TCPAddr - Token config.Secret - Organization string - Bucket string - BucketTag string - ExcludeBucketTag bool - Timeout time.Duration - Headers map[string]string - Proxy *url.URL - UserAgent string - ContentEncoding string - PingTimeout config.Duration - ReadIdleTimeout config.Duration - TLSConfig *tls.Config - - Serializer *influx.Serializer - Log telegraf.Logger -} - type httpClient struct { - ContentEncoding string - Timeout time.Duration - Headers map[string]string - Organization string - Bucket string - BucketTag string - ExcludeBucketTag bool - - client *http.Client - serializer *influx.Serializer - url *url.URL - params url.Values - retryTime time.Time - retryCount int - log telegraf.Logger + url *url.URL + localAddr *net.TCPAddr + token config.Secret + organization string + bucket string + bucketTag string + excludeBucketTag bool + timeout time.Duration + headers map[string]string + proxy *url.URL + userAgent string + contentEncoding string + pingTimeout config.Duration + readIdleTimeout config.Duration + tlsConfig *tls.Config + serializer *influx.Serializer + encoder internal.ContentEncoder + client *http.Client + params url.Values + retryTime time.Time + retryCount int + log telegraf.Logger } -func NewHTTPClient(cfg *HTTPConfig) (*httpClient, error) { - if cfg.URL == nil { - return nil, ErrMissingURL - } - - timeout := cfg.Timeout - if timeout == 0 { - timeout = defaultRequestTimeout - } - - userAgent := cfg.UserAgent - if userAgent == "" { - userAgent = internal.ProductToken() +func (c *httpClient) Init() error { + token, err := c.token.Get() + if err != nil { + return fmt.Errorf("getting token failed: %w", err) } - var headers = make(map[string]string, len(cfg.Headers)+2) - headers["User-Agent"] = userAgent - - token, err := cfg.Token.Get() - if err != nil { - return nil, fmt.Errorf("getting token failed: %w", err) + if c.headers == nil { + c.headers = make(map[string]string, 2) } - headers["Authorization"] = "Token " + token.String() + c.headers["Authorization"] = "Token " + token.String() token.Destroy() - for k, v := range cfg.Headers { - headers[k] = v - } + c.headers["User-Agent"] = c.userAgent var proxy func(*http.Request) (*url.URL, error) - if cfg.Proxy != nil { - proxy = http.ProxyURL(cfg.Proxy) + if c.proxy != nil { + proxy = http.ProxyURL(c.proxy) } else { proxy = http.ProxyFromEnvironment } - serializer := cfg.Serializer - if serializer == nil { - serializer = &influx.Serializer{} - if err := serializer.Init(); err != nil { - return nil, err - } - } - var transport *http.Transport - switch cfg.URL.Scheme { + switch c.url.Scheme { case "http", "https": var dialerFunc func(ctx context.Context, network, addr string) (net.Conn, error) - if cfg.LocalAddr != nil { - dialer := &net.Dialer{LocalAddr: cfg.LocalAddr} + if c.localAddr != nil { + dialer := &net.Dialer{LocalAddr: c.localAddr} dialerFunc = dialer.DialContext } transport = &http.Transport{ Proxy: proxy, - TLSClientConfig: cfg.TLSConfig, + TLSClientConfig: c.tlsConfig, DialContext: dialerFunc, } - if cfg.ReadIdleTimeout != 0 || cfg.PingTimeout != 0 { + if c.readIdleTimeout != 0 || c.pingTimeout != 0 { http2Trans, err := http2.ConfigureTransports(transport) if err == nil { - http2Trans.ReadIdleTimeout = time.Duration(cfg.ReadIdleTimeout) - http2Trans.PingTimeout = time.Duration(cfg.PingTimeout) + http2Trans.ReadIdleTimeout = time.Duration(c.readIdleTimeout) + http2Trans.PingTimeout = time.Duration(c.pingTimeout) } } case "unix": transport = &http.Transport{ Dial: func(_, _ string) (net.Conn, error) { return net.DialTimeout( - cfg.URL.Scheme, - cfg.URL.Path, - timeout, + c.url.Scheme, + c.url.Path, + c.timeout, ) }, } default: - return nil, fmt.Errorf("unsupported scheme %q", cfg.URL.Scheme) + return fmt.Errorf("unsupported scheme %q", c.url.Scheme) } - preppedURL, params, err := prepareWriteURL(*cfg.URL, cfg.Organization) + preppedURL, params, err := prepareWriteURL(*c.url, c.organization) if err != nil { - return nil, err + return err } - client := &httpClient{ - serializer: serializer, - client: &http.Client{ - Timeout: timeout, - Transport: transport, - }, - url: preppedURL, - params: params, - ContentEncoding: cfg.ContentEncoding, - Timeout: timeout, - Headers: headers, - Organization: cfg.Organization, - Bucket: cfg.Bucket, - BucketTag: cfg.BucketTag, - ExcludeBucketTag: cfg.ExcludeBucketTag, - log: cfg.Log, + c.url = preppedURL + c.client = &http.Client{ + Timeout: c.timeout, + Transport: transport, } - return client, nil -} + c.params = params -// URL returns the origin URL that this client connects too. -func (c *httpClient) URL() string { - return c.url.String() + return nil } type genericRespError struct { @@ -211,13 +160,13 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error } batches := make(map[string][]telegraf.Metric) - if c.BucketTag == "" { - err := c.writeBatch(ctx, c.Bucket, metrics) + if c.bucketTag == "" { + err := c.writeBatch(ctx, c.bucket, metrics) if err != nil { var apiErr *APIError if errors.As(err, &apiErr) { if apiErr.StatusCode == http.StatusRequestEntityTooLarge { - return c.splitAndWriteBatch(ctx, c.Bucket, metrics) + return c.splitAndWriteBatch(ctx, c.bucket, metrics) } } @@ -225,20 +174,20 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error } } else { for _, metric := range metrics { - bucket, ok := metric.GetTag(c.BucketTag) + bucket, ok := metric.GetTag(c.bucketTag) if !ok { - bucket = c.Bucket + bucket = c.bucket } if _, ok := batches[bucket]; !ok { batches[bucket] = make([]telegraf.Metric, 0) } - if c.ExcludeBucketTag { + if c.excludeBucketTag { // Avoid modifying the metric in case we need to retry the request. metric = metric.Copy() metric.Accept() - metric.RemoveTag(c.BucketTag) + metric.RemoveTag(c.bucketTag) } batches[bucket] = append(batches[bucket], metric) @@ -250,7 +199,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error var apiErr *APIError if errors.As(err, &apiErr) { if apiErr.StatusCode == http.StatusRequestEntityTooLarge { - return c.splitAndWriteBatch(ctx, c.Bucket, metrics) + return c.splitAndWriteBatch(ctx, c.bucket, metrics) } } @@ -273,14 +222,33 @@ func (c *httpClient) splitAndWriteBatch(ctx context.Context, bucket string, metr } func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error { - reader := c.requestBodyReader(metrics) - defer reader.Close() - - req, err := c.makeWriteRequest(makeWriteURL(*c.url, c.params, bucket), reader) + // Serialize the metrics + body, err := c.serializer.SerializeBatch(metrics) if err != nil { return err } + // Encode the content if requested + if c.encoder != nil { + var err error + if body, err = c.encoder.Encode(body); err != nil { + return fmt.Errorf("encoding failed: %w", err) + } + } + + // Setup the request + address := makeWriteURL(*c.url, c.params, bucket) + req, err := http.NewRequest("POST", address, io.NopCloser(bytes.NewBuffer(body))) + if err != nil { + return fmt.Errorf("creating request failed: %w", err) + } + if c.encoder != nil { + req.Header.Set("Content-Encoding", c.contentEncoding) + } + req.Header.Set("Content-Type", "text/plain; charset=utf-8") + c.addHeaders(req) + + // Execute the request resp, err := c.client.Do(req.WithContext(ctx)) if err != nil { internal.OnClientError(c.client, err) @@ -288,6 +256,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te } defer resp.Body.Close() + // Check for success switch resp.StatusCode { case // this is the expected response: @@ -303,6 +272,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te return nil } + // We got an error and now try to decode further writeResp := &genericRespError{} err = json.NewDecoder(resp.Body).Decode(writeResp) desc := writeResp.Error() @@ -388,38 +358,8 @@ func (c *httpClient) getRetryDuration(headers http.Header) time.Duration { return time.Duration(retry*1000) * time.Millisecond } -func (c *httpClient) makeWriteRequest(address string, body io.Reader) (*http.Request, error) { - var err error - - req, err := http.NewRequest("POST", address, body) - if err != nil { - return nil, err - } - - req.Header.Set("Content-Type", "text/plain; charset=utf-8") - c.addHeaders(req) - - if c.ContentEncoding == "gzip" { - req.Header.Set("Content-Encoding", "gzip") - } - - return req, nil -} - -// requestBodyReader warp io.Reader from influx.NewReader to io.ReadCloser, which is useful to fast close the write -// side of the connection in case of error -func (c *httpClient) requestBodyReader(metrics []telegraf.Metric) io.ReadCloser { - reader := influx.NewReader(metrics, c.serializer) - - if c.ContentEncoding == "gzip" { - return internal.CompressWithGzip(reader) - } - - return io.NopCloser(reader) -} - func (c *httpClient) addHeaders(req *http.Request) { - for header, value := range c.Headers { + for header, value := range c.headers { if strings.EqualFold(header, "host") { req.Host = value } else { diff --git a/plugins/outputs/influxdb_v2/http_internal_test.go b/plugins/outputs/influxdb_v2/http_internal_test.go deleted file mode 100644 index f5ab35bae512a..0000000000000 --- a/plugins/outputs/influxdb_v2/http_internal_test.go +++ /dev/null @@ -1,173 +0,0 @@ -package influxdb_v2 - -import ( - "fmt" - "net/http" - "net/url" - "path" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func genURL(u string) *url.URL { - //nolint:errcheck // known test urls - address, _ := url.Parse(u) - return address -} - -func TestMakeWriteURL(t *testing.T) { - tests := []struct { - err bool - url *url.URL - act string - bkt string - org string - }{ - { - url: genURL("http://localhost:9999"), - act: "http://localhost:9999/api/v2/write?bucket=telegraf0&org=influx0", - bkt: "telegraf0", - org: "influx0", - }, - { - url: genURL("http://localhost:9999?id=abc"), - act: "http://localhost:9999/api/v2/write?bucket=telegraf1&id=abc&org=influx1", - bkt: "telegraf1", - org: "influx1", - }, - { - url: genURL("unix://var/run/influxd.sock"), - act: "http://127.0.0.1/api/v2/write?bucket=telegraf2&org=influx2", - bkt: "telegraf2", - org: "influx2", - }, - { - err: true, - url: genURL("udp://localhost:9999"), - }, - } - - for i := range tests { - rURL, params, err := prepareWriteURL(*tests[i].url, tests[i].org) - if !tests[i].err { - require.NoError(t, err) - } else { - require.Error(t, err) - t.Log(err) - } - if err == nil { - for j := 0; j < 2; j++ { - require.Equal(t, tests[i].act, makeWriteURL(*rURL, params, tests[i].bkt)) - } - } - } -} - -func TestExponentialBackoffCalculation(t *testing.T) { - c := &httpClient{} - tests := []struct { - retryCount int - expected time.Duration - }{ - {retryCount: 0, expected: 0}, - {retryCount: 1, expected: 25 * time.Millisecond}, - {retryCount: 5, expected: 625 * time.Millisecond}, - {retryCount: 10, expected: 2500 * time.Millisecond}, - {retryCount: 30, expected: 22500 * time.Millisecond}, - {retryCount: 40, expected: 40 * time.Second}, - {retryCount: 50, expected: 60 * time.Second}, // max hit - {retryCount: 100, expected: 60 * time.Second}, - {retryCount: 1000, expected: 60 * time.Second}, - } - for _, test := range tests { - t.Run(fmt.Sprintf("%d_retries", test.retryCount), func(t *testing.T) { - c.retryCount = test.retryCount - require.EqualValues(t, test.expected, c.getRetryDuration(http.Header{})) - }) - } -} - -func TestExponentialBackoffCalculationWithRetryAfter(t *testing.T) { - c := &httpClient{} - tests := []struct { - retryCount int - retryAfter string - expected time.Duration - }{ - {retryCount: 0, retryAfter: "0", expected: 0}, - {retryCount: 0, retryAfter: "10", expected: 10 * time.Second}, - {retryCount: 0, retryAfter: "60", expected: 60 * time.Second}, - {retryCount: 0, retryAfter: "600", expected: 600 * time.Second}, - {retryCount: 0, retryAfter: "601", expected: 600 * time.Second}, // max hit - {retryCount: 40, retryAfter: "39", expected: 40 * time.Second}, // retryCount wins - {retryCount: 40, retryAfter: "41", expected: 41 * time.Second}, // retryAfter wins - {retryCount: 100, retryAfter: "100", expected: 100 * time.Second}, - } - for _, test := range tests { - t.Run(fmt.Sprintf("%d_retries", test.retryCount), func(t *testing.T) { - c.retryCount = test.retryCount - hdr := http.Header{} - hdr.Add("Retry-After", test.retryAfter) - require.EqualValues(t, test.expected, c.getRetryDuration(hdr)) - }) - } -} - -var ( - bucket = "bkt" - org = "org" - //nolint:errcheck // known test urls - loc, params, _ = prepareWriteURL(*genURL("http://localhost:8086"), org) -) - -// goos: linux -// goarch: amd64 -// pkg: github.com/influxdata/telegraf/plugins/outputs/influxdb_v2 -// cpu: 11th Gen Intel(R) Core(TM) i7-11850H @ 2.50GHz -// BenchmarkOldMakeWriteURL -// BenchmarkOldMakeWriteURL-16 1556631 683.2 ns/op 424 B/op 14 allocs/op -// PASS -// ok github.com/influxdata/telegraf/plugins/outputs/influxdb_v2 1.851s -func BenchmarkOldMakeWriteURL(b *testing.B) { - b.ReportAllocs() - for n := 0; n < b.N; n++ { - //nolint:errcheck // Skip error for benchmarking - oldMakeWriteURL(*loc, org, bucket) - } -} - -// goos: linux -// goarch: amd64 -// pkg: github.com/influxdata/telegraf/plugins/outputs/influxdb_v2 -// cpu: 11th Gen Intel(R) Core(TM) i7-11850H @ 2.50GHz -// BenchmarkNewMakeWriteURL -// BenchmarkNewMakeWriteURL-16 2084415 496.5 ns/op 280 B/op 9 allocs/op -// PASS -// ok github.com/influxdata/telegraf/plugins/outputs/influxdb_v2 1.626s -func BenchmarkNewMakeWriteURL(b *testing.B) { - b.ReportAllocs() - for n := 0; n < b.N; n++ { - makeWriteURL(*loc, params, bucket) - } -} - -func oldMakeWriteURL(loc url.URL, org, bucket string) (string, error) { - params := url.Values{} - params.Set("bucket", bucket) - params.Set("org", org) - - switch loc.Scheme { - case "unix": - loc.Scheme = "http" - loc.Host = "127.0.0.1" - loc.Path = "/api/v2/write" - case "http", "https": - loc.Path = path.Join(loc.Path, "/api/v2/write") - default: - return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme) - } - loc.RawQuery = params.Encode() - return loc.String(), nil -} diff --git a/plugins/outputs/influxdb_v2/http_test.go b/plugins/outputs/influxdb_v2/http_test.go index 0f30102d454b9..278e9d45b887d 100644 --- a/plugins/outputs/influxdb_v2/http_test.go +++ b/plugins/outputs/influxdb_v2/http_test.go @@ -1,283 +1,253 @@ -package influxdb_v2_test +package influxdb_v2 import ( - "context" - "io" + "fmt" "net/http" - "net/http/httptest" "net/url" + "path" "testing" "time" "github.com/stretchr/testify/require" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" - influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" - "github.com/influxdata/telegraf/testutil" ) -func genURL(u string) *url.URL { - //nolint:errcheck // known test urls - address, _ := url.Parse(u) - return address -} -func TestNewHTTPClient(t *testing.T) { +func TestHTTPClientInit(t *testing.T) { tests := []struct { - err bool - cfg *influxdb.HTTPConfig + name string + addr string + client *httpClient }{ { - err: true, - cfg: &influxdb.HTTPConfig{}, + name: "unix socket", + addr: "unix://var/run/influxd.sock", + client: &httpClient{}, }, { - err: true, - cfg: &influxdb.HTTPConfig{ - URL: genURL("udp://localhost:9999"), - }, - }, - { - cfg: &influxdb.HTTPConfig{ - URL: genURL("unix://var/run/influxd.sock"), - }, - }, - { - cfg: &influxdb.HTTPConfig{ - URL: genURL("unix://var/run/influxd.sock"), - PingTimeout: config.Duration(15 * time.Second), - ReadIdleTimeout: config.Duration(30 * time.Second), + name: "unix socket with timeouts", + addr: "unix://var/run/influxd.sock", + client: &httpClient{ + pingTimeout: config.Duration(15 * time.Second), + readIdleTimeout: config.Duration(30 * time.Second), }, }, } - for i := range tests { - client, err := influxdb.NewHTTPClient(tests[i].cfg) - if !tests[i].err { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + u, err := url.Parse(tt.addr) require.NoError(t, err) - } else { - require.Error(t, err) - t.Log(err) - } - if err == nil { - client.URL() - } + tt.client.url = u + + require.NoError(t, tt.client.Init()) + }) } } -func TestWrite(t *testing.T) { - ts := httptest.NewServer( - http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/api/v2/write": - err := r.ParseForm() - require.NoError(t, err) - require.Equal(t, []string{"foobar"}, r.Form["bucket"]) - - body, err := io.ReadAll(r.Body) - require.NoError(t, err) - require.Contains(t, string(body), "cpu value=42.123") +func TestHTTPClientInitFail(t *testing.T) { + tests := []struct { + name string + addr string + client *httpClient + }{ + { + name: "udp unsupported", + addr: "udp://localhost:9999", + client: &httpClient{}, + }, + } - w.WriteHeader(http.StatusNoContent) - return - default: - w.WriteHeader(http.StatusNotFound) - return - } - }), - ) - defer ts.Close() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + u, err := url.Parse(tt.addr) + require.NoError(t, err) + tt.client.url = u - addr := &url.URL{ - Scheme: "http", - Host: ts.Listener.Addr().String(), + require.Error(t, tt.client.Init()) + }) } +} - cfg := &influxdb.HTTPConfig{ - URL: addr, - Bucket: "telegraf", - BucketTag: "bucket", - ExcludeBucketTag: true, - PingTimeout: config.Duration(15 * time.Second), - ReadIdleTimeout: config.Duration(30 * time.Second), +func TestMakeWriteURL(t *testing.T) { + tests := []struct { + name string + addr string + expected string + bucket string + org string + }{ + { + name: "http default", + addr: "http://localhost:9999", + expected: "http://localhost:9999/api/v2/write?bucket=telegraf0&org=influx0", + bucket: "telegraf0", + org: "influx0", + }, + { + name: "http with param", + addr: "http://localhost:9999?id=abc", + expected: "http://localhost:9999/api/v2/write?bucket=telegraf1&id=abc&org=influx1", + bucket: "telegraf1", + org: "influx1", + }, + { + name: "unix socket default", + addr: "unix://var/run/influxd.sock", + expected: "http://127.0.0.1/api/v2/write?bucket=telegraf2&org=influx2", + bucket: "telegraf2", + org: "influx2", + }, } - client, err := influxdb.NewHTTPClient(cfg) - require.NoError(t, err) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + u, err := url.Parse(tt.addr) + require.NoError(t, err) - metrics := []telegraf.Metric{ - testutil.MustMetric( - "cpu", - map[string]string{ - "bucket": "foobar", - }, - map[string]interface{}{ - "value": 42.123, - }, - time.Unix(0, 0), - ), + preppedURL, params, err := prepareWriteURL(*u, tt.org) + require.NoError(t, err) + require.Equal(t, tt.expected, makeWriteURL(*preppedURL, params, tt.bucket)) + }) } - - ctx := context.Background() - err = client.Write(ctx, metrics) - require.NoError(t, err) - err = client.Write(ctx, metrics) - require.NoError(t, err) } -func TestWriteBucketTagWorksOnRetry(t *testing.T) { - ts := httptest.NewServer( - http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/api/v2/write": - err := r.ParseForm() - require.NoError(t, err) - require.Equal(t, []string{"foo"}, r.Form["bucket"]) - - body, err := io.ReadAll(r.Body) - require.NoError(t, err) - require.Contains(t, string(body), "cpu value=42") +func TestMakeWriteURLFail(t *testing.T) { + tests := []struct { + name string + addr string + expected string + bucket string + org string + }{ + { + name: "default values", + addr: "udp://localhost:9999", + }, + } - w.WriteHeader(http.StatusNoContent) - return - default: - w.WriteHeader(http.StatusNotFound) - return - } - }), - ) - defer ts.Close() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + u, err := url.Parse(tt.addr) + require.NoError(t, err) - addr := &url.URL{ - Scheme: "http", - Host: ts.Listener.Addr().String(), + _, _, err = prepareWriteURL(*u, tt.org) + require.Error(t, err) + }) } +} - cfg := &influxdb.HTTPConfig{ - URL: addr, - Bucket: "telegraf", - BucketTag: "bucket", - ExcludeBucketTag: true, +func TestExponentialBackoffCalculation(t *testing.T) { + c := &httpClient{} + tests := []struct { + retryCount int + expected time.Duration + }{ + {retryCount: 0, expected: 0}, + {retryCount: 1, expected: 25 * time.Millisecond}, + {retryCount: 5, expected: 625 * time.Millisecond}, + {retryCount: 10, expected: 2500 * time.Millisecond}, + {retryCount: 30, expected: 22500 * time.Millisecond}, + {retryCount: 40, expected: 40 * time.Second}, + {retryCount: 50, expected: 60 * time.Second}, // max hit + {retryCount: 100, expected: 60 * time.Second}, + {retryCount: 1000, expected: 60 * time.Second}, } - - client, err := influxdb.NewHTTPClient(cfg) - require.NoError(t, err) - - metrics := []telegraf.Metric{ - testutil.MustMetric( - "cpu", - map[string]string{ - "bucket": "foo", - }, - map[string]interface{}{ - "value": 42.0, - }, - time.Unix(0, 0), - ), + for _, test := range tests { + t.Run(fmt.Sprintf("%d_retries", test.retryCount), func(t *testing.T) { + c.retryCount = test.retryCount + require.EqualValues(t, test.expected, c.getRetryDuration(http.Header{})) + }) } - - ctx := context.Background() - err = client.Write(ctx, metrics) - require.NoError(t, err) - err = client.Write(ctx, metrics) - require.NoError(t, err) } -func TestTooLargeWriteRetry(t *testing.T) { - ts := httptest.NewServer( - http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/api/v2/write": - err := r.ParseForm() - require.NoError(t, err) - - body, err := io.ReadAll(r.Body) - require.NoError(t, err) - - // Ensure metric body size is small - if len(body) > 16 { - w.WriteHeader(http.StatusRequestEntityTooLarge) - } else { - w.WriteHeader(http.StatusNoContent) - } - - return - default: - w.WriteHeader(http.StatusNotFound) - return - } - }), - ) - defer ts.Close() - - addr := &url.URL{ - Scheme: "http", - Host: ts.Listener.Addr().String(), +func TestExponentialBackoffCalculationWithRetryAfter(t *testing.T) { + c := &httpClient{} + tests := []struct { + retryCount int + retryAfter string + expected time.Duration + }{ + {retryCount: 0, retryAfter: "0", expected: 0}, + {retryCount: 0, retryAfter: "10", expected: 10 * time.Second}, + {retryCount: 0, retryAfter: "60", expected: 60 * time.Second}, + {retryCount: 0, retryAfter: "600", expected: 600 * time.Second}, + {retryCount: 0, retryAfter: "601", expected: 600 * time.Second}, // max hit + {retryCount: 40, retryAfter: "39", expected: 40 * time.Second}, // retryCount wins + {retryCount: 40, retryAfter: "41", expected: 41 * time.Second}, // retryAfter wins + {retryCount: 100, retryAfter: "100", expected: 100 * time.Second}, } - - cfg := &influxdb.HTTPConfig{ - URL: addr, - Bucket: "telegraf", - BucketTag: "bucket", - ExcludeBucketTag: true, - Log: testutil.Logger{}, + for _, test := range tests { + t.Run(fmt.Sprintf("%d_retries", test.retryCount), func(t *testing.T) { + c.retryCount = test.retryCount + hdr := http.Header{} + hdr.Add("Retry-After", test.retryAfter) + require.EqualValues(t, test.expected, c.getRetryDuration(hdr)) + }) } +} - client, err := influxdb.NewHTTPClient(cfg) - require.NoError(t, err) - - // Together the metric batch size is too big, split up, we get success - metrics := []telegraf.Metric{ - testutil.MustMetric( - "cpu", - map[string]string{ - "bucket": "foo", - }, - map[string]interface{}{ - "value": 42.0, - }, - time.Unix(0, 0), - ), - testutil.MustMetric( - "cpu", - map[string]string{ - "bucket": "bar", - }, - map[string]interface{}{ - "value": 99.0, - }, - time.Unix(0, 0), - ), +// goos: linux +// goarch: amd64 +// pkg: github.com/influxdata/telegraf/plugins/outputs/influxdb_v2 +// cpu: 11th Gen Intel(R) Core(TM) i7-11850H @ 2.50GHz +// BenchmarkOldMakeWriteURL +// BenchmarkOldMakeWriteURL-16 1556631 683.2 ns/op 424 B/op 14 allocs/op +// PASS +// ok github.com/influxdata/telegraf/plugins/outputs/influxdb_v2 1.851s +func BenchmarkOldMakeWriteURL(b *testing.B) { + org := "org" + + u, err := url.Parse("http://localhost:8086") + require.NoError(b, err) + loc, _, err := prepareWriteURL(*u, org) + require.NoError(b, err) + + b.ReportAllocs() + for n := 0; n < b.N; n++ { + //nolint:errcheck // Skip error for benchmarking + oldMakeWriteURL(*loc) } +} - ctx := context.Background() - err = client.Write(ctx, metrics) - require.NoError(t, err) - - // These metrics are too big, even after splitting in half, expect error - hugeMetrics := []telegraf.Metric{ - testutil.MustMetric( - "reallyLargeMetric", - map[string]string{ - "bucket": "foobar", - }, - map[string]interface{}{ - "value": 123.456, - }, - time.Unix(0, 0), - ), - testutil.MustMetric( - "evenBiggerMetric", - map[string]string{ - "bucket": "fizzbuzzbang", - }, - map[string]interface{}{ - "value": 999.999, - }, - time.Unix(0, 0), - ), +// goos: linux +// goarch: amd64 +// pkg: github.com/influxdata/telegraf/plugins/outputs/influxdb_v2 +// cpu: 11th Gen Intel(R) Core(TM) i7-11850H @ 2.50GHz +// BenchmarkNewMakeWriteURL +// BenchmarkNewMakeWriteURL-16 2084415 496.5 ns/op 280 B/op 9 allocs/op +// PASS +// ok github.com/influxdata/telegraf/plugins/outputs/influxdb_v2 1.626s +func BenchmarkNewMakeWriteURL(b *testing.B) { + bucket := "bkt" + org := "org" + + u, err := url.Parse("http://localhost:8086") + require.NoError(b, err) + loc, params, err := prepareWriteURL(*u, org) + require.NoError(b, err) + + b.ReportAllocs() + for n := 0; n < b.N; n++ { + makeWriteURL(*loc, params, bucket) } +} - err = client.Write(ctx, hugeMetrics) - require.Error(t, err) +func oldMakeWriteURL(loc url.URL) (string, error) { + params := url.Values{} + params.Set("bucket", "bkt") + params.Set("org", "org") + + switch loc.Scheme { + case "unix": + loc.Scheme = "http" + loc.Host = "127.0.0.1" + loc.Path = "/api/v2/write" + case "http", "https": + loc.Path = path.Join(loc.Path, "/api/v2/write") + default: + return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme) + } + loc.RawQuery = params.Encode() + return loc.String(), nil } diff --git a/plugins/outputs/influxdb_v2/influxdb_v2.go b/plugins/outputs/influxdb_v2/influxdb_v2.go index 0f314da0c0b27..15a66632788e2 100644 --- a/plugins/outputs/influxdb_v2/influxdb_v2.go +++ b/plugins/outputs/influxdb_v2/influxdb_v2.go @@ -3,6 +3,7 @@ package influxdb_v2 import ( "context" + "crypto/tls" _ "embed" "errors" "fmt" @@ -15,7 +16,8 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/internal" + commontls "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers/influx" ) @@ -23,19 +25,6 @@ import ( //go:embed sample.conf var sampleConfig string -var ( - defaultURL = "http://localhost:8086" - - ErrMissingURL = errors.New("missing URL") -) - -type Client interface { - Write(context.Context, []telegraf.Metric) error - - URL() string // for logging - Close() -} - type InfluxDB struct { URLs []string `toml:"urls"` LocalAddr string `toml:"local_address"` @@ -53,22 +42,63 @@ type InfluxDB struct { OmitTimestamp bool `toml:"influx_omit_timestamp"` PingTimeout config.Duration `toml:"ping_timeout"` ReadIdleTimeout config.Duration `toml:"read_idle_timeout"` - tls.ClientConfig + Log telegraf.Logger `toml:"-"` + commontls.ClientConfig - Log telegraf.Logger `toml:"-"` - - clients []Client + clients []*httpClient + encoder internal.ContentEncoder + serializer *influx.Serializer + tlsCfg *tls.Config } func (*InfluxDB) SampleConfig() string { return sampleConfig } -func (i *InfluxDB) Connect() error { +func (i *InfluxDB) Init() error { + // Set defaults + if i.UserAgent == "" { + i.UserAgent = internal.ProductToken() + } + if len(i.URLs) == 0 { - i.URLs = append(i.URLs, defaultURL) + i.URLs = append(i.URLs, "http://localhost:8086") } + // Check options + switch i.ContentEncoding { + case "", "gzip": + i.ContentEncoding = "gzip" + enc, err := internal.NewGzipEncoder() + if err != nil { + return fmt.Errorf("setting up gzip encoder failed: %w", err) + } + i.encoder = enc + case "identity": + default: + return fmt.Errorf("invalid content encoding %q", i.ContentEncoding) + } + + // Setup the limited serializer + i.serializer = &influx.Serializer{ + UintSupport: i.UintSupport, + OmitTimestamp: i.OmitTimestamp, + } + if err := i.serializer.Init(); err != nil { + return fmt.Errorf("setting up serializer failed: %w", err) + } + + // Setup the client config + tlsCfg, err := i.ClientConfig.TLSConfig() + if err != nil { + return fmt.Errorf("setting up TLS failed: %w", err) + } + i.tlsCfg = tlsCfg + + return nil +} + +func (i *InfluxDB) Connect() error { for _, u := range i.URLs { parts, err := url.Parse(u) if err != nil { @@ -112,9 +142,29 @@ func (i *InfluxDB) Connect() error { switch parts.Scheme { case "http", "https", "unix": - c, err := i.getHTTPClient(parts, localAddr, proxy) - if err != nil { - return err + c := &httpClient{ + url: parts, + localAddr: localAddr, + token: i.Token, + organization: i.Organization, + bucket: i.Bucket, + bucketTag: i.BucketTag, + excludeBucketTag: i.ExcludeBucketTag, + timeout: time.Duration(i.Timeout), + headers: i.HTTPHeaders, + proxy: proxy, + userAgent: i.UserAgent, + contentEncoding: i.ContentEncoding, + tlsConfig: i.tlsCfg, + pingTimeout: i.PingTimeout, + readIdleTimeout: i.ReadIdleTimeout, + serializer: i.serializer, + encoder: i.encoder, + log: i.Log, + } + + if err := c.Init(); err != nil { + return fmt.Errorf("error creating HTTP client [%s]: %w", parts, err) } i.clients = append(i.clients, c) @@ -138,68 +188,22 @@ func (i *InfluxDB) Close() error { func (i *InfluxDB) Write(metrics []telegraf.Metric) error { ctx := context.Background() - var err error - p := rand.Perm(len(i.clients)) - for _, n := range p { + for _, n := range rand.Perm(len(i.clients)) { client := i.clients[n] - err = client.Write(ctx, metrics) - if err == nil { - return nil + if err := client.Write(ctx, metrics); err != nil { + i.Log.Errorf("When writing to [%s]: %v", client.url, err) + continue } - - i.Log.Errorf("When writing to [%s]: %v", client.URL(), err) + return nil } return errors.New("failed to send metrics to any configured server(s)") } -func (i *InfluxDB) getHTTPClient(address *url.URL, localAddr *net.TCPAddr, proxy *url.URL) (Client, error) { - tlsConfig, err := i.ClientConfig.TLSConfig() - if err != nil { - return nil, err - } - - serializer := &influx.Serializer{ - UintSupport: i.UintSupport, - OmitTimestamp: i.OmitTimestamp, - } - if err := serializer.Init(); err != nil { - return nil, err - } - - httpConfig := &HTTPConfig{ - URL: address, - LocalAddr: localAddr, - Token: i.Token, - Organization: i.Organization, - Bucket: i.Bucket, - BucketTag: i.BucketTag, - ExcludeBucketTag: i.ExcludeBucketTag, - Timeout: time.Duration(i.Timeout), - Headers: i.HTTPHeaders, - Proxy: proxy, - UserAgent: i.UserAgent, - ContentEncoding: i.ContentEncoding, - TLSConfig: tlsConfig, - Serializer: serializer, - PingTimeout: i.PingTimeout, - ReadIdleTimeout: i.ReadIdleTimeout, - Log: i.Log, - } - - c, err := NewHTTPClient(httpConfig) - if err != nil { - return nil, fmt.Errorf("error creating HTTP client [%s]: %w", address, err) - } - - return c, nil -} - func init() { outputs.Add("influxdb_v2", func() telegraf.Output { return &InfluxDB{ - Timeout: config.Duration(time.Second * 5), - ContentEncoding: "gzip", + Timeout: config.Duration(time.Second * 5), } }) } diff --git a/plugins/outputs/influxdb_v2/influxdb_v2_test.go b/plugins/outputs/influxdb_v2/influxdb_v2_test.go index d4aa80796eaad..af9fed082ba69 100644 --- a/plugins/outputs/influxdb_v2/influxdb_v2_test.go +++ b/plugins/outputs/influxdb_v2/influxdb_v2_test.go @@ -1,107 +1,126 @@ package influxdb_v2_test import ( + "io" "net" + "net/http" + "net/http/httptest" "testing" + "time" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) +func TestSampleConfig(t *testing.T) { + plugin := influxdb.InfluxDB{} + require.NotEmpty(t, plugin.SampleConfig()) +} + +func TestPluginRegistered(t *testing.T) { + require.Contains(t, outputs.Outputs, "influxdb_v2") +} + +func TestCloseWithoutConnect(t *testing.T) { + plugin := influxdb.InfluxDB{} + require.NoError(t, plugin.Close()) +} + func TestDefaultURL(t *testing.T) { - output := influxdb.InfluxDB{} - err := output.Connect() - require.NoError(t, err) - if len(output.URLs) < 1 { - t.Fatal("Default URL failed to get set") - } - require.Equal(t, "http://localhost:8086", output.URLs[0]) + plugin := influxdb.InfluxDB{} + require.NoError(t, plugin.Init()) + require.Len(t, plugin.URLs, 1) + require.Equal(t, "http://localhost:8086", plugin.URLs[0]) } -func TestConnect(t *testing.T) { - tests := []struct { - err bool - out influxdb.InfluxDB - }{ + +func TestInit(t *testing.T) { + tests := []*influxdb.InfluxDB{ { - out: influxdb.InfluxDB{ - URLs: []string{"http://localhost:1234"}, - HTTPProxy: "http://localhost:8086", - HTTPHeaders: map[string]string{ - "x": "y", - }, + URLs: []string{"https://localhost:8080"}, + ClientConfig: tls.ClientConfig{ + TLSCA: "thing", }, }, + } + + for _, plugin := range tests { + t.Run(plugin.URLs[0], func(t *testing.T) { + require.Error(t, plugin.Init()) + }) + } +} + +func TestConnectFail(t *testing.T) { + tests := []*influxdb.InfluxDB{ { - err: true, - out: influxdb.InfluxDB{ - URLs: []string{"!@#$qwert"}, - HTTPProxy: "http://localhost:8086", - HTTPHeaders: map[string]string{ - "x": "y", - }, + URLs: []string{"!@#$qwert"}, + HTTPProxy: "http://localhost:8086", + HTTPHeaders: map[string]string{ + "x": "y", }, }, + { - err: true, - out: influxdb.InfluxDB{ - URLs: []string{"http://localhost:1234"}, - HTTPProxy: "!@#$%^&*()_+", - HTTPHeaders: map[string]string{ - "x": "y", - }, + + URLs: []string{"http://localhost:1234"}, + HTTPProxy: "!@#$%^&*()_+", + HTTPHeaders: map[string]string{ + "x": "y", }, }, + { - err: true, - out: influxdb.InfluxDB{ - URLs: []string{"!@#$%^&*()_+"}, - HTTPProxy: "http://localhost:8086", - HTTPHeaders: map[string]string{ - "x": "y", - }, + + URLs: []string{"!@#$%^&*()_+"}, + HTTPProxy: "http://localhost:8086", + HTTPHeaders: map[string]string{ + "x": "y", }, }, + { - err: true, - out: influxdb.InfluxDB{ - URLs: []string{":::@#$qwert"}, - HTTPProxy: "http://localhost:8086", - HTTPHeaders: map[string]string{ - "x": "y", - }, + + URLs: []string{":::@#$qwert"}, + HTTPProxy: "http://localhost:8086", + HTTPHeaders: map[string]string{ + "x": "y", }, }, + } + + for _, plugin := range tests { + t.Run(plugin.URLs[0], func(t *testing.T) { + require.NoError(t, plugin.Init()) + require.Error(t, plugin.Connect()) + }) + } +} + +func TestConnect(t *testing.T) { + tests := []*influxdb.InfluxDB{ { - err: true, - out: influxdb.InfluxDB{ - URLs: []string{"https://localhost:8080"}, - ClientConfig: tls.ClientConfig{ - TLSCA: "thing", - }, + URLs: []string{"http://localhost:1234"}, + HTTPProxy: "http://localhost:8086", + HTTPHeaders: map[string]string{ + "x": "y", }, }, } - for i := range tests { - err := tests[i].out.Connect() - if !tests[i].err { - require.NoError(t, err) - } else { - require.Error(t, err) - t.Log(err) - } + for _, plugin := range tests { + t.Run(plugin.URLs[0], func(t *testing.T) { + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + }) } } -func TestUnused(_ *testing.T) { - thing := influxdb.InfluxDB{} - thing.Close() - thing.SampleConfig() - outputs.Outputs["influxdb_v2"]() -} - func TestInfluxDBLocalAddress(t *testing.T) { t.Log("Starting server") server, err := net.Listen("tcp", "127.0.0.1:0") @@ -112,3 +131,202 @@ func TestInfluxDBLocalAddress(t *testing.T) { require.NoError(t, output.Connect()) require.NoError(t, output.Close()) } + +func TestWrite(t *testing.T) { + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/v2/write": + require.NoError(t, r.ParseForm()) + require.Equal(t, []string{"foobar"}, r.Form["bucket"]) + + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + require.Contains(t, string(body), "cpu value=42.123") + + w.WriteHeader(http.StatusNoContent) + return + default: + w.WriteHeader(http.StatusNotFound) + return + } + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Bucket: "telegraf", + BucketTag: "bucket", + ExcludeBucketTag: true, + ContentEncoding: "identity", + PingTimeout: config.Duration(15 * time.Second), + ReadIdleTimeout: config.Duration(30 * time.Second), + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Test writing + metrics := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "bucket": "foobar", + }, + map[string]interface{}{ + "value": 42.123, + }, + time.Unix(0, 0), + ), + } + require.NoError(t, plugin.Write(metrics)) + require.NoError(t, plugin.Write(metrics)) +} + +func TestWriteBucketTagWorksOnRetry(t *testing.T) { + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/v2/write": + require.NoError(t, r.ParseForm()) + require.Equal(t, []string{"foo"}, r.Form["bucket"]) + + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + require.Contains(t, string(body), "cpu value=42") + + w.WriteHeader(http.StatusNoContent) + return + default: + w.WriteHeader(http.StatusNotFound) + return + } + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Bucket: "telegraf", + BucketTag: "bucket", + ExcludeBucketTag: true, + ContentEncoding: "identity", + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Send the metrics which should be succeed if sent twice + metrics := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + } + require.NoError(t, plugin.Write(metrics)) + require.NoError(t, plugin.Write(metrics)) +} + +func TestTooLargeWriteRetry(t *testing.T) { + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/v2/write": + require.NoError(t, r.ParseForm()) + + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + + // Ensure metric body size is small + if len(body) > 16 { + w.WriteHeader(http.StatusRequestEntityTooLarge) + } else { + w.WriteHeader(http.StatusNoContent) + } + + return + default: + w.WriteHeader(http.StatusNotFound) + return + } + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Bucket: "telegraf", + BucketTag: "bucket", + ExcludeBucketTag: true, + ContentEncoding: "identity", + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Together the metric batch size is too big, split up, we get success + metrics := []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + metric.New( + "cpu", + map[string]string{ + "bucket": "bar", + }, + map[string]interface{}{ + "value": 99.0, + }, + time.Unix(0, 0), + ), + } + require.NoError(t, plugin.Write(metrics)) + + // These metrics are too big, even after splitting in half, expect error + hugeMetrics := []telegraf.Metric{ + metric.New( + "reallyLargeMetric", + map[string]string{ + "bucket": "foobar", + }, + map[string]interface{}{ + "value": 123.456, + }, + time.Unix(0, 0), + ), + metric.New( + "evenBiggerMetric", + map[string]string{ + "bucket": "fizzbuzzbang", + }, + map[string]interface{}{ + "value": 999.999, + }, + time.Unix(0, 0), + ), + } + require.Error(t, plugin.Write(hugeMetrics)) +}