diff --git a/buffer.go b/buffer.go index 41cce8d..c5df6f2 100644 --- a/buffer.go +++ b/buffer.go @@ -28,7 +28,6 @@ import ( "bytes" "errors" "fmt" - "io" "math" "math/big" "strconv" @@ -122,20 +121,6 @@ func (b *buffer) writeBigInt(i *big.Int) { b.Write(s) } -// WriteTo wraps the built-in bytes.Buffer.WriteTo method -// and writes the contents of the buffer to the provided -// io.Writer -func (b *buffer) WriteTo(w io.Writer) (int64, error) { - n, err := b.Buffer.WriteTo(w) - if err != nil { - b.lastMsgPos -= int(n) - return n, err - } - b.lastMsgPos = 0 - b.msgCount = 0 - return n, nil -} - func (b *buffer) writeTableName(str string) error { if str == "" { return fmt.Errorf("table name cannot be empty: %w", errInvalidMsg) @@ -386,6 +371,17 @@ func (b *buffer) prepareForField() bool { return true } +func (b *buffer) Bytes() []byte { + return b.Buffer.Bytes() +} + +func (b *buffer) Reset() { + b.Buffer.Reset() + b.lastMsgPos = 0 + b.msgCount = 0 + b.resetMsgFlags() +} + func (b *buffer) DiscardPendingMsg() { b.Truncate(b.lastMsgPos) b.resetMsgFlags() diff --git a/http_sender.go b/http_sender.go index ef48f64..b3b3a20 100644 --- a/http_sender.go +++ b/http_sender.go @@ -25,6 +25,7 @@ package questdb import ( + "bytes" "context" "crypto/tls" "encoding/json" @@ -194,17 +195,20 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error { if s.buf.msgCount == 0 { return nil } + // Always reset the buffer at the end of flush. + defer s.buf.Reset() // We rely on the following HTTP client implicit behavior: // s.buf implements WriteTo method which is used by the client. req, err = http.NewRequest( http.MethodPost, s.uri, - &s.buf, + bytes.NewReader(s.buf.Bytes()), ) if err != nil { return err } + req.ContentLength = int64(s.BufLen()) if s.user != "" && s.pass != "" { req.SetBasicAuth(s.user, s.pass) diff --git a/http_sender_test.go b/http_sender_test.go index 1936676..cce4b68 100644 --- a/http_sender_test.go +++ b/http_sender_test.go @@ -573,6 +573,27 @@ func TestNoFlushWhenSenderIsClosedAndAutoFlushIsDisabled(t *testing.T) { assert.Empty(t, qdb.Messages(sender)) } +func TestSuccessAfterRetries(t *testing.T) { + ctx := context.Background() + + srv, err := newTestHttpServer(failFirstThenSendToBackChannel) + assert.NoError(t, err) + defer srv.Close() + + sender, err := qdb.NewLineSender(ctx, qdb.WithHttp(), qdb.WithAddress(srv.Addr()), qdb.WithRetryTimeout(time.Minute)) + assert.NoError(t, err) + defer sender.Close(ctx) + + err = sender.Table(testTable).Symbol("abc", "def").AtNow(ctx) + assert.NoError(t, err) + + err = sender.Flush(ctx) + assert.NoError(t, err) + + expectLines(t, srv.BackCh, []string{fmt.Sprintf("%s,abc=def", testTable)}) + assert.Zero(t, qdb.BufLen(sender)) +} + func TestBufferClearAfterFlush(t *testing.T) { ctx := context.Background() diff --git a/integration_test.go b/integration_test.go index 27b6dd8..f6ca7c3 100644 --- a/integration_test.go +++ b/integration_test.go @@ -132,7 +132,7 @@ func setupQuestDB0(ctx context.Context, auth ilpAuthType, setupProxy bool) (*que return nil, err } req := testcontainers.ContainerRequest{ - Image: "questdb/questdb:7.3.10", + Image: "questdb/questdb:7.4.2", ExposedPorts: []string{"9000/tcp", "9009/tcp"}, WaitingFor: wait.ForHTTP("/").WithPort("9000"), Networks: []string{networkName}, diff --git a/tcp_sender.go b/tcp_sender.go index 6d3e07c..8fde732 100644 --- a/tcp_sender.go +++ b/tcp_sender.go @@ -205,6 +205,9 @@ func (s *tcpLineSender) Flush(ctx context.Context) error { s.conn.SetWriteDeadline(time.Time{}) } + // Always reset the buffer at the end of flush. + defer s.buf.Reset() + if _, err := s.buf.WriteTo(s.conn); err != nil { return err } diff --git a/utils_test.go b/utils_test.go index 5c2a1d5..c3267f6 100644 --- a/utils_test.go +++ b/utils_test.go @@ -34,6 +34,7 @@ import ( "net/http" "reflect" "sync" + "sync/atomic" "testing" "time" @@ -43,11 +44,12 @@ import ( type serverType int64 const ( - sendToBackChannel serverType = 0 - readAndDiscard serverType = 1 - returning500 serverType = 2 - returning403 serverType = 3 - returning404 serverType = 4 + sendToBackChannel serverType = 0 + readAndDiscard serverType = 1 + returning500 serverType = 2 + returning403 serverType = 3 + returning404 serverType = 4 + failFirstThenSendToBackChannel serverType = 5 ) type testServer struct { @@ -186,21 +188,21 @@ func (s *testServer) serveHttp() { } }() + var reqs int64 http.Serve(s.tcpListener, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var ( err error ) switch s.serverType { - case sendToBackChannel: - r := bufio.NewReader(r.Body) - var l string - for err == nil { - l, err = r.ReadString('\n') - if err == nil && len(l) > 0 { - lineFeed <- l[0 : len(l)-1] - } + case failFirstThenSendToBackChannel: + if atomic.AddInt64(&reqs, 1) == 1 { + w.WriteHeader(http.StatusInternalServerError) + } else { + err = readAndSendToBackChannel(r, lineFeed) } + case sendToBackChannel: + err = readAndSendToBackChannel(r, lineFeed) case readAndDiscard: _, err = io.Copy(io.Discard, r.Body) case returning500: @@ -232,6 +234,21 @@ func (s *testServer) serveHttp() { })) } +func readAndSendToBackChannel(r *http.Request, lineFeed chan string) error { + read := bufio.NewReader(r.Body) + var ( + l string + err error + ) + for err == nil { + l, err = read.ReadString('\n') + if err == nil && len(l) > 0 { + lineFeed <- l[0 : len(l)-1] + } + } + return err +} + func (s *testServer) Close() { close(s.closeCh) s.tcpListener.Close()