Skip to content

Commit

Permalink
chore(outputs.influxdb_v2): Cleanup code and tests (#16147)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Nov 6, 2024
1 parent 4f951e6 commit 18b2d3c
Show file tree
Hide file tree
Showing 5 changed files with 654 additions and 695 deletions.
236 changes: 88 additions & 148 deletions plugins/outputs/influxdb_v2/http.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package influxdb_v2

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
Expand All @@ -16,11 +17,12 @@ import (
"strings"
"time"

"golang.org/x/net/http2"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"golang.org/x/net/http2"
)

type APIError struct {
Expand All @@ -37,155 +39,102 @@ func (e APIError) Error() string {
}

const (
defaultRequestTimeout = time.Second * 5
defaultMaxWaitSeconds = 60
defaultMaxWaitRetryAfterSeconds = 10 * 60
)

type HTTPConfig struct {
URL *url.URL
LocalAddr *net.TCPAddr
Token config.Secret
Organization string
Bucket string
BucketTag string
ExcludeBucketTag bool
Timeout time.Duration
Headers map[string]string
Proxy *url.URL
UserAgent string
ContentEncoding string
PingTimeout config.Duration
ReadIdleTimeout config.Duration
TLSConfig *tls.Config

Serializer *influx.Serializer
Log telegraf.Logger
}

type httpClient struct {
ContentEncoding string
Timeout time.Duration
Headers map[string]string
Organization string
Bucket string
BucketTag string
ExcludeBucketTag bool

client *http.Client
serializer *influx.Serializer
url *url.URL
params url.Values
retryTime time.Time
retryCount int
log telegraf.Logger
url *url.URL
localAddr *net.TCPAddr
token config.Secret
organization string
bucket string
bucketTag string
excludeBucketTag bool
timeout time.Duration
headers map[string]string
proxy *url.URL
userAgent string
contentEncoding string
pingTimeout config.Duration
readIdleTimeout config.Duration
tlsConfig *tls.Config
serializer *influx.Serializer
encoder internal.ContentEncoder
client *http.Client
params url.Values
retryTime time.Time
retryCount int
log telegraf.Logger
}

func NewHTTPClient(cfg *HTTPConfig) (*httpClient, error) {
if cfg.URL == nil {
return nil, ErrMissingURL
}

timeout := cfg.Timeout
if timeout == 0 {
timeout = defaultRequestTimeout
}

userAgent := cfg.UserAgent
if userAgent == "" {
userAgent = internal.ProductToken()
func (c *httpClient) Init() error {
token, err := c.token.Get()
if err != nil {
return fmt.Errorf("getting token failed: %w", err)
}

var headers = make(map[string]string, len(cfg.Headers)+2)
headers["User-Agent"] = userAgent

token, err := cfg.Token.Get()
if err != nil {
return nil, fmt.Errorf("getting token failed: %w", err)
if c.headers == nil {
c.headers = make(map[string]string, 2)
}
headers["Authorization"] = "Token " + token.String()
c.headers["Authorization"] = "Token " + token.String()
token.Destroy()
for k, v := range cfg.Headers {
headers[k] = v
}
c.headers["User-Agent"] = c.userAgent

var proxy func(*http.Request) (*url.URL, error)
if cfg.Proxy != nil {
proxy = http.ProxyURL(cfg.Proxy)
if c.proxy != nil {
proxy = http.ProxyURL(c.proxy)
} else {
proxy = http.ProxyFromEnvironment
}

serializer := cfg.Serializer
if serializer == nil {
serializer = &influx.Serializer{}
if err := serializer.Init(); err != nil {
return nil, err
}
}

var transport *http.Transport
switch cfg.URL.Scheme {
switch c.url.Scheme {
case "http", "https":
var dialerFunc func(ctx context.Context, network, addr string) (net.Conn, error)
if cfg.LocalAddr != nil {
dialer := &net.Dialer{LocalAddr: cfg.LocalAddr}
if c.localAddr != nil {
dialer := &net.Dialer{LocalAddr: c.localAddr}
dialerFunc = dialer.DialContext
}
transport = &http.Transport{
Proxy: proxy,
TLSClientConfig: cfg.TLSConfig,
TLSClientConfig: c.tlsConfig,
DialContext: dialerFunc,
}
if cfg.ReadIdleTimeout != 0 || cfg.PingTimeout != 0 {
if c.readIdleTimeout != 0 || c.pingTimeout != 0 {
http2Trans, err := http2.ConfigureTransports(transport)
if err == nil {
http2Trans.ReadIdleTimeout = time.Duration(cfg.ReadIdleTimeout)
http2Trans.PingTimeout = time.Duration(cfg.PingTimeout)
http2Trans.ReadIdleTimeout = time.Duration(c.readIdleTimeout)
http2Trans.PingTimeout = time.Duration(c.pingTimeout)
}
}
case "unix":
transport = &http.Transport{
Dial: func(_, _ string) (net.Conn, error) {
return net.DialTimeout(
cfg.URL.Scheme,
cfg.URL.Path,
timeout,
c.url.Scheme,
c.url.Path,
c.timeout,
)
},
}
default:
return nil, fmt.Errorf("unsupported scheme %q", cfg.URL.Scheme)
return fmt.Errorf("unsupported scheme %q", c.url.Scheme)
}

preppedURL, params, err := prepareWriteURL(*cfg.URL, cfg.Organization)
preppedURL, params, err := prepareWriteURL(*c.url, c.organization)
if err != nil {
return nil, err
return err
}

client := &httpClient{
serializer: serializer,
client: &http.Client{
Timeout: timeout,
Transport: transport,
},
url: preppedURL,
params: params,
ContentEncoding: cfg.ContentEncoding,
Timeout: timeout,
Headers: headers,
Organization: cfg.Organization,
Bucket: cfg.Bucket,
BucketTag: cfg.BucketTag,
ExcludeBucketTag: cfg.ExcludeBucketTag,
log: cfg.Log,
c.url = preppedURL
c.client = &http.Client{
Timeout: c.timeout,
Transport: transport,
}
return client, nil
}
c.params = params

// URL returns the origin URL that this client connects too.
func (c *httpClient) URL() string {
return c.url.String()
return nil
}

type genericRespError struct {
Expand All @@ -211,34 +160,34 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
}

batches := make(map[string][]telegraf.Metric)
if c.BucketTag == "" {
err := c.writeBatch(ctx, c.Bucket, metrics)
if c.bucketTag == "" {
err := c.writeBatch(ctx, c.bucket, metrics)
if err != nil {
var apiErr *APIError
if errors.As(err, &apiErr) {
if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
return c.splitAndWriteBatch(ctx, c.Bucket, metrics)
return c.splitAndWriteBatch(ctx, c.bucket, metrics)
}
}

return err
}
} else {
for _, metric := range metrics {
bucket, ok := metric.GetTag(c.BucketTag)
bucket, ok := metric.GetTag(c.bucketTag)
if !ok {
bucket = c.Bucket
bucket = c.bucket
}

if _, ok := batches[bucket]; !ok {
batches[bucket] = make([]telegraf.Metric, 0)
}

if c.ExcludeBucketTag {
if c.excludeBucketTag {
// Avoid modifying the metric in case we need to retry the request.
metric = metric.Copy()
metric.Accept()
metric.RemoveTag(c.BucketTag)
metric.RemoveTag(c.bucketTag)
}

batches[bucket] = append(batches[bucket], metric)
Expand All @@ -250,7 +199,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
var apiErr *APIError
if errors.As(err, &apiErr) {
if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
return c.splitAndWriteBatch(ctx, c.Bucket, metrics)
return c.splitAndWriteBatch(ctx, c.bucket, metrics)
}
}

Expand All @@ -273,21 +222,41 @@ func (c *httpClient) splitAndWriteBatch(ctx context.Context, bucket string, metr
}

func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
reader := c.requestBodyReader(metrics)
defer reader.Close()

req, err := c.makeWriteRequest(makeWriteURL(*c.url, c.params, bucket), reader)
// Serialize the metrics
body, err := c.serializer.SerializeBatch(metrics)
if err != nil {
return err
}

// Encode the content if requested
if c.encoder != nil {
var err error
if body, err = c.encoder.Encode(body); err != nil {
return fmt.Errorf("encoding failed: %w", err)
}
}

// Setup the request
address := makeWriteURL(*c.url, c.params, bucket)
req, err := http.NewRequest("POST", address, io.NopCloser(bytes.NewBuffer(body)))
if err != nil {
return fmt.Errorf("creating request failed: %w", err)
}
if c.encoder != nil {
req.Header.Set("Content-Encoding", c.contentEncoding)
}
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
c.addHeaders(req)

// Execute the request
resp, err := c.client.Do(req.WithContext(ctx))
if err != nil {
internal.OnClientError(c.client, err)
return err
}
defer resp.Body.Close()

// Check for success
switch resp.StatusCode {
case
// this is the expected response:
Expand All @@ -303,6 +272,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
return nil
}

// We got an error and now try to decode further
writeResp := &genericRespError{}
err = json.NewDecoder(resp.Body).Decode(writeResp)
desc := writeResp.Error()
Expand Down Expand Up @@ -388,38 +358,8 @@ func (c *httpClient) getRetryDuration(headers http.Header) time.Duration {
return time.Duration(retry*1000) * time.Millisecond
}

func (c *httpClient) makeWriteRequest(address string, body io.Reader) (*http.Request, error) {
var err error

req, err := http.NewRequest("POST", address, body)
if err != nil {
return nil, err
}

req.Header.Set("Content-Type", "text/plain; charset=utf-8")
c.addHeaders(req)

if c.ContentEncoding == "gzip" {
req.Header.Set("Content-Encoding", "gzip")
}

return req, nil
}

// requestBodyReader warp io.Reader from influx.NewReader to io.ReadCloser, which is useful to fast close the write
// side of the connection in case of error
func (c *httpClient) requestBodyReader(metrics []telegraf.Metric) io.ReadCloser {
reader := influx.NewReader(metrics, c.serializer)

if c.ContentEncoding == "gzip" {
return internal.CompressWithGzip(reader)
}

return io.NopCloser(reader)
}

func (c *httpClient) addHeaders(req *http.Request) {
for header, value := range c.Headers {
for header, value := range c.headers {
if strings.EqualFold(header, "host") {
req.Host = value
} else {
Expand Down
Loading

0 comments on commit 18b2d3c

Please sign in to comment.