Skip to content

Commit

Permalink
feat(client): add support for "off" value in auto_flush_rows and auto…
Browse files Browse the repository at this point in the history
…_flush_interval (#37)
  • Loading branch information
jammutkarsh authored Jun 28, 2024
1 parent 4776744 commit 857a531
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 1 deletion.
8 changes: 8 additions & 0 deletions conf_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,20 @@ func confFromStr(conf string) (*lineSenderConfig, error) {
return nil, NewInvalidConfigStrError("invalid %s value, %q is not 'on' or 'off'", k, v)
}
case "auto_flush_rows":
if v == "off" {
senderConf.autoFlushRows = 0
continue
}
parsedVal, err := strconv.Atoi(v)
if err != nil {
return nil, NewInvalidConfigStrError("invalid %s value, %q is not a valid int", k, v)
}
senderConf.autoFlushRows = parsedVal
case "auto_flush_interval":
if v == "off" {
senderConf.autoFlushInterval = 0
continue
}
parsedVal, err := strconv.Atoi(v)
if err != nil {
return nil, NewInvalidConfigStrError("invalid %s value, %q is not a valid int", k, v)
Expand Down
22 changes: 22 additions & 0 deletions conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,28 @@ func TestHappyCasesFromConf(t *testing.T) {
qdb.WithAutoFlushInterval(1000),
},
},
{
name: "auto flush interval off",
config: fmt.Sprintf("http::addr=%s;auto_flush_rows=100;auto_flush_interval=off;",
addr),
expectedOpts: []qdb.LineSenderOption{
qdb.WithHttp(),
qdb.WithAddress(addr),
qdb.WithAutoFlushRows(100),
qdb.WithAutoFlushInterval(0),
},
},
{
name: "auto flush rows off",
config: fmt.Sprintf("http::addr=%s;auto_flush_rows=off;auto_flush_interval=1000;",
addr),
expectedOpts: []qdb.LineSenderOption{
qdb.WithHttp(),
qdb.WithAddress(addr),
qdb.WithAutoFlushRows(0),
qdb.WithAutoFlushInterval(1000),
},
},
}

for _, tc := range testCases {
Expand Down
65 changes: 65 additions & 0 deletions http_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,71 @@ func TestTimeBasedAutoFlush(t *testing.T) {
assert.Equal(t, 0, qdb.MsgCount(sender))
}

func TestTimeBasedAutoFlushWithRowBasedFlushDisabled(t *testing.T) {
ctx := context.Background()
autoFlushInterval := 10 * time.Millisecond

srv, err := newTestHttpServer(readAndDiscard)
assert.NoError(t, err)
defer srv.Close()

sender, err := qdb.NewLineSender(
ctx,
qdb.WithHttp(),
qdb.WithAddress(srv.Addr()),
qdb.WithAutoFlushRows(0),
qdb.WithAutoFlushInterval(autoFlushInterval),
)
assert.NoError(t, err)
defer sender.Close(ctx)

// Send a message and ensure it's buffered
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
assert.NoError(t, err)
assert.Equal(t, 1, qdb.MsgCount(sender))

time.Sleep(2 * autoFlushInterval)

// Send one additional message and ensure that both messages are flushed
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
assert.NoError(t, err)

assert.Equal(t, 0, qdb.MsgCount(sender))
}

func TestRowBasedAutoFlushWithTimeBasedFlushDisabled(t *testing.T) {
ctx := context.Background()
autoFlushRows := 1000

srv, err := newTestHttpServer(readAndDiscard)
assert.NoError(t, err)
defer srv.Close()

sender, err := qdb.NewLineSender(
ctx,
qdb.WithHttp(),
qdb.WithAddress(srv.Addr()),
qdb.WithAutoFlushRows(autoFlushRows),
qdb.WithAutoFlushInterval(0),
)
assert.NoError(t, err)
defer sender.Close(ctx)

// Send autoFlushRows - 1 messages and ensure all are buffered
for i := 0; i < autoFlushRows-1; i++ {
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
assert.NoError(t, err)
}

assert.Equal(t, autoFlushRows-1, qdb.MsgCount(sender))

// Send one additional message and ensure that all are flushed
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
assert.NoError(t, err)

assert.Equal(t, 0, qdb.MsgCount(sender))
}

func TestNoFlushWhenAutoFlushDisabled(t *testing.T) {
ctx := context.Background()
autoFlushRows := 10
Expand Down
3 changes: 2 additions & 1 deletion sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,8 @@ func LineSenderFromEnv(ctx context.Context) (LineSender, error) {
// password: for basic authentication
// token: bearer token auth (used instead of basic authentication)
// auto_flush: determines if auto-flushing is enabled (values "on" or "off", defaults to "on")
// auto_flush_rows: auto-flushing is triggered above this row count (defaults to 75000). If set, explicitly implies auto_flush=on
// auto_flush_rows: auto-flushing is triggered above this row count (defaults to 75000). If set, explicitly implies auto_flush=on. Set to 'off' to disable.
//auto_flush_interval auto-flushing is triggered above this time (defaults to 1000 milliseconds). If set, explicitly implies auto_flush=on. Set to 'off' to disable.
// request_min_throughput: bytes per second, used to calculate each request's timeout (defaults to 100KiB/s)
// request_timeout: minimum request timeout in milliseconds (defaults to 10 seconds)
// retry_timeout: cumulative maximum millisecond duration spent in retries (defaults to 10 seconds)
Expand Down

0 comments on commit 857a531

Please sign in to comment.