Skip to content

Commit

Permalink
fix(client): HTTP sender retries send empty request body (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
puzpuzpuz authored Jun 26, 2024
1 parent d282cfc commit 38fdfc7
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 30 deletions.
26 changes: 11 additions & 15 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"bytes"
"errors"
"fmt"
"io"
"math"
"math/big"
"strconv"
Expand Down Expand Up @@ -122,20 +121,6 @@ func (b *buffer) writeBigInt(i *big.Int) {
b.Write(s)
}

// WriteTo wraps the built-in bytes.Buffer.WriteTo method
// and writes the contents of the buffer to the provided
// io.Writer
func (b *buffer) WriteTo(w io.Writer) (int64, error) {
n, err := b.Buffer.WriteTo(w)
if err != nil {
b.lastMsgPos -= int(n)
return n, err
}
b.lastMsgPos = 0
b.msgCount = 0
return n, nil
}

func (b *buffer) writeTableName(str string) error {
if str == "" {
return fmt.Errorf("table name cannot be empty: %w", errInvalidMsg)
Expand Down Expand Up @@ -386,6 +371,17 @@ func (b *buffer) prepareForField() bool {
return true
}

func (b *buffer) Bytes() []byte {
return b.Buffer.Bytes()
}

func (b *buffer) Reset() {
b.Buffer.Reset()
b.lastMsgPos = 0
b.msgCount = 0
b.resetMsgFlags()
}

func (b *buffer) DiscardPendingMsg() {
b.Truncate(b.lastMsgPos)
b.resetMsgFlags()
Expand Down
6 changes: 5 additions & 1 deletion http_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package questdb

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
Expand Down Expand Up @@ -194,17 +195,20 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
if s.buf.msgCount == 0 {
return nil
}
// Always reset the buffer at the end of flush.
defer s.buf.Reset()

// We rely on the following HTTP client implicit behavior:
// s.buf implements WriteTo method which is used by the client.
req, err = http.NewRequest(
http.MethodPost,
s.uri,
&s.buf,
bytes.NewReader(s.buf.Bytes()),
)
if err != nil {
return err
}
req.ContentLength = int64(s.BufLen())

if s.user != "" && s.pass != "" {
req.SetBasicAuth(s.user, s.pass)
Expand Down
21 changes: 21 additions & 0 deletions http_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,27 @@ func TestNoFlushWhenSenderIsClosedAndAutoFlushIsDisabled(t *testing.T) {
assert.Empty(t, qdb.Messages(sender))
}

func TestSuccessAfterRetries(t *testing.T) {
ctx := context.Background()

srv, err := newTestHttpServer(failFirstThenSendToBackChannel)
assert.NoError(t, err)
defer srv.Close()

sender, err := qdb.NewLineSender(ctx, qdb.WithHttp(), qdb.WithAddress(srv.Addr()), qdb.WithRetryTimeout(time.Minute))
assert.NoError(t, err)
defer sender.Close(ctx)

err = sender.Table(testTable).Symbol("abc", "def").AtNow(ctx)
assert.NoError(t, err)

err = sender.Flush(ctx)
assert.NoError(t, err)

expectLines(t, srv.BackCh, []string{fmt.Sprintf("%s,abc=def", testTable)})
assert.Zero(t, qdb.BufLen(sender))
}

func TestBufferClearAfterFlush(t *testing.T) {
ctx := context.Background()

Expand Down
2 changes: 1 addition & 1 deletion integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func setupQuestDB0(ctx context.Context, auth ilpAuthType, setupProxy bool) (*que
return nil, err
}
req := testcontainers.ContainerRequest{
Image: "questdb/questdb:7.3.10",
Image: "questdb/questdb:7.4.2",
ExposedPorts: []string{"9000/tcp", "9009/tcp"},
WaitingFor: wait.ForHTTP("/").WithPort("9000"),
Networks: []string{networkName},
Expand Down
3 changes: 3 additions & 0 deletions tcp_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ func (s *tcpLineSender) Flush(ctx context.Context) error {
s.conn.SetWriteDeadline(time.Time{})
}

// Always reset the buffer at the end of flush.
defer s.buf.Reset()

if _, err := s.buf.WriteTo(s.conn); err != nil {
return err
}
Expand Down
43 changes: 30 additions & 13 deletions utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"net/http"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -43,11 +44,12 @@ import (
type serverType int64

const (
sendToBackChannel serverType = 0
readAndDiscard serverType = 1
returning500 serverType = 2
returning403 serverType = 3
returning404 serverType = 4
sendToBackChannel serverType = 0
readAndDiscard serverType = 1
returning500 serverType = 2
returning403 serverType = 3
returning404 serverType = 4
failFirstThenSendToBackChannel serverType = 5
)

type testServer struct {
Expand Down Expand Up @@ -186,21 +188,21 @@ func (s *testServer) serveHttp() {
}
}()

var reqs int64
http.Serve(s.tcpListener, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var (
err error
)

switch s.serverType {
case sendToBackChannel:
r := bufio.NewReader(r.Body)
var l string
for err == nil {
l, err = r.ReadString('\n')
if err == nil && len(l) > 0 {
lineFeed <- l[0 : len(l)-1]
}
case failFirstThenSendToBackChannel:
if atomic.AddInt64(&reqs, 1) == 1 {
w.WriteHeader(http.StatusInternalServerError)
} else {
err = readAndSendToBackChannel(r, lineFeed)
}
case sendToBackChannel:
err = readAndSendToBackChannel(r, lineFeed)
case readAndDiscard:
_, err = io.Copy(io.Discard, r.Body)
case returning500:
Expand Down Expand Up @@ -232,6 +234,21 @@ func (s *testServer) serveHttp() {
}))
}

func readAndSendToBackChannel(r *http.Request, lineFeed chan string) error {
read := bufio.NewReader(r.Body)
var (
l string
err error
)
for err == nil {
l, err = read.ReadString('\n')
if err == nil && len(l) > 0 {
lineFeed <- l[0 : len(l)-1]
}
}
return err
}

func (s *testServer) Close() {
close(s.closeCh)
s.tcpListener.Close()
Expand Down

0 comments on commit 38fdfc7

Please sign in to comment.