Skip to content

Commit

Permalink
feat(client): add LineSenderFromEnv method (#35)
Browse files Browse the repository at this point in the history
Also improves the docs around HTTP transport

---------

Co-authored-by: Andrey Pechkurov <[email protected]>
  • Loading branch information
jerrinot and puzpuzpuz authored Jun 3, 2024
1 parent 28bf280 commit d282cfc
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 1 deletion.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ func main() {
// Alternatively, you can use the LineSenderFromConf function:
// sender, err := qdb.LineSenderFromConf(ctx, "http::addr=localhost:9000;")
// ...
// or you can export the "http::addr=localhost:9000;" config string to
// the QDB_CLIENT_CONF environment variable and use the LineSenderFromEnv function:
// sender, err := qdb.LineSenderFromEnv(ctx)
// ...
defer sender.Close(context.TODO())
// ...
}
Expand Down
66 changes: 66 additions & 0 deletions http_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"errors"
"fmt"
"net/http"
"os"
"testing"
"time"

Expand Down Expand Up @@ -91,6 +92,35 @@ func TestHttpHappyCasesFromConf(t *testing.T) {
}
}

func TestHttpHappyCasesFromEnv(t *testing.T) {
var (
addr = "localhost:1111"
)

testCases := []httpConfigTestCase{
{
name: "addr only",
config: fmt.Sprintf("http::addr=%s", addr),
},
{
name: "auto flush",
config: fmt.Sprintf("http::addr=%s;auto_flush_rows=100;auto_flush_interval=1000;",
addr),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
os.Setenv("QDB_CLIENT_CONF", tc.config)
sender, err := qdb.LineSenderFromEnv(context.Background())
assert.NoError(t, err)

sender.Close(context.Background())
os.Unsetenv("QDB_CLIENT_CONF")
})
}
}

func TestHttpPathologicalCasesFromConf(t *testing.T) {
testCases := []httpConfigTestCase{
{
Expand Down Expand Up @@ -148,6 +178,42 @@ func TestHttpPathologicalCasesFromConf(t *testing.T) {
}
}

func TestHttpPathologicalCasesFromEnv(t *testing.T) {
// Test a few cases just to make sure that the config is read
// from the env variable.
testCases := []httpConfigTestCase{
{
name: "basic_and_token_auth",
config: "http::username=test_user;token=test_token;",
expectedErr: "both basic and token",
},
{
name: "negative max_buf_size",
config: "http::max_buf_size=-1;",
expectedErr: "max buffer size is negative",
},
{
name: "schema is case-sensitive",
config: "hTtp::addr=localhost:1234;",
expectedErr: "invalid schema",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
os.Setenv("QDB_CLIENT_CONF", tc.config)
_, err := qdb.LineSenderFromEnv(context.Background())
assert.ErrorContains(t, err, tc.expectedErr)
os.Unsetenv("QDB_CLIENT_CONF")
})
}
}

func TestHttpEmptyEnvVariableCaseFromEnv(t *testing.T) {
_, err := qdb.LineSenderFromEnv(context.Background())
assert.ErrorContains(t, err, "QDB_CLIENT_CONF environment variable is not set")
}

func TestErrorWhenSenderTypeIsNotSpecified(t *testing.T) {
ctx := context.Background()

Expand Down
23 changes: 22 additions & 1 deletion sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"fmt"
"math/big"
"net/http"
"os"
"strings"
"time"
)

Expand All @@ -39,7 +41,9 @@ import (
// Each sender corresponds to a single client-server connection.
// A sender should not be called concurrently by multiple goroutines.
//
// HTTP senders also reuse connections from a global pool by default.
// HTTP senders reuse connections from a global pool by default. You can
// customize the HTTP transport by passing a custom http.Transport to the
// WithHttpTransport option.
type LineSender interface {
// Table sets the table name (metric) for a new ILP message. Should be
// called before any Symbol or Column method.
Expand Down Expand Up @@ -348,6 +352,7 @@ func WithTlsInsecureSkipVerify() LineSenderOption {
// WithHttpTransport sets the client's http transport to the
// passed pointer instead of the global transport. This can be
// used for customizing the http transport used by the LineSender.
// For example to set custom timeouts, TLS settings, etc.
// WithTlsInsecureSkipVerify is ignored when this option is in use.
//
// Only available for the HTTP sender.
Expand Down Expand Up @@ -389,6 +394,22 @@ func WithAutoFlushInterval(interval time.Duration) LineSenderOption {
}
}

// LineSenderFromEnv creates a LineSender with a config string defined by the QDB_CLIENT_CONF
// environment variable. See LineSenderFromConf for the config string format.
//
// This is a convenience method suitable for Cloud environments.
func LineSenderFromEnv(ctx context.Context) (LineSender, error) {
conf := strings.TrimSpace(os.Getenv("QDB_CLIENT_CONF"))
if conf == "" {
return nil, errors.New("QDB_CLIENT_CONF environment variable is not set")
}
c, err := confFromStr(conf)
if err != nil {
return nil, err
}
return newLineSender(ctx, c)
}

// LineSenderFromConf creates a LineSender using the QuestDB config string format.
//
// Example config string: "http::addr=localhost;username=joe;password=123;auto_flush_rows=1000;"
Expand Down
68 changes: 68 additions & 0 deletions tcp_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package questdb_test
import (
"context"
"fmt"
"os"
"testing"
"time"

Expand Down Expand Up @@ -78,6 +79,72 @@ func TestTcpHappyCasesFromConf(t *testing.T) {
}
}

func TestTcpHappyCasesFromEnv(t *testing.T) {
var (
initBufSize = 4200
)

testServer, err := newTestTcpServer(readAndDiscard)
assert.NoError(t, err)
defer testServer.Close()

addr := testServer.Addr()

testCases := []tcpConfigTestCase{
{
name: "addr only",
config: fmt.Sprintf("tcp::addr=%s;", addr),
},
{
name: "init_buf_size",
config: fmt.Sprintf("tcp::addr=%s;init_buf_size=%d;",
addr, initBufSize),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
os.Setenv("QDB_CLIENT_CONF", tc.config)
sender, err := qdb.LineSenderFromEnv(context.Background())
assert.NoError(t, err)

sender.Close(context.Background())
os.Unsetenv("QDB_CLIENT_CONF")
})
}
}

func TestTcpPathologicalCasesFromEnv(t *testing.T) {
// Test a few cases just to make sure that the config is read
// from the env variable.
testCases := []tcpConfigTestCase{
{
name: "request_timeout",
config: "tcp::request_timeout=5;",
expectedErr: "requestTimeout setting is not available",
},
{
name: "min_throughput",
config: "tcp::min_throughput=5;",
expectedErr: "minThroughput setting is not available",
},
{
name: "auto_flush_rows",
config: "tcp::auto_flush_rows=5;",
expectedErr: "autoFlushRows setting is not available",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
os.Setenv("QDB_CLIENT_CONF", tc.config)
_, err := qdb.LineSenderFromEnv(context.Background())
assert.ErrorContains(t, err, tc.expectedErr)
os.Unsetenv("QDB_CLIENT_CONF")
})
}
}

func TestTcpPathologicalCasesFromConf(t *testing.T) {
testCases := []tcpConfigTestCase{
{
Expand Down Expand Up @@ -139,6 +206,7 @@ func TestTcpPathologicalCasesFromConf(t *testing.T) {
})
}
}

func TestErrorOnFlushWhenMessageIsPending(t *testing.T) {
ctx := context.Background()

Expand Down

0 comments on commit d282cfc

Please sign in to comment.