Skip to content

Commit

Permalink
fix(client): broken HTTP sender retries (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
puzpuzpuz authored Jun 27, 2024
1 parent a562658 commit 4776744
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 35 deletions.
44 changes: 20 additions & 24 deletions http_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,7 @@ func (s *httpLineSender) Flush(ctx context.Context) error {

func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
var (
req *http.Request
retryInterval time.Duration

retryInterval time.Duration
maxRetryInterval = time.Second
)

Expand All @@ -198,25 +196,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
// Always reset the buffer at the end of flush.
defer s.buf.Reset()

// We rely on the following HTTP client implicit behavior:
// s.buf implements WriteTo method which is used by the client.
req, err = http.NewRequest(
http.MethodPost,
s.uri,
bytes.NewReader(s.buf.Bytes()),
)
if err != nil {
return err
}
req.ContentLength = int64(s.BufLen())

if s.user != "" && s.pass != "" {
req.SetBasicAuth(s.user, s.pass)
} else if s.token != "" {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.token))
}

retry, err := s.makeRequest(ctx, req)
retry, err := s.makeRequest(ctx)
if !retry {
s.refreshFlushDeadline(err)
return err
Expand All @@ -234,7 +214,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
jitter := time.Duration(rand.Intn(10)) * time.Millisecond
time.Sleep(retryInterval + jitter)

retry, err = s.makeRequest(ctx, req)
retry, err = s.makeRequest(ctx)
if !retry {
s.refreshFlushDeadline(err)
return err
Expand Down Expand Up @@ -358,7 +338,23 @@ func (s *httpLineSender) At(ctx context.Context, ts time.Time) error {
}

// makeRequest returns a boolean if we need to retry the request
func (s *httpLineSender) makeRequest(ctx context.Context, req *http.Request) (bool, error) {
func (s *httpLineSender) makeRequest(ctx context.Context) (bool, error) {
req, err := http.NewRequest(
http.MethodPost,
s.uri,
bytes.NewReader(s.buf.Bytes()),
)
if err != nil {
return false, err
}
req.ContentLength = int64(s.BufLen())

if s.user != "" && s.pass != "" {
req.SetBasicAuth(s.user, s.pass)
} else if s.token != "" {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.token))
}

// reqTimeout = ( request.len() / min_throughput ) + request_timeout
// nb: conversion from int to time.Duration is in milliseconds
reqTimeout := time.Duration(s.buf.Len()/s.minThroughputBytesPerSecond)*time.Second + s.requestTimeout
Expand Down
19 changes: 15 additions & 4 deletions http_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,17 +580,28 @@ func TestSuccessAfterRetries(t *testing.T) {
assert.NoError(t, err)
defer srv.Close()

sender, err := qdb.NewLineSender(ctx, qdb.WithHttp(), qdb.WithAddress(srv.Addr()), qdb.WithRetryTimeout(time.Minute))
sender, err := qdb.NewLineSender(
ctx, qdb.WithHttp(),
qdb.WithAddress(srv.Addr()),
qdb.WithAutoFlushDisabled(),
qdb.WithRetryTimeout(time.Minute),
)
assert.NoError(t, err)
defer sender.Close(ctx)

err = sender.Table(testTable).Symbol("abc", "def").AtNow(ctx)
assert.NoError(t, err)
for i := 0; i < 10; i++ {
err = sender.Table(testTable).Int64Column("foobar", int64(i)).AtNow(ctx)
assert.NoError(t, err)
}

err = sender.Flush(ctx)
assert.NoError(t, err)

expectLines(t, srv.BackCh, []string{fmt.Sprintf("%s,abc=def", testTable)})
expected := make([]string, 0)
for i := 0; i < 10; i++ {
expected = append(expected, fmt.Sprintf("%s foobar=%di", testTable, i))
}
expectLines(t, srv.BackCh, expected)
assert.Zero(t, qdb.BufLen(sender))
}

Expand Down
13 changes: 8 additions & 5 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type LineSender interface {
// TimestampColumn adds a timestamp column value to the ILP
// message.
//
// Should be used only for non-designated timestamp column.
// Designated timestamp column values should be passed to At/AtNow.
//
// Column name cannot contain any of the following characters:
// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
// '-', '*' '%%', '~', or a non-printable char.
Expand Down Expand Up @@ -110,8 +113,8 @@ type LineSender interface {
// '-', '*' '%%', '~', or a non-printable char.
BoolColumn(name string, val bool) LineSender

// At sets the timestamp in Epoch nanoseconds and finalizes
// the ILP message.
// At sets the designated timestamp value and finalizes the ILP
// message.
//
// If the underlying buffer reaches configured capacity or the
// number of buffered messages exceeds the auto-flush trigger, this
Expand All @@ -120,9 +123,9 @@ type LineSender interface {
// If ts.IsZero(), no timestamp is sent to the server.
At(ctx context.Context, ts time.Time) error

// AtNow omits the timestamp and finalizes the ILP message.
// The server will insert each message using the system clock
// as the row timestamp.
// AtNow omits designated timestamp value and finalizes the ILP
// message. The server will insert each message using the system
// clock as the row timestamp.
//
// If the underlying buffer reaches configured capacity or the
// number of buffered messages exceeds the auto-flush trigger, this
Expand Down
6 changes: 4 additions & 2 deletions utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func newTestServerWithProtocol(serverType serverType, protocol string) (*testSer
addr: tcp.Addr().String(),
tcpListener: tcp,
serverType: serverType,
BackCh: make(chan string, 5),
BackCh: make(chan string, 1000),
closeCh: make(chan struct{}),
}

Expand Down Expand Up @@ -197,6 +197,8 @@ func (s *testServer) serveHttp() {
switch s.serverType {
case failFirstThenSendToBackChannel:
if atomic.AddInt64(&reqs, 1) == 1 {
// Consume request body.
_, err = io.Copy(io.Discard, r.Body)
w.WriteHeader(http.StatusInternalServerError)
} else {
err = readAndSendToBackChannel(r, lineFeed)
Expand Down Expand Up @@ -265,5 +267,5 @@ func expectLines(t *testing.T, linesCh chan string, expected []string) {
return false
}
return reflect.DeepEqual(expected, actual)
}, 3*time.Second, 100*time.Millisecond)
}, 10*time.Second, 100*time.Millisecond)
}

0 comments on commit 4776744

Please sign in to comment.