Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(outputs.influxdb_v2): Cleanup code and tests #16147

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 88 additions & 148 deletions plugins/outputs/influxdb_v2/http.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package influxdb_v2

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
Expand All @@ -16,11 +17,12 @@ import (
"strings"
"time"

"golang.org/x/net/http2"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"golang.org/x/net/http2"
)

type APIError struct {
Expand All @@ -37,155 +39,102 @@ func (e APIError) Error() string {
}

const (
defaultRequestTimeout = time.Second * 5
defaultMaxWaitSeconds = 60
defaultMaxWaitRetryAfterSeconds = 10 * 60
)

type HTTPConfig struct {
URL *url.URL
LocalAddr *net.TCPAddr
Token config.Secret
Organization string
Bucket string
BucketTag string
ExcludeBucketTag bool
Timeout time.Duration
Headers map[string]string
Proxy *url.URL
UserAgent string
ContentEncoding string
PingTimeout config.Duration
ReadIdleTimeout config.Duration
TLSConfig *tls.Config

Serializer *influx.Serializer
Log telegraf.Logger
}

type httpClient struct {
ContentEncoding string
Timeout time.Duration
Headers map[string]string
Organization string
Bucket string
BucketTag string
ExcludeBucketTag bool

client *http.Client
serializer *influx.Serializer
url *url.URL
params url.Values
retryTime time.Time
retryCount int
log telegraf.Logger
url *url.URL
localAddr *net.TCPAddr
token config.Secret
organization string
bucket string
bucketTag string
excludeBucketTag bool
timeout time.Duration
headers map[string]string
proxy *url.URL
userAgent string
contentEncoding string
pingTimeout config.Duration
readIdleTimeout config.Duration
tlsConfig *tls.Config
serializer *influx.Serializer
encoder internal.ContentEncoder
client *http.Client
params url.Values
retryTime time.Time
retryCount int
log telegraf.Logger
}

func NewHTTPClient(cfg *HTTPConfig) (*httpClient, error) {
if cfg.URL == nil {
return nil, ErrMissingURL
}

timeout := cfg.Timeout
if timeout == 0 {
timeout = defaultRequestTimeout
}

userAgent := cfg.UserAgent
if userAgent == "" {
userAgent = internal.ProductToken()
func (c *httpClient) Init() error {
token, err := c.token.Get()
if err != nil {
return fmt.Errorf("getting token failed: %w", err)
}

var headers = make(map[string]string, len(cfg.Headers)+2)
headers["User-Agent"] = userAgent

token, err := cfg.Token.Get()
if err != nil {
return nil, fmt.Errorf("getting token failed: %w", err)
if c.headers == nil {
c.headers = make(map[string]string, 2)
}
headers["Authorization"] = "Token " + token.String()
c.headers["Authorization"] = "Token " + token.String()
token.Destroy()
for k, v := range cfg.Headers {
headers[k] = v
}
c.headers["User-Agent"] = c.userAgent

var proxy func(*http.Request) (*url.URL, error)
if cfg.Proxy != nil {
proxy = http.ProxyURL(cfg.Proxy)
if c.proxy != nil {
proxy = http.ProxyURL(c.proxy)
} else {
proxy = http.ProxyFromEnvironment
}

serializer := cfg.Serializer
if serializer == nil {
serializer = &influx.Serializer{}
if err := serializer.Init(); err != nil {
return nil, err
}
}

var transport *http.Transport
switch cfg.URL.Scheme {
switch c.url.Scheme {
case "http", "https":
var dialerFunc func(ctx context.Context, network, addr string) (net.Conn, error)
if cfg.LocalAddr != nil {
dialer := &net.Dialer{LocalAddr: cfg.LocalAddr}
if c.localAddr != nil {
dialer := &net.Dialer{LocalAddr: c.localAddr}
dialerFunc = dialer.DialContext
}
transport = &http.Transport{
Proxy: proxy,
TLSClientConfig: cfg.TLSConfig,
TLSClientConfig: c.tlsConfig,
DialContext: dialerFunc,
}
if cfg.ReadIdleTimeout != 0 || cfg.PingTimeout != 0 {
if c.readIdleTimeout != 0 || c.pingTimeout != 0 {
http2Trans, err := http2.ConfigureTransports(transport)
if err == nil {
http2Trans.ReadIdleTimeout = time.Duration(cfg.ReadIdleTimeout)
http2Trans.PingTimeout = time.Duration(cfg.PingTimeout)
http2Trans.ReadIdleTimeout = time.Duration(c.readIdleTimeout)
http2Trans.PingTimeout = time.Duration(c.pingTimeout)
}
}
case "unix":
transport = &http.Transport{
Dial: func(_, _ string) (net.Conn, error) {
return net.DialTimeout(
cfg.URL.Scheme,
cfg.URL.Path,
timeout,
c.url.Scheme,
c.url.Path,
c.timeout,
)
},
}
default:
return nil, fmt.Errorf("unsupported scheme %q", cfg.URL.Scheme)
return fmt.Errorf("unsupported scheme %q", c.url.Scheme)
}

preppedURL, params, err := prepareWriteURL(*cfg.URL, cfg.Organization)
preppedURL, params, err := prepareWriteURL(*c.url, c.organization)
if err != nil {
return nil, err
return err
}

client := &httpClient{
serializer: serializer,
client: &http.Client{
Timeout: timeout,
Transport: transport,
},
url: preppedURL,
params: params,
ContentEncoding: cfg.ContentEncoding,
Timeout: timeout,
Headers: headers,
Organization: cfg.Organization,
Bucket: cfg.Bucket,
BucketTag: cfg.BucketTag,
ExcludeBucketTag: cfg.ExcludeBucketTag,
log: cfg.Log,
c.url = preppedURL
c.client = &http.Client{
Timeout: c.timeout,
Transport: transport,
}
return client, nil
}
c.params = params

// URL returns the origin URL that this client connects too.
func (c *httpClient) URL() string {
return c.url.String()
return nil
}

type genericRespError struct {
Expand All @@ -211,34 +160,34 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
}

batches := make(map[string][]telegraf.Metric)
if c.BucketTag == "" {
err := c.writeBatch(ctx, c.Bucket, metrics)
if c.bucketTag == "" {
err := c.writeBatch(ctx, c.bucket, metrics)
if err != nil {
var apiErr *APIError
if errors.As(err, &apiErr) {
if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
return c.splitAndWriteBatch(ctx, c.Bucket, metrics)
return c.splitAndWriteBatch(ctx, c.bucket, metrics)
}
}

return err
}
} else {
for _, metric := range metrics {
bucket, ok := metric.GetTag(c.BucketTag)
bucket, ok := metric.GetTag(c.bucketTag)
if !ok {
bucket = c.Bucket
bucket = c.bucket
}

if _, ok := batches[bucket]; !ok {
batches[bucket] = make([]telegraf.Metric, 0)
}

if c.ExcludeBucketTag {
if c.excludeBucketTag {
// Avoid modifying the metric in case we need to retry the request.
metric = metric.Copy()
metric.Accept()
metric.RemoveTag(c.BucketTag)
metric.RemoveTag(c.bucketTag)
}

batches[bucket] = append(batches[bucket], metric)
Expand All @@ -250,7 +199,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
var apiErr *APIError
if errors.As(err, &apiErr) {
if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
return c.splitAndWriteBatch(ctx, c.Bucket, metrics)
return c.splitAndWriteBatch(ctx, c.bucket, metrics)
}
}

Expand All @@ -273,21 +222,41 @@ func (c *httpClient) splitAndWriteBatch(ctx context.Context, bucket string, metr
}

func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
reader := c.requestBodyReader(metrics)
defer reader.Close()

req, err := c.makeWriteRequest(makeWriteURL(*c.url, c.params, bucket), reader)
// Serialize the metrics
body, err := c.serializer.SerializeBatch(metrics)
if err != nil {
return err
}

// Encode the content if requested
if c.encoder != nil {
var err error
if body, err = c.encoder.Encode(body); err != nil {
return fmt.Errorf("encoding failed: %w", err)
}
}

// Setup the request
address := makeWriteURL(*c.url, c.params, bucket)
req, err := http.NewRequest("POST", address, io.NopCloser(bytes.NewBuffer(body)))
if err != nil {
return fmt.Errorf("creating request failed: %w", err)
}
if c.encoder != nil {
req.Header.Set("Content-Encoding", c.contentEncoding)
}
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
c.addHeaders(req)

// Execute the request
resp, err := c.client.Do(req.WithContext(ctx))
if err != nil {
internal.OnClientError(c.client, err)
return err
}
defer resp.Body.Close()

// Check for success
switch resp.StatusCode {
case
// this is the expected response:
Expand All @@ -303,6 +272,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
return nil
}

// We got an error and now try to decode further
writeResp := &genericRespError{}
err = json.NewDecoder(resp.Body).Decode(writeResp)
desc := writeResp.Error()
Expand Down Expand Up @@ -388,38 +358,8 @@ func (c *httpClient) getRetryDuration(headers http.Header) time.Duration {
return time.Duration(retry*1000) * time.Millisecond
}

func (c *httpClient) makeWriteRequest(address string, body io.Reader) (*http.Request, error) {
var err error

req, err := http.NewRequest("POST", address, body)
if err != nil {
return nil, err
}

req.Header.Set("Content-Type", "text/plain; charset=utf-8")
c.addHeaders(req)

if c.ContentEncoding == "gzip" {
req.Header.Set("Content-Encoding", "gzip")
}

return req, nil
}

// requestBodyReader warp io.Reader from influx.NewReader to io.ReadCloser, which is useful to fast close the write
// side of the connection in case of error
func (c *httpClient) requestBodyReader(metrics []telegraf.Metric) io.ReadCloser {
reader := influx.NewReader(metrics, c.serializer)

if c.ContentEncoding == "gzip" {
return internal.CompressWithGzip(reader)
}

return io.NopCloser(reader)
}

func (c *httpClient) addHeaders(req *http.Request) {
for header, value := range c.Headers {
for header, value := range c.headers {
if strings.EqualFold(header, "host") {
req.Host = value
} else {
Expand Down
Loading
Loading