From fa4d7bd7b59ecc935e0e6867dc7c46f5c7e3f9c8 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Mon, 12 Sep 2022 11:44:45 +0200 Subject: [PATCH] feat(client): add TimestampColumn() (#6) --- sender.go | 44 +++++++++++++++++++++---- sender_integration_test.go | 9 +++-- sender_test.go | 67 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 9 deletions(-) diff --git a/sender.go b/sender.go index da79443..603ee99 100644 --- a/sender.go +++ b/sender.go @@ -252,7 +252,7 @@ func (s *LineSender) Close() error { // called before any Symbol or Column method. // // Table name cannot contain any of the following characters: -// '\n', '\r', '?', ',', ''', '"', '\', '/', ':', ')', '(', '+', '*', +// '\n', '\r', '?', ',', ”', '"', '\', '/', ':', ')', '(', '+', '*', // '%', '~', starting '.', trailing '.', or a non-printable char. func (s *LineSender) Table(name string) *LineSender { if s.lastErr != nil { @@ -274,7 +274,7 @@ func (s *LineSender) Table(name string) *LineSender { // before any Column method. // // Symbol name cannot contain any of the following characters: -// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+', +// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+', // '-', '*' '%%', '~', or a non-printable char. func (s *LineSender) Symbol(name, val string) *LineSender { if s.lastErr != nil { @@ -306,7 +306,7 @@ func (s *LineSender) Symbol(name, val string) *LineSender { // message. // // Column name cannot contain any of the following characters: -// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+', +// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+', // '-', '*' '%%', '~', or a non-printable char. func (s *LineSender) Int64Column(name string, val int64) *LineSender { if !s.prepareForField(name) { @@ -323,11 +323,43 @@ func (s *LineSender) Int64Column(name string, val int64) *LineSender { return s } +// TimestampColumn adds a timestamp column value to the ILP +// message. Timestamp is Epoch microseconds. +// +// Negative timestamp value is not allowed and any attempt to +// set a negative value will cause an error to be returned on subsequent +// At() or AtNow() calls. +// +// Column name cannot contain any of the following characters: +// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+', +// '-', '*' '%%', '~', or a non-printable char. +func (s *LineSender) TimestampColumn(name string, ts int64) *LineSender { + if ts < 0 { + if s.lastErr != nil { + return s + } + s.lastErr = fmt.Errorf("timestamp cannot be negative: %d", ts) + return s + } + if !s.prepareForField(name) { + return s + } + s.lastErr = s.writeColumnName(name) + if s.lastErr != nil { + return s + } + s.buf.WriteByte('=') + s.buf.WriteInt(ts) + s.buf.WriteByte('t') + s.hasFields = true + return s +} + // Float64Column adds a 64-bit float (double) column value to the ILP // message. // // Column name cannot contain any of the following characters: -// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+', +// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+', // '-', '*' '%%', '~', or a non-printable char. func (s *LineSender) Float64Column(name string, val float64) *LineSender { if !s.prepareForField(name) { @@ -346,7 +378,7 @@ func (s *LineSender) Float64Column(name string, val float64) *LineSender { // StringColumn adds a string column value to the ILP message. // // Column name cannot contain any of the following characters: -// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+', +// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+', // '-', '*' '%%', '~', or a non-printable char. func (s *LineSender) StringColumn(name, val string) *LineSender { if !s.prepareForField(name) { @@ -370,7 +402,7 @@ func (s *LineSender) StringColumn(name, val string) *LineSender { // BoolColumn adds a boolean column value to the ILP message. // // Column name cannot contain any of the following characters: -// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+', +// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+', // '-', '*' '%%', '~', or a non-printable char. func (s *LineSender) BoolColumn(name string, val bool) *LineSender { if !s.prepareForField(name) { diff --git a/sender_integration_test.go b/sender_integration_test.go index 697eb8f..9292917 100644 --- a/sender_integration_test.go +++ b/sender_integration_test.go @@ -108,7 +108,7 @@ func setupQuestDB0(ctx context.Context, auth ilpAuthType, setupProxy bool) (*que return nil, err } req := testcontainers.ContainerRequest{ - Image: "questdb/questdb:6.4.1", + Image: "questdb/questdb:6.5.1", ExposedPorts: []string{"9000/tcp", "9009/tcp"}, WaitingFor: wait.ForHTTP("/").WithPort("9000"), Networks: []string{networkName}, @@ -238,6 +238,7 @@ func TestE2EValidWrites(t *testing.T) { Int64Column("long_col", 12). StringColumn("str_col", "foobar"). BoolColumn("bool_col", true). + TimestampColumn("timestamp_col", 42). At(ctx, 1000) if err != nil { return err @@ -250,6 +251,7 @@ func TestE2EValidWrites(t *testing.T) { Int64Column("long_col", 11). StringColumn("str_col", "barbaz"). BoolColumn("bool_col", false). + TimestampColumn("timestamp_col", 43). At(ctx, 2000) }, tableData{ @@ -259,11 +261,12 @@ func TestE2EValidWrites(t *testing.T) { {"long_col", "LONG"}, {"str_col", "STRING"}, {"bool_col", "BOOLEAN"}, + {"timestamp_col", "TIMESTAMP"}, {"timestamp", "TIMESTAMP"}, }, Dataset: [][]interface{}{ - {"test_ilp1", float64(12.2), float64(12), "foobar", true, "1970-01-01T00:00:00.000001Z"}, - {"test_ilp2", float64(11.2), float64(11), "barbaz", false, "1970-01-01T00:00:00.000002Z"}, + {"test_ilp1", float64(12.2), float64(12), "foobar", true, "1970-01-01T00:00:00.000042Z", "1970-01-01T00:00:00.000001Z"}, + {"test_ilp2", float64(11.2), float64(11), "barbaz", false, "1970-01-01T00:00:00.000043Z", "1970-01-01T00:00:00.000002Z"}, }, Count: 2, }, diff --git a/sender_test.go b/sender_test.go index fc17905..6264612 100644 --- a/sender_test.go +++ b/sender_test.go @@ -110,6 +110,42 @@ func TestValidWrites(t *testing.T) { } } +func TestTimestampSerialization(t *testing.T) { + ctx := context.Background() + + testCases := []struct { + name string + val int64 + }{ + {"max value", math.MaxInt64}, + {"zero", 0}, + {"small positive value", 10}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + srv, err := newTestServer(sendToBackChannel) + assert.NoError(t, err) + + sender, err := qdb.NewLineSender(ctx, qdb.WithAddress(srv.addr)) + assert.NoError(t, err) + + err = sender.Table(testTable).TimestampColumn("a_col", tc.val).AtNow(ctx) + assert.NoError(t, err) + + err = sender.Flush(ctx) + assert.NoError(t, err) + + sender.Close() + + // Now check what was received by the server. + expectLines(t, srv.backCh, []string{"my_test_table a_col=" + strconv.FormatInt(tc.val, 10) + "t"}) + + srv.close() + }) + } +} + func TestInt64Serialization(t *testing.T) { ctx := context.Background() @@ -288,6 +324,12 @@ func TestErrorOnMissingTableCall(t *testing.T) { return s.Float64Column("float", 4.2).AtNow(ctx) }, }, + { + "timestamp column", + func(s *qdb.LineSender) error { + return s.TimestampColumn("timestamp", 42).AtNow(ctx) + }, + }, } for _, tc := range testCases { @@ -326,6 +368,23 @@ func TestErrorOnMultipleTableCalls(t *testing.T) { assert.Empty(t, sender.Messages()) } +func TestErrorOnNegativeTimestamp(t *testing.T) { + ctx := context.Background() + + srv, err := newTestServer(readAndDiscard) + assert.NoError(t, err) + defer srv.close() + + sender, err := qdb.NewLineSender(ctx, qdb.WithAddress(srv.addr)) + assert.NoError(t, err) + defer sender.Close() + + err = sender.Table(testTable).TimestampColumn("timestamp_col", -42).AtNow(ctx) + + assert.ErrorContains(t, err, "timestamp cannot be negative: -42") + assert.Empty(t, sender.Messages()) +} + func TestErrorOnSymbolCallAfterColumn(t *testing.T) { ctx := context.Background() @@ -357,6 +416,12 @@ func TestErrorOnSymbolCallAfterColumn(t *testing.T) { return s.Table("awesome_table").Float64Column("float", 4.2).Symbol("sym", "abc").AtNow(ctx) }, }, + { + "timestamp column", + func(s *qdb.LineSender) error { + return s.Table("awesome_table").TimestampColumn("timestamp", 42).Symbol("sym", "abc").AtNow(ctx) + }, + }, } for _, tc := range testCases { @@ -483,6 +548,7 @@ func BenchmarkLineSenderBatch1000(b *testing.B) { Int64Column("long_col", int64(i)). StringColumn("str_col", "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua"). BoolColumn("bool_col", true). + TimestampColumn("timestamp_col", 42). At(ctx, int64(1000*i)) } sender.Flush(ctx) @@ -509,6 +575,7 @@ func BenchmarkLineSenderNoFlush(b *testing.B) { Int64Column("long_col", int64(i)). StringColumn("str_col", "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua"). BoolColumn("bool_col", true). + TimestampColumn("timestamp_col", 42). At(ctx, int64(1000*i)) } sender.Flush(ctx)