From d282cfc093ccbd751c60cc3fe642757fb6284929 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Mon, 3 Jun 2024 12:16:14 +0200 Subject: [PATCH] feat(client): add LineSenderFromEnv method (#35) Also improves the docs around HTTP transport --------- Co-authored-by: Andrey Pechkurov --- README.md | 4 +++ http_sender_test.go | 66 +++++++++++++++++++++++++++++++++++++++++++ sender.go | 23 ++++++++++++++- tcp_sender_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 160 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c0bcf6d..37b53a3 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,10 @@ func main() { // Alternatively, you can use the LineSenderFromConf function: // sender, err := qdb.LineSenderFromConf(ctx, "http::addr=localhost:9000;") // ... + // or you can export the "http::addr=localhost:9000;" config string to + // the QDB_CLIENT_CONF environment variable and use the LineSenderFromEnv function: + // sender, err := qdb.LineSenderFromEnv(ctx) + // ... defer sender.Close(context.TODO()) // ... } diff --git a/http_sender_test.go b/http_sender_test.go index 2448c5a..1936676 100644 --- a/http_sender_test.go +++ b/http_sender_test.go @@ -29,6 +29,7 @@ import ( "errors" "fmt" "net/http" + "os" "testing" "time" @@ -91,6 +92,35 @@ func TestHttpHappyCasesFromConf(t *testing.T) { } } +func TestHttpHappyCasesFromEnv(t *testing.T) { + var ( + addr = "localhost:1111" + ) + + testCases := []httpConfigTestCase{ + { + name: "addr only", + config: fmt.Sprintf("http::addr=%s", addr), + }, + { + name: "auto flush", + config: fmt.Sprintf("http::addr=%s;auto_flush_rows=100;auto_flush_interval=1000;", + addr), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + os.Setenv("QDB_CLIENT_CONF", tc.config) + sender, err := qdb.LineSenderFromEnv(context.Background()) + assert.NoError(t, err) + + sender.Close(context.Background()) + os.Unsetenv("QDB_CLIENT_CONF") + }) + } +} + func TestHttpPathologicalCasesFromConf(t *testing.T) { testCases := []httpConfigTestCase{ { @@ -148,6 +178,42 @@ func TestHttpPathologicalCasesFromConf(t *testing.T) { } } +func TestHttpPathologicalCasesFromEnv(t *testing.T) { + // Test a few cases just to make sure that the config is read + // from the env variable. + testCases := []httpConfigTestCase{ + { + name: "basic_and_token_auth", + config: "http::username=test_user;token=test_token;", + expectedErr: "both basic and token", + }, + { + name: "negative max_buf_size", + config: "http::max_buf_size=-1;", + expectedErr: "max buffer size is negative", + }, + { + name: "schema is case-sensitive", + config: "hTtp::addr=localhost:1234;", + expectedErr: "invalid schema", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + os.Setenv("QDB_CLIENT_CONF", tc.config) + _, err := qdb.LineSenderFromEnv(context.Background()) + assert.ErrorContains(t, err, tc.expectedErr) + os.Unsetenv("QDB_CLIENT_CONF") + }) + } +} + +func TestHttpEmptyEnvVariableCaseFromEnv(t *testing.T) { + _, err := qdb.LineSenderFromEnv(context.Background()) + assert.ErrorContains(t, err, "QDB_CLIENT_CONF environment variable is not set") +} + func TestErrorWhenSenderTypeIsNotSpecified(t *testing.T) { ctx := context.Background() diff --git a/sender.go b/sender.go index 2a73431..2407aed 100644 --- a/sender.go +++ b/sender.go @@ -30,6 +30,8 @@ import ( "fmt" "math/big" "net/http" + "os" + "strings" "time" ) @@ -39,7 +41,9 @@ import ( // Each sender corresponds to a single client-server connection. // A sender should not be called concurrently by multiple goroutines. // -// HTTP senders also reuse connections from a global pool by default. +// HTTP senders reuse connections from a global pool by default. You can +// customize the HTTP transport by passing a custom http.Transport to the +// WithHttpTransport option. type LineSender interface { // Table sets the table name (metric) for a new ILP message. Should be // called before any Symbol or Column method. @@ -348,6 +352,7 @@ func WithTlsInsecureSkipVerify() LineSenderOption { // WithHttpTransport sets the client's http transport to the // passed pointer instead of the global transport. This can be // used for customizing the http transport used by the LineSender. +// For example to set custom timeouts, TLS settings, etc. // WithTlsInsecureSkipVerify is ignored when this option is in use. // // Only available for the HTTP sender. @@ -389,6 +394,22 @@ func WithAutoFlushInterval(interval time.Duration) LineSenderOption { } } +// LineSenderFromEnv creates a LineSender with a config string defined by the QDB_CLIENT_CONF +// environment variable. See LineSenderFromConf for the config string format. +// +// This is a convenience method suitable for Cloud environments. +func LineSenderFromEnv(ctx context.Context) (LineSender, error) { + conf := strings.TrimSpace(os.Getenv("QDB_CLIENT_CONF")) + if conf == "" { + return nil, errors.New("QDB_CLIENT_CONF environment variable is not set") + } + c, err := confFromStr(conf) + if err != nil { + return nil, err + } + return newLineSender(ctx, c) +} + // LineSenderFromConf creates a LineSender using the QuestDB config string format. // // Example config string: "http::addr=localhost;username=joe;password=123;auto_flush_rows=1000;" diff --git a/tcp_sender_test.go b/tcp_sender_test.go index 9fce744..6f938f6 100644 --- a/tcp_sender_test.go +++ b/tcp_sender_test.go @@ -27,6 +27,7 @@ package questdb_test import ( "context" "fmt" + "os" "testing" "time" @@ -78,6 +79,72 @@ func TestTcpHappyCasesFromConf(t *testing.T) { } } +func TestTcpHappyCasesFromEnv(t *testing.T) { + var ( + initBufSize = 4200 + ) + + testServer, err := newTestTcpServer(readAndDiscard) + assert.NoError(t, err) + defer testServer.Close() + + addr := testServer.Addr() + + testCases := []tcpConfigTestCase{ + { + name: "addr only", + config: fmt.Sprintf("tcp::addr=%s;", addr), + }, + { + name: "init_buf_size", + config: fmt.Sprintf("tcp::addr=%s;init_buf_size=%d;", + addr, initBufSize), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + os.Setenv("QDB_CLIENT_CONF", tc.config) + sender, err := qdb.LineSenderFromEnv(context.Background()) + assert.NoError(t, err) + + sender.Close(context.Background()) + os.Unsetenv("QDB_CLIENT_CONF") + }) + } +} + +func TestTcpPathologicalCasesFromEnv(t *testing.T) { + // Test a few cases just to make sure that the config is read + // from the env variable. + testCases := []tcpConfigTestCase{ + { + name: "request_timeout", + config: "tcp::request_timeout=5;", + expectedErr: "requestTimeout setting is not available", + }, + { + name: "min_throughput", + config: "tcp::min_throughput=5;", + expectedErr: "minThroughput setting is not available", + }, + { + name: "auto_flush_rows", + config: "tcp::auto_flush_rows=5;", + expectedErr: "autoFlushRows setting is not available", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + os.Setenv("QDB_CLIENT_CONF", tc.config) + _, err := qdb.LineSenderFromEnv(context.Background()) + assert.ErrorContains(t, err, tc.expectedErr) + os.Unsetenv("QDB_CLIENT_CONF") + }) + } +} + func TestTcpPathologicalCasesFromConf(t *testing.T) { testCases := []tcpConfigTestCase{ { @@ -139,6 +206,7 @@ func TestTcpPathologicalCasesFromConf(t *testing.T) { }) } } + func TestErrorOnFlushWhenMessageIsPending(t *testing.T) { ctx := context.Background()