Skip to content

Commit

Permalink
feat(client): add TimestampColumn() (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrinot authored Sep 12, 2022
1 parent b761335 commit fa4d7bd
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 9 deletions.
44 changes: 38 additions & 6 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (s *LineSender) Close() error {
// called before any Symbol or Column method.
//
// Table name cannot contain any of the following characters:
// '\n', '\r', '?', ',', ''', '"', '\', '/', ':', ')', '(', '+', '*',
// '\n', '\r', '?', ',', ', '"', '\', '/', ':', ')', '(', '+', '*',
// '%', '~', starting '.', trailing '.', or a non-printable char.
func (s *LineSender) Table(name string) *LineSender {
if s.lastErr != nil {
Expand All @@ -274,7 +274,7 @@ func (s *LineSender) Table(name string) *LineSender {
// before any Column method.
//
// Symbol name cannot contain any of the following characters:
// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+',
// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
// '-', '*' '%%', '~', or a non-printable char.
func (s *LineSender) Symbol(name, val string) *LineSender {
if s.lastErr != nil {
Expand Down Expand Up @@ -306,7 +306,7 @@ func (s *LineSender) Symbol(name, val string) *LineSender {
// message.
//
// Column name cannot contain any of the following characters:
// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+',
// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
// '-', '*' '%%', '~', or a non-printable char.
func (s *LineSender) Int64Column(name string, val int64) *LineSender {
if !s.prepareForField(name) {
Expand All @@ -323,11 +323,43 @@ func (s *LineSender) Int64Column(name string, val int64) *LineSender {
return s
}

// TimestampColumn adds a timestamp column value to the ILP
// message. Timestamp is Epoch microseconds.
//
// Negative timestamp value is not allowed and any attempt to
// set a negative value will cause an error to be returned on subsequent
// At() or AtNow() calls.
//
// Column name cannot contain any of the following characters:
// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
// '-', '*' '%%', '~', or a non-printable char.
func (s *LineSender) TimestampColumn(name string, ts int64) *LineSender {
if ts < 0 {
if s.lastErr != nil {
return s
}
s.lastErr = fmt.Errorf("timestamp cannot be negative: %d", ts)
return s
}
if !s.prepareForField(name) {
return s
}
s.lastErr = s.writeColumnName(name)
if s.lastErr != nil {
return s
}
s.buf.WriteByte('=')
s.buf.WriteInt(ts)
s.buf.WriteByte('t')
s.hasFields = true
return s
}

// Float64Column adds a 64-bit float (double) column value to the ILP
// message.
//
// Column name cannot contain any of the following characters:
// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+',
// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+',
// '-', '*' '%%', '~', or a non-printable char.
func (s *LineSender) Float64Column(name string, val float64) *LineSender {
if !s.prepareForField(name) {
Expand All @@ -346,7 +378,7 @@ func (s *LineSender) Float64Column(name string, val float64) *LineSender {
// StringColumn adds a string column value to the ILP message.
//
// Column name cannot contain any of the following characters:
// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+',
// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+',
// '-', '*' '%%', '~', or a non-printable char.
func (s *LineSender) StringColumn(name, val string) *LineSender {
if !s.prepareForField(name) {
Expand All @@ -370,7 +402,7 @@ func (s *LineSender) StringColumn(name, val string) *LineSender {
// BoolColumn adds a boolean column value to the ILP message.
//
// Column name cannot contain any of the following characters:
// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+',
// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+',
// '-', '*' '%%', '~', or a non-printable char.
func (s *LineSender) BoolColumn(name string, val bool) *LineSender {
if !s.prepareForField(name) {
Expand Down
9 changes: 6 additions & 3 deletions sender_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func setupQuestDB0(ctx context.Context, auth ilpAuthType, setupProxy bool) (*que
return nil, err
}
req := testcontainers.ContainerRequest{
Image: "questdb/questdb:6.4.1",
Image: "questdb/questdb:6.5.1",
ExposedPorts: []string{"9000/tcp", "9009/tcp"},
WaitingFor: wait.ForHTTP("/").WithPort("9000"),
Networks: []string{networkName},
Expand Down Expand Up @@ -238,6 +238,7 @@ func TestE2EValidWrites(t *testing.T) {
Int64Column("long_col", 12).
StringColumn("str_col", "foobar").
BoolColumn("bool_col", true).
TimestampColumn("timestamp_col", 42).
At(ctx, 1000)
if err != nil {
return err
Expand All @@ -250,6 +251,7 @@ func TestE2EValidWrites(t *testing.T) {
Int64Column("long_col", 11).
StringColumn("str_col", "barbaz").
BoolColumn("bool_col", false).
TimestampColumn("timestamp_col", 43).
At(ctx, 2000)
},
tableData{
Expand All @@ -259,11 +261,12 @@ func TestE2EValidWrites(t *testing.T) {
{"long_col", "LONG"},
{"str_col", "STRING"},
{"bool_col", "BOOLEAN"},
{"timestamp_col", "TIMESTAMP"},
{"timestamp", "TIMESTAMP"},
},
Dataset: [][]interface{}{
{"test_ilp1", float64(12.2), float64(12), "foobar", true, "1970-01-01T00:00:00.000001Z"},
{"test_ilp2", float64(11.2), float64(11), "barbaz", false, "1970-01-01T00:00:00.000002Z"},
{"test_ilp1", float64(12.2), float64(12), "foobar", true, "1970-01-01T00:00:00.000042Z", "1970-01-01T00:00:00.000001Z"},
{"test_ilp2", float64(11.2), float64(11), "barbaz", false, "1970-01-01T00:00:00.000043Z", "1970-01-01T00:00:00.000002Z"},
},
Count: 2,
},
Expand Down
67 changes: 67 additions & 0 deletions sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,42 @@ func TestValidWrites(t *testing.T) {
}
}

func TestTimestampSerialization(t *testing.T) {
ctx := context.Background()

testCases := []struct {
name string
val int64
}{
{"max value", math.MaxInt64},
{"zero", 0},
{"small positive value", 10},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
srv, err := newTestServer(sendToBackChannel)
assert.NoError(t, err)

sender, err := qdb.NewLineSender(ctx, qdb.WithAddress(srv.addr))
assert.NoError(t, err)

err = sender.Table(testTable).TimestampColumn("a_col", tc.val).AtNow(ctx)
assert.NoError(t, err)

err = sender.Flush(ctx)
assert.NoError(t, err)

sender.Close()

// Now check what was received by the server.
expectLines(t, srv.backCh, []string{"my_test_table a_col=" + strconv.FormatInt(tc.val, 10) + "t"})

srv.close()
})
}
}

func TestInt64Serialization(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -288,6 +324,12 @@ func TestErrorOnMissingTableCall(t *testing.T) {
return s.Float64Column("float", 4.2).AtNow(ctx)
},
},
{
"timestamp column",
func(s *qdb.LineSender) error {
return s.TimestampColumn("timestamp", 42).AtNow(ctx)
},
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -326,6 +368,23 @@ func TestErrorOnMultipleTableCalls(t *testing.T) {
assert.Empty(t, sender.Messages())
}

func TestErrorOnNegativeTimestamp(t *testing.T) {
ctx := context.Background()

srv, err := newTestServer(readAndDiscard)
assert.NoError(t, err)
defer srv.close()

sender, err := qdb.NewLineSender(ctx, qdb.WithAddress(srv.addr))
assert.NoError(t, err)
defer sender.Close()

err = sender.Table(testTable).TimestampColumn("timestamp_col", -42).AtNow(ctx)

assert.ErrorContains(t, err, "timestamp cannot be negative: -42")
assert.Empty(t, sender.Messages())
}

func TestErrorOnSymbolCallAfterColumn(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -357,6 +416,12 @@ func TestErrorOnSymbolCallAfterColumn(t *testing.T) {
return s.Table("awesome_table").Float64Column("float", 4.2).Symbol("sym", "abc").AtNow(ctx)
},
},
{
"timestamp column",
func(s *qdb.LineSender) error {
return s.Table("awesome_table").TimestampColumn("timestamp", 42).Symbol("sym", "abc").AtNow(ctx)
},
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -483,6 +548,7 @@ func BenchmarkLineSenderBatch1000(b *testing.B) {
Int64Column("long_col", int64(i)).
StringColumn("str_col", "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua").
BoolColumn("bool_col", true).
TimestampColumn("timestamp_col", 42).
At(ctx, int64(1000*i))
}
sender.Flush(ctx)
Expand All @@ -509,6 +575,7 @@ func BenchmarkLineSenderNoFlush(b *testing.B) {
Int64Column("long_col", int64(i)).
StringColumn("str_col", "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua").
BoolColumn("bool_col", true).
TimestampColumn("timestamp_col", 42).
At(ctx, int64(1000*i))
}
sender.Flush(ctx)
Expand Down

0 comments on commit fa4d7bd

Please sign in to comment.