From 47767442426349af118f4c1aa0fb084fb090c455 Mon Sep 17 00:00:00 2001 From: Andrei Pechkurov <37772591+puzpuzpuz@users.noreply.github.com> Date: Thu, 27 Jun 2024 13:16:07 +0300 Subject: [PATCH] fix(client): broken HTTP sender retries (#39) --- http_sender.go | 44 ++++++++++++++++++++------------------------ http_sender_test.go | 19 +++++++++++++++---- sender.go | 13 ++++++++----- utils_test.go | 6 ++++-- 4 files changed, 47 insertions(+), 35 deletions(-) diff --git a/http_sender.go b/http_sender.go index b3b3a20..cefde9e 100644 --- a/http_sender.go +++ b/http_sender.go @@ -171,9 +171,7 @@ func (s *httpLineSender) Flush(ctx context.Context) error { func (s *httpLineSender) flush0(ctx context.Context, closing bool) error { var ( - req *http.Request - retryInterval time.Duration - + retryInterval time.Duration maxRetryInterval = time.Second ) @@ -198,25 +196,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error { // Always reset the buffer at the end of flush. defer s.buf.Reset() - // We rely on the following HTTP client implicit behavior: - // s.buf implements WriteTo method which is used by the client. - req, err = http.NewRequest( - http.MethodPost, - s.uri, - bytes.NewReader(s.buf.Bytes()), - ) - if err != nil { - return err - } - req.ContentLength = int64(s.BufLen()) - - if s.user != "" && s.pass != "" { - req.SetBasicAuth(s.user, s.pass) - } else if s.token != "" { - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.token)) - } - - retry, err := s.makeRequest(ctx, req) + retry, err := s.makeRequest(ctx) if !retry { s.refreshFlushDeadline(err) return err @@ -234,7 +214,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error { jitter := time.Duration(rand.Intn(10)) * time.Millisecond time.Sleep(retryInterval + jitter) - retry, err = s.makeRequest(ctx, req) + retry, err = s.makeRequest(ctx) if !retry { s.refreshFlushDeadline(err) return err @@ -358,7 +338,23 @@ func (s *httpLineSender) At(ctx context.Context, ts time.Time) error { } // makeRequest returns a boolean if we need to retry the request -func (s *httpLineSender) makeRequest(ctx context.Context, req *http.Request) (bool, error) { +func (s *httpLineSender) makeRequest(ctx context.Context) (bool, error) { + req, err := http.NewRequest( + http.MethodPost, + s.uri, + bytes.NewReader(s.buf.Bytes()), + ) + if err != nil { + return false, err + } + req.ContentLength = int64(s.BufLen()) + + if s.user != "" && s.pass != "" { + req.SetBasicAuth(s.user, s.pass) + } else if s.token != "" { + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.token)) + } + // reqTimeout = ( request.len() / min_throughput ) + request_timeout // nb: conversion from int to time.Duration is in milliseconds reqTimeout := time.Duration(s.buf.Len()/s.minThroughputBytesPerSecond)*time.Second + s.requestTimeout diff --git a/http_sender_test.go b/http_sender_test.go index cce4b68..deb6516 100644 --- a/http_sender_test.go +++ b/http_sender_test.go @@ -580,17 +580,28 @@ func TestSuccessAfterRetries(t *testing.T) { assert.NoError(t, err) defer srv.Close() - sender, err := qdb.NewLineSender(ctx, qdb.WithHttp(), qdb.WithAddress(srv.Addr()), qdb.WithRetryTimeout(time.Minute)) + sender, err := qdb.NewLineSender( + ctx, qdb.WithHttp(), + qdb.WithAddress(srv.Addr()), + qdb.WithAutoFlushDisabled(), + qdb.WithRetryTimeout(time.Minute), + ) assert.NoError(t, err) defer sender.Close(ctx) - err = sender.Table(testTable).Symbol("abc", "def").AtNow(ctx) - assert.NoError(t, err) + for i := 0; i < 10; i++ { + err = sender.Table(testTable).Int64Column("foobar", int64(i)).AtNow(ctx) + assert.NoError(t, err) + } err = sender.Flush(ctx) assert.NoError(t, err) - expectLines(t, srv.BackCh, []string{fmt.Sprintf("%s,abc=def", testTable)}) + expected := make([]string, 0) + for i := 0; i < 10; i++ { + expected = append(expected, fmt.Sprintf("%s foobar=%di", testTable, i)) + } + expectLines(t, srv.BackCh, expected) assert.Zero(t, qdb.BufLen(sender)) } diff --git a/sender.go b/sender.go index 2407aed..9fb534f 100644 --- a/sender.go +++ b/sender.go @@ -83,6 +83,9 @@ type LineSender interface { // TimestampColumn adds a timestamp column value to the ILP // message. // + // Should be used only for non-designated timestamp column. + // Designated timestamp column values should be passed to At/AtNow. + // // Column name cannot contain any of the following characters: // '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+', // '-', '*' '%%', '~', or a non-printable char. @@ -110,8 +113,8 @@ type LineSender interface { // '-', '*' '%%', '~', or a non-printable char. BoolColumn(name string, val bool) LineSender - // At sets the timestamp in Epoch nanoseconds and finalizes - // the ILP message. + // At sets the designated timestamp value and finalizes the ILP + // message. // // If the underlying buffer reaches configured capacity or the // number of buffered messages exceeds the auto-flush trigger, this @@ -120,9 +123,9 @@ type LineSender interface { // If ts.IsZero(), no timestamp is sent to the server. At(ctx context.Context, ts time.Time) error - // AtNow omits the timestamp and finalizes the ILP message. - // The server will insert each message using the system clock - // as the row timestamp. + // AtNow omits designated timestamp value and finalizes the ILP + // message. The server will insert each message using the system + // clock as the row timestamp. // // If the underlying buffer reaches configured capacity or the // number of buffered messages exceeds the auto-flush trigger, this diff --git a/utils_test.go b/utils_test.go index c3267f6..8ecef81 100644 --- a/utils_test.go +++ b/utils_test.go @@ -82,7 +82,7 @@ func newTestServerWithProtocol(serverType serverType, protocol string) (*testSer addr: tcp.Addr().String(), tcpListener: tcp, serverType: serverType, - BackCh: make(chan string, 5), + BackCh: make(chan string, 1000), closeCh: make(chan struct{}), } @@ -197,6 +197,8 @@ func (s *testServer) serveHttp() { switch s.serverType { case failFirstThenSendToBackChannel: if atomic.AddInt64(&reqs, 1) == 1 { + // Consume request body. + _, err = io.Copy(io.Discard, r.Body) w.WriteHeader(http.StatusInternalServerError) } else { err = readAndSendToBackChannel(r, lineFeed) @@ -265,5 +267,5 @@ func expectLines(t *testing.T, linesCh chan string, expected []string) { return false } return reflect.DeepEqual(expected, actual) - }, 3*time.Second, 100*time.Millisecond) + }, 10*time.Second, 100*time.Millisecond) }