Skip to content

Commit

Permalink
feat: allow Request.GetBody to be set when gzipping
Browse files Browse the repository at this point in the history
  • Loading branch information
taki-mekhalfa committed Oct 8, 2024
1 parent 1fe10f4 commit 623ca3f
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 11 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 0.13.0 [unreleased]

### Features

1. [#108](https://github.com/InfluxCommunity/influxdb3-go/pull/108): Allow Request.GetBody to be set when writing gzipped data to make calls more resilient.

## 0.12.0 [2024-10-02]

### Features
Expand Down
47 changes: 36 additions & 11 deletions influxdb3/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,27 +133,22 @@ func (c *Client) WriteWithOptions(ctx context.Context, options *WriteOptions, bu
return c.write(ctx, buff, options)
}

func (c *Client) write(ctx context.Context, buff []byte, options *WriteOptions) error {
// Skip zero size batch
if len(buff) == 0 {
return nil
}
func (c *Client) makeHTTPParams(buff []byte, options *WriteOptions) (*httpParams, error) {
var database string
if options.Database != "" {
database = options.Database
} else {
database = c.config.Database
}
if database == "" {
return errors.New("database not specified")
return nil, errors.New("database not specified")
}

var precision = options.Precision

var gzipThreshold = options.GzipThreshold

var body io.Reader
var err error
u, _ := c.apiURL.Parse("write")
params := u.Query()
params.Set("org", c.config.Organization)
Expand All @@ -163,19 +158,49 @@ func (c *Client) write(ctx context.Context, buff []byte, options *WriteOptions)
body = bytes.NewReader(buff)
headers := http.Header{"Content-Type": {"application/json"}}
if gzipThreshold > 0 && len(buff) >= gzipThreshold {
body, err = gzip.CompressWithGzip(body)
r, err := gzip.CompressWithGzip(body)
if err != nil {
return fmt.Errorf("unable to compress write body: %w", err)
return nil, fmt.Errorf("unable to compress body: %w", err)
}

// This is necessary for Request.GetBody to be set by NewRequest, ensuring that
// the Transport can retry the request when a network error occurs.
// See: https://github.com/golang/go/blob/726d898c92ed0159f283f324478d00f15419f476/src/net/http/request.go#L884
// See: https://github.com/golang/go/blob/726d898c92ed0159f283f324478d00f15419f476/src/net/http/transport.go#L89-L92
//
// It is particularly useful for handling transient errors in HTTP/2 and persistent
// connections in standard HTTP.
// Additionally, it helps manage graceful HTTP/2 shutdowns (e.g. GOAWAY frames).
b, err := io.ReadAll(r)
if err != nil {
return nil, fmt.Errorf("unable to read compressed body: %w", err)
}
body = bytes.NewReader(b)

headers["Content-Encoding"] = []string{"gzip"}
}
resp, err := c.makeAPICall(ctx, httpParams{

return &httpParams{
endpointURL: u,
httpMethod: "POST",
headers: headers,
queryParams: u.Query(),
body: body,
})
}, nil
}

func (c *Client) write(ctx context.Context, buff []byte, options *WriteOptions) error {
// Skip zero size batch
if len(buff) == 0 {
return nil
}

params, err := c.makeHTTPParams(buff, options)
if err != nil {
return err
}

resp, err := c.makeAPICall(ctx, *params)
if err != nil {
return err
}
Expand Down
42 changes: 42 additions & 0 deletions influxdb3/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,3 +830,45 @@ func TestWriteWithOptionsNotSet(t *testing.T) {
assert.Error(t, err)
assert.EqualError(t, err, "options not set")
}

func TestMakeHTTPParamsBody(t *testing.T) {
points := genPoints(100)
byts := points2bytes(t, points)

c, err := New(ClientConfig{
Host: "http://localhost",
Token: "my-token",
Database: "my-database",
})
require.NoError(t, err)

for _, gzipThreshold := range []int{
0, // gzipping disabled
1, // gzipping enabled
} {
c.config.WriteOptions.GzipThreshold = gzipThreshold

params, err := c.makeHTTPParams(byts, c.config.WriteOptions)
assert.NoError(t, err)

// copy URL
urlObj := *params.endpointURL
urlObj.RawQuery = params.queryParams.Encode()

fullURL := urlObj.String()

req, err := http.NewRequestWithContext(context.Background(), params.httpMethod, fullURL, params.body)
assert.NoError(t, err)

slurp1, err := io.ReadAll(req.Body)
assert.NoError(t, err)

newBody, err := req.GetBody()
assert.NoError(t, err)

slurp2, err := io.ReadAll(newBody)
assert.NoError(t, err)

assert.Equal(t, string(slurp1), string(slurp2))
}
}

0 comments on commit 623ca3f

Please sign in to comment.