From 9c27421601ff6d7bdb9766b1baff0a27dc03e66e Mon Sep 17 00:00:00 2001 From: Andrei Pechkurov <37772591+puzpuzpuz@users.noreply.github.com> Date: Wed, 14 Aug 2024 17:26:52 +0300 Subject: [PATCH] feat(client): blocking LineSenderPool (#53) Breaking change of an experimental API The pool now blocks if there are already too many senders in use. Pooled senders are released implicitly, via the Close call. --- README.md | 48 +++++++--- export_test.go | 4 + http_sender.go | 9 +- http_sender_test.go | 2 +- sender.go | 7 ++ sender_pool.go | 216 +++++++++++++++++++++++++++++++++++--------- sender_pool_test.go | 128 ++++++++++++++++++-------- tcp_sender.go | 13 ++- tcp_sender_test.go | 17 ++++ 9 files changed, 334 insertions(+), 110 deletions(-) diff --git a/README.md b/README.md index 0bf043d..f027c43 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ Golang client for QuestDB's [Influx Line Protocol](https://questdb.io/docs/refer The library requires Go 1.19 or newer. Features: -* Context-aware API. +* [Context](https://www.digitalocean.com/community/tutorials/how-to-use-contexts-in-go)-aware API. * Optimized for batch writes. * Supports TLS encryption and ILP authentication. * Automatic write retries and connection reuse for ILP over HTTP. @@ -43,23 +43,40 @@ func main() { } // Make sure to close the sender on exit to release resources. defer sender.Close(ctx) + // Send a few ILP messages. + tradedTs, err := time.Parse(time.RFC3339, "2022-08-06T15:04:05.123456Z") + if err != nil { + log.Fatal(err) + } err = sender. - Table("trades"). - Symbol("name", "test_ilp1"). - Float64Column("value", 12.4). - AtNow(ctx) + Table("trades_go"). + Symbol("pair", "USDGBP"). + Symbol("type", "buy"). + Float64Column("traded_price", 0.83). + Float64Column("limit_price", 0.84). + Int64Column("qty", 100). + At(ctx, tradedTs) + if err != nil { + log.Fatal(err) + } + + tradedTs, err = time.Parse(time.RFC3339, "2022-08-06T15:04:06.987654Z") if err != nil { log.Fatal(err) } err = sender. - Table("trades"). - Symbol("name", "test_ilp2"). - Float64Column("value", 11.4). - At(ctx, time.Now().UnixNano()) + Table("trades_go"). + Symbol("pair", "GBPJPY"). + Symbol("type", "sell"). + Float64Column("traded_price", 135.97). + Float64Column("limit_price", 0.84). + Int64Column("qty", 400). + At(ctx, tradedTs) if err != nil { log.Fatal(err) } + // Make sure that the messages are sent over the network. err = sender.Flush(ctx) if err != nil { @@ -80,15 +97,15 @@ To connect via TCP, set the configuration string to: **Warning: Experimental feature designed for use with HTTP senders ONLY** Version 3 of the client introduces a `LineSenderPool`, which provides a mechanism -to cache previously-used `LineSender`s in memory so they can be reused without -having to allocate and instantiate new senders. +to pool previously-used `LineSender`s so they can be reused without having +to allocate and instantiate new senders. -A LineSenderPool is thread-safe and can be used to concurrently Acquire and Release senders +A LineSenderPool is thread-safe and can be used to concurrently obtain senders across multiple goroutines. Since `LineSender`s must be used in a single-threaded context, a typical pattern is to Acquire a sender from a `LineSenderPool` at the beginning of a goroutine and use a deferred -execution block to Release the sender at the end of the goroutine. +execution block to Close the sender at the end of the goroutine. Here is an example of the `LineSenderPool` Acquire, Release, and Close semantics: @@ -112,7 +129,7 @@ func main() { } }() - sender, err := pool.Acquire(ctx) + sender, err := pool.Sender(ctx) if err != nil { panic(err) } @@ -122,7 +139,8 @@ func main() { Float64Column("price", 123.45). AtNow(ctx) - if err := pool.Release(ctx, sender); err != nil { + // Close call returns the sender back to the pool + if err := sender.Close(ctx); err != nil { panic(err) } } diff --git a/export_test.go b/export_test.go index b38a27f..eebd655 100644 --- a/export_test.go +++ b/export_test.go @@ -64,6 +64,10 @@ func Messages(s LineSender) string { } func MsgCount(s LineSender) int { + if ps, ok := s.(*pooledSender); ok { + hs, _ := ps.wrapped.(*httpLineSender) + return hs.MsgCount() + } if hs, ok := s.(*httpLineSender); ok { return hs.MsgCount() } diff --git a/http_sender.go b/http_sender.go index 0e86230..2142961 100644 --- a/http_sender.go +++ b/http_sender.go @@ -29,7 +29,6 @@ import ( "context" "crypto/tls" "encoding/json" - "errors" "fmt" "io" "math/big" @@ -176,7 +175,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error { ) if s.closed { - return errors.New("cannot flush a closed LineSender") + return errClosedSenderFlush } err := s.buf.LastErr() @@ -187,7 +186,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error { } if s.buf.HasTable() { s.buf.DiscardPendingMsg() - return errors.New("pending ILP message must be finalized with At or AtNow before calling Flush") + return errFlushWithPendingMessage } if s.buf.msgCount == 0 { @@ -285,7 +284,7 @@ func (s *httpLineSender) BoolColumn(name string, val bool) LineSender { func (s *httpLineSender) Close(ctx context.Context) error { if s.closed { - return nil + return errDoubleSenderClose } var err error @@ -309,7 +308,7 @@ func (s *httpLineSender) AtNow(ctx context.Context) error { func (s *httpLineSender) At(ctx context.Context, ts time.Time) error { if s.closed { - return errors.New("cannot queue new messages on a closed LineSender") + return errClosedSenderAt } sendTs := true diff --git a/http_sender_test.go b/http_sender_test.go index a11be5d..67d12ad 100644 --- a/http_sender_test.go +++ b/http_sender_test.go @@ -601,7 +601,7 @@ func TestSenderDoubleClose(t *testing.T) { assert.NoError(t, err) err = sender.Close(ctx) - assert.NoError(t, err) + assert.Error(t, err) } func TestErrorOnFlushWhenSenderIsClosed(t *testing.T) { diff --git a/sender.go b/sender.go index d7ac5c4..d28f271 100644 --- a/sender.go +++ b/sender.go @@ -35,6 +35,13 @@ import ( "time" ) +var ( + errClosedSenderFlush = errors.New("cannot flush a closed LineSender") + errFlushWithPendingMessage = errors.New("pending ILP message must be finalized with At or AtNow before calling Flush") + errClosedSenderAt = errors.New("cannot queue new messages on a closed LineSender") + errDoubleSenderClose = errors.New("double sender close") +) + // LineSender allows you to insert rows into QuestDB by sending ILP // messages over HTTP or TCP protocol. // diff --git a/sender_pool.go b/sender_pool.go index 08dde79..a59a81a 100644 --- a/sender_pool.go +++ b/sender_pool.go @@ -28,8 +28,17 @@ import ( "context" "errors" "fmt" + "math/big" "strings" "sync" + "sync/atomic" + "time" +) + +var ( + errAcquireFromClosedPool = errors.New("cannot acquire a LineSender from a closed LineSenderPool") + errHttpOnlySender = errors.New("tcp/s not supported for pooled senders, use http/s only") + errPooledSenderClose = errors.New("error closing one or more LineSenders in the pool") ) // LineSenderPool wraps a mutex-protected slice of [LineSender]. It allows a goroutine to @@ -37,19 +46,25 @@ import ( // // WARNING: This is an experimental API that is designed to work with HTTP senders ONLY. type LineSenderPool struct { - // options - maxSenders int + maxSenders int // the only option + numSenders int // number of used and free senders // presence of a non-empty conf takes precedence over opts conf string opts []LineSenderOption - // senders are stored here - senders []LineSender + freeSenders []*pooledSender - // plumbing fields closed bool mu *sync.Mutex + cond sync.Cond // used to wake up free sender waiters +} + +type pooledSender struct { + pool *LineSenderPool + wrapped LineSender + dirty bool // set to true if any of the sender calls returned an error + tick uint64 // even values stand for free sender, odd values mean in-use sender } // LineSenderPoolOption defines line sender pool config option. @@ -63,15 +78,16 @@ type LineSenderPoolOption func(*LineSenderPool) // [WithMaxSenders] option. func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, error) { if strings.HasPrefix(conf, "tcp") { - return nil, errors.New("tcp/s not supported for pooled senders, use http/s only") + return nil, errHttpOnlySender } pool := &LineSenderPool{ - maxSenders: 64, - conf: conf, - senders: []LineSender{}, - mu: &sync.Mutex{}, + maxSenders: 64, + conf: conf, + freeSenders: make([]*pooledSender, 0, 64), + mu: &sync.Mutex{}, } + pool.cond = *sync.NewCond(pool.mu) for _, opt := range opts { opt(pool) @@ -102,11 +118,12 @@ func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, e // WithMaxSenders(32)(p) func PoolFromOptions(opts ...LineSenderOption) (*LineSenderPool, error) { pool := &LineSenderPool{ - maxSenders: 64, - opts: opts, - senders: []LineSender{}, - mu: &sync.Mutex{}, + maxSenders: 64, + opts: opts, + freeSenders: make([]*pooledSender, 0, 64), + mu: &sync.Mutex{}, } + pool.cond = *sync.NewCond(pool.mu) return pool, nil } @@ -119,62 +136,97 @@ func WithMaxSenders(count int) LineSenderPoolOption { } } -// Acquire obtains a LineSender from the pool. If the pool is empty, a new +// Sender obtains a LineSender from the pool. If the pool is empty, a new // LineSender will be instantiated using the pool's config string. -func (p *LineSenderPool) Acquire(ctx context.Context) (LineSender, error) { +// If there is already maximum number of senders obtained from the pool, +// this call will block until one of the senders is returned back to +// the pool by calling sender.Close(). +func (p *LineSenderPool) Sender(ctx context.Context) (LineSender, error) { + var ( + s LineSender + err error + ) + p.mu.Lock() defer p.mu.Unlock() if p.closed { - return nil, fmt.Errorf("cannot Acquire a LineSender from a closed LineSenderPool") + return nil, errAcquireFromClosedPool + } + + // We may have to wait for a free sender + for len(p.freeSenders) == 0 && p.numSenders == p.maxSenders { + p.cond.Wait() } - if len(p.senders) > 0 { + if p.closed { + return nil, errAcquireFromClosedPool + } + + if len(p.freeSenders) > 0 { // Pop sender off the slice and return it - s := p.senders[len(p.senders)-1] - p.senders = p.senders[0 : len(p.senders)-1] + s := p.freeSenders[len(p.freeSenders)-1] + atomic.AddUint64(&s.tick, 1) + p.freeSenders = p.freeSenders[0 : len(p.freeSenders)-1] return s, nil } if p.conf != "" { - return LineSenderFromConf(ctx, p.conf) + s, err = LineSenderFromConf(ctx, p.conf) } else { conf := newLineSenderConfig(httpSenderType) for _, opt := range p.opts { opt(conf) if conf.senderType == tcpSenderType { - return nil, errors.New("tcp/s not supported for pooled senders, use http/s only") + return nil, errHttpOnlySender } } - return newHttpLineSender(conf) + s, err = newHttpLineSender(conf) } + if err != nil { + return nil, err + } + + p.numSenders++ + + ps := &pooledSender{ + pool: p, + wrapped: s, + } + atomic.AddUint64(&ps.tick, 1) + return ps, nil } -// Release flushes the LineSender and returns it back to the pool. If the pool -// is full, the sender is closed and discarded. In cases where the sender's -// flush fails, it is not added back to the pool. -func (p *LineSenderPool) Release(ctx context.Context, s LineSender) error { - // If there is an error on flush, do not add the sender back to the pool - if err := s.Flush(ctx); err != nil { - return err +func (p *LineSenderPool) free(ctx context.Context, ps *pooledSender) error { + var flushErr error + + if !ps.dirty { + flushErr = ps.Flush(ctx) } p.mu.Lock() defer p.mu.Unlock() - - for i := range p.senders { - if p.senders[i] == s { - return fmt.Errorf("LineSender %p has already been released back to the pool", s) + // Notify free sender waiters, if any + defer p.cond.Broadcast() + + if flushErr != nil { + // Failed to flush, close and call it a day + p.numSenders-- + closeErr := ps.wrapped.Close(ctx) + if closeErr != nil { + return fmt.Errorf("%s %w", flushErr, closeErr) } + return flushErr } - if p.closed || len(p.senders) >= p.maxSenders { - return s.Close(ctx) + if ps.dirty || p.closed { + // Previous error or closed pool, close and call it a day + p.numSenders-- + return ps.wrapped.Close(ctx) } - p.senders = append(p.senders, s) - + p.freeSenders = append(p.freeSenders, ps) return nil } @@ -184,23 +236,28 @@ func (p *LineSenderPool) Close(ctx context.Context) error { p.mu.Lock() defer p.mu.Unlock() + if p.closed { + // Already closed + return nil + } p.closed = true var senderErrors []error - for _, s := range p.senders { - senderErr := s.Close(ctx) + for _, ps := range p.freeSenders { + senderErr := ps.wrapped.Close(ctx) if senderErr != nil { senderErrors = append(senderErrors, senderErr) - } } + p.numSenders -= len(p.freeSenders) + p.freeSenders = []*pooledSender{} if len(senderErrors) == 0 { return nil } - err := fmt.Errorf("error closing one or more LineSenders in the pool") + err := errPooledSenderClose for _, senderErr := range senderErrors { err = fmt.Errorf("%s %w", err, senderErr) } @@ -219,10 +276,81 @@ func (p *LineSenderPool) IsClosed() bool { return p.closed } -// Len returns the numbers of cached LineSenders in the pool. +// Len returns the number of LineSenders in the pool. func (p *LineSenderPool) Len() int { p.mu.Lock() defer p.mu.Unlock() - return len(p.senders) + return p.numSenders +} + +func (ps *pooledSender) Table(name string) LineSender { + ps.wrapped.Table(name) + return ps +} + +func (ps *pooledSender) Symbol(name, val string) LineSender { + ps.wrapped.Symbol(name, val) + return ps +} + +func (ps *pooledSender) Int64Column(name string, val int64) LineSender { + ps.wrapped.Int64Column(name, val) + return ps +} + +func (ps *pooledSender) Long256Column(name string, val *big.Int) LineSender { + ps.wrapped.Long256Column(name, val) + return ps +} + +func (ps *pooledSender) TimestampColumn(name string, ts time.Time) LineSender { + ps.wrapped.TimestampColumn(name, ts) + return ps +} + +func (ps *pooledSender) Float64Column(name string, val float64) LineSender { + ps.wrapped.Float64Column(name, val) + return ps +} + +func (ps *pooledSender) StringColumn(name, val string) LineSender { + ps.wrapped.StringColumn(name, val) + return ps +} + +func (ps *pooledSender) BoolColumn(name string, val bool) LineSender { + ps.wrapped.BoolColumn(name, val) + return ps +} + +func (ps *pooledSender) AtNow(ctx context.Context) error { + err := ps.wrapped.AtNow(ctx) + if err != nil { + ps.dirty = true + } + return err +} + +func (ps *pooledSender) At(ctx context.Context, ts time.Time) error { + err := ps.wrapped.At(ctx, ts) + if err != nil { + ps.dirty = true + } + return err +} + +func (ps *pooledSender) Flush(ctx context.Context) error { + err := ps.wrapped.Flush(ctx) + if err != nil { + ps.dirty = true + } + return err +} + +func (ps *pooledSender) Close(ctx context.Context) error { + if atomic.AddUint64(&ps.tick, 1)&1 == 1 { + return errDoubleSenderClose + } + return ps.pool.free(ctx, ps) } diff --git a/sender_pool_test.go b/sender_pool_test.go index 4f37425..9cea0b2 100644 --- a/sender_pool_test.go +++ b/sender_pool_test.go @@ -27,111 +27,163 @@ import ( "context" "fmt" "sync" + "sync/atomic" "testing" "time" - "github.com/questdb/go-questdb-client/v3" + qdb "github.com/questdb/go-questdb-client/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestBasicBehavior(t *testing.T) { - p, err := questdb.PoolFromConf("http::addr=localhost:1234") + p, err := qdb.PoolFromConf("http::addr=localhost:1234") require.NoError(t, err) ctx := context.Background() // Start with an empty pool, allocate a new sender - s1, err := p.Acquire(ctx) + s1, err := p.Sender(ctx) assert.NoError(t, err) // Release the sender and add it to the pool - assert.NoError(t, p.Release(ctx, s1)) + assert.NoError(t, s1.Close(ctx)) // Acquiring a sender will return the initial one from the pool - s2, err := p.Acquire(ctx) + s2, err := p.Sender(ctx) assert.NoError(t, err) assert.Same(t, s1, s2) // Acquiring another sender will create a new one - s3, err := p.Acquire(ctx) + s3, err := p.Sender(ctx) assert.NoError(t, err) assert.NotSame(t, s1, s3) // Releasing the new sender will add it back to the pool - assert.NoError(t, p.Release(ctx, s3)) + assert.NoError(t, s3.Close(ctx)) // Releasing the original sender will add it to the end of the pool slice - assert.NoError(t, p.Release(ctx, s2)) + assert.NoError(t, s2.Close(ctx)) // Acquiring a new sender will pop the original one off the slice - s4, err := p.Acquire(ctx) + s4, err := p.Sender(ctx) assert.NoError(t, err) assert.Same(t, s1, s4) // Acquiring another sender will pop the second one off the slice - s5, err := p.Acquire(ctx) + s5, err := p.Sender(ctx) assert.NoError(t, err) assert.Same(t, s3, s5) } -func TestDoubleReleaseShouldFail(t *testing.T) { - p, err := questdb.PoolFromConf("http::addr=localhost:1234") +func TestFlushOnClose(t *testing.T) { + ctx := context.Background() + + srv, err := newTestHttpServer(readAndDiscard) + assert.NoError(t, err) + defer srv.Close() + + p, err := qdb.PoolFromOptions( + qdb.WithHttp(), + qdb.WithAddress(srv.Addr()), + qdb.WithAutoFlushDisabled(), + ) + assert.NoError(t, err) + defer p.Close(ctx) + + s, err := p.Sender(ctx) + assert.NoError(t, err) + + err = s.Table(testTable).StringColumn("bar", "baz").AtNow(ctx) + assert.NoError(t, err) + + assert.Equal(t, 1, qdb.MsgCount(s)) + + assert.NoError(t, s.Close(ctx)) + + assert.Equal(t, 0, qdb.MsgCount(s)) +} + +func TestPooledSenderDoubleClose(t *testing.T) { + p, err := qdb.PoolFromConf("http::addr=localhost:1234") require.NoError(t, err) ctx := context.Background() // Start with an empty pool, allocate a new sender - s1, err := p.Acquire(ctx) + s1, err := p.Sender(ctx) assert.NoError(t, err) // Release the sender - assert.NoError(t, p.Release(ctx, s1)) + assert.NoError(t, s1.Close(ctx)) - // Try to release the sender again. This should fail because it already exists in the slice - assert.Error(t, p.Release(ctx, s1)) + // Try to release the sender again. This should fail + assert.Error(t, s1.Close(ctx)) } func TestMaxPoolSize(t *testing.T) { // Create a pool with 2 max senders - p, err := questdb.PoolFromConf("http::addr=localhost:1234", questdb.WithMaxSenders(2)) + p, err := qdb.PoolFromConf("http::addr=localhost:1234", qdb.WithMaxSenders(3)) require.NoError(t, err) ctx := context.Background() // Allocate 3 senders - s1, err := p.Acquire(ctx) + s1, err := p.Sender(ctx) assert.NoError(t, err) - s2, err := p.Acquire(ctx) + s2, err := p.Sender(ctx) assert.NoError(t, err) - s3, err := p.Acquire(ctx) + s3, err := p.Sender(ctx) assert.NoError(t, err) // Release all senders in reverse order // Internal slice will look like: [ s3 , s2 ] - assert.NoError(t, p.Release(ctx, s3)) - assert.NoError(t, p.Release(ctx, s2)) - assert.NoError(t, p.Release(ctx, s1)) + assert.NoError(t, s3.Close(ctx)) + assert.NoError(t, s2.Close(ctx)) + assert.NoError(t, s1.Close(ctx)) // Acquire 3 more senders. - // The first one will be s2 (senders get popped off the slice) - s, err := p.Acquire(ctx) + // The first one will be s1 (senders get popped off the slice) + s, err := p.Sender(ctx) + assert.NoError(t, err) + assert.Same(t, s, s1) + + // The next will be s2 + s, err = p.Sender(ctx) assert.NoError(t, err) assert.Same(t, s, s2) - // The next will be s3 - s, err = p.Acquire(ctx) + // The final one will s3 + s, err = p.Sender(ctx) assert.NoError(t, err) assert.Same(t, s, s3) - // The final one will not be s1, s2, or s3 because the slice is empty - s, err = p.Acquire(ctx) - assert.NoError(t, err) - assert.NotSame(t, s, s1) - assert.NotSame(t, s, s2) - assert.NotSame(t, s, s3) + // Now verify the Sender caller gets blocked until a sender is freed + successFlag := int64(0) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + s, err := p.Sender(ctx) + assert.NoError(t, err) + atomic.AddInt64(&successFlag, 1) + assert.Same(t, s, s3) + assert.NoError(t, s.Close(ctx)) + wg.Done() + }() + + assert.Equal(t, atomic.LoadInt64(&successFlag), int64(0)) + time.Sleep(100 * time.Millisecond) + assert.Equal(t, atomic.LoadInt64(&successFlag), int64(0)) + + assert.NoError(t, s3.Close(ctx)) + wg.Wait() + assert.Equal(t, atomic.LoadInt64(&successFlag), int64(1)) + + assert.NoError(t, s2.Close(ctx)) + assert.NoError(t, s1.Close(ctx)) + assert.NoError(t, p.Close(ctx)) } func TestMultiThreadedPoolWritesOverHttp(t *testing.T) { @@ -147,19 +199,19 @@ func TestMultiThreadedPoolWritesOverHttp(t *testing.T) { wg := &sync.WaitGroup{} - pool, err := questdb.PoolFromConf(fmt.Sprintf("http::addr=%s", srv.Addr()), questdb.WithMaxSenders(maxSenders)) + pool, err := qdb.PoolFromConf(fmt.Sprintf("http::addr=%s", srv.Addr()), qdb.WithMaxSenders(maxSenders)) require.NoError(t, err) for i := 0; i < numThreads; i++ { i := i wg.Add(1) go func() { - sender, err := pool.Acquire(ctx) + sender, err := pool.Sender(ctx) assert.NoError(t, err) sender.Table("test").Int64Column("thread", int64(i)).AtNow(ctx) - assert.NoError(t, pool.Release(ctx, sender)) + assert.NoError(t, sender.Close(ctx)) wg.Done() }() @@ -190,9 +242,9 @@ func TestMultiThreadedPoolWritesOverHttp(t *testing.T) { } func TestTcpNotSupported(t *testing.T) { - _, err := questdb.PoolFromConf("tcp::addr=localhost:9000") + _, err := qdb.PoolFromConf("tcp::addr=localhost:9000") assert.ErrorContains(t, err, "tcp/s not supported for pooled senders") - _, err = questdb.PoolFromConf("tcps::addr=localhost:9000") + _, err = qdb.PoolFromConf("tcps::addr=localhost:9000") assert.ErrorContains(t, err, "tcp/s not supported for pooled senders") } diff --git a/tcp_sender.go b/tcp_sender.go index 8fde732..d4bf86d 100644 --- a/tcp_sender.go +++ b/tcp_sender.go @@ -33,7 +33,6 @@ import ( "crypto/rand" "crypto/tls" "encoding/base64" - "errors" "fmt" "math/big" "net" @@ -136,12 +135,12 @@ func newTcpLineSender(ctx context.Context, conf *lineSenderConfig) (*tcpLineSend } func (s *tcpLineSender) Close(_ context.Context) error { - if s.conn != nil { - conn := s.conn - s.conn = nil - return conn.Close() + if s.conn == nil { + return errDoubleSenderClose } - return nil + conn := s.conn + s.conn = nil + return conn.Close() } func (s *tcpLineSender) Table(name string) LineSender { @@ -193,7 +192,7 @@ func (s *tcpLineSender) Flush(ctx context.Context) error { } if s.buf.HasTable() { s.buf.DiscardPendingMsg() - return errors.New("pending ILP message must be finalized with At or AtNow before calling Flush") + return errFlushWithPendingMessage } if err = ctx.Err(); err != nil { diff --git a/tcp_sender_test.go b/tcp_sender_test.go index b528d26..d738d93 100644 --- a/tcp_sender_test.go +++ b/tcp_sender_test.go @@ -207,6 +207,23 @@ func TestTcpPathologicalCasesFromConf(t *testing.T) { } } +func TestTcpSenderDoubleClose(t *testing.T) { + ctx := context.Background() + + srv, err := newTestTcpServer(readAndDiscard) + assert.NoError(t, err) + defer srv.Close() + + sender, err := qdb.NewLineSender(ctx, qdb.WithTcp(), qdb.WithAddress(srv.Addr())) + assert.NoError(t, err) + + err = sender.Close(ctx) + assert.NoError(t, err) + + err = sender.Close(ctx) + assert.Error(t, err) +} + func TestErrorOnFlushWhenMessageIsPending(t *testing.T) { ctx := context.Background()