diff --git a/README.md b/README.md index f71fbc4..0bf043d 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,62 @@ To connect via TCP, set the configuration string to: // ... ``` +## Pooled Line Senders + +**Warning: Experimental feature designed for use with HTTP senders ONLY** + +Version 3 of the client introduces a `LineSenderPool`, which provides a mechanism +to cache previously-used `LineSender`s in memory so they can be reused without +having to allocate and instantiate new senders. + +A LineSenderPool is thread-safe and can be used to concurrently Acquire and Release senders +across multiple goroutines. + +Since `LineSender`s must be used in a single-threaded context, a typical pattern is to Acquire +a sender from a `LineSenderPool` at the beginning of a goroutine and use a deferred +execution block to Release the sender at the end of the goroutine. + +Here is an example of the `LineSenderPool` Acquire, Release, and Close semantics: + +```go +package main + +import ( + "context" + + qdb "github.com/questdb/go-questdb-client/v3" +) + +func main() { + ctx := context.TODO() + + pool, err := qdb.PoolFromConf("http::addr=localhost:9000") + if err != nil { + panic(err) + } + defer func() { + err := pool.Close(ctx) + if err != nil { + panic(err) + } + }() + + sender, err := pool.Acquire(ctx) + if err != nil { + panic(err) + } + + sender.Table("prices"). + Symbol("ticker", "AAPL"). + Float64Column("price", 123.45). 
+ AtNow(ctx) + + if err := pool.Release(ctx, sender); err != nil { + panic(err) + } +} +``` + ## Migration from v2 v2 code example: diff --git a/sender_pool.go b/sender_pool.go new file mode 100644 index 0000000..2c80a75 --- /dev/null +++ b/sender_pool.go @@ -0,0 +1,179 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package questdb + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" +) + +// LineSenderPool wraps a mutex-protected slice of [LineSender]. It allows a goroutine to +// Acquire a sender from the pool and Release it back to the pool when it's done being used. +// +// WARNING: This is an experimental API that is designed to work with HTTP senders ONLY. +type LineSenderPool struct { + maxSenders int + conf string + + closed bool + + senders []LineSender + mu *sync.Mutex +} + +// LineSenderPoolOption defines line sender pool config option. +type LineSenderPoolOption func(*LineSenderPool) + +// PoolFromConf instantiates a new LineSenderPool with a QuestDB configuration string. 
+// Any sender acquired from this pool will be initialized with the same configuration +// string that was passed into the conf argument. +// +// The default maximum number of senders is 64, but can be customized by using the +// [WithMaxSenders] option. +func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, error) { + if strings.HasPrefix(conf, "tcp") { + return nil, errors.New("tcp/s not supported for pooled senders, use http/s only") + } + + pool := &LineSenderPool{ + maxSenders: 64, + conf: conf, + senders: []LineSender{}, + mu: &sync.Mutex{}, + } + + for _, opt := range opts { + opt(pool) + } + + return pool, nil +} + +// WithMaxSenders sets the maximum number of senders in the pool. +// The default maximum number of senders is 64. +func WithMaxSenders(count int) LineSenderPoolOption { + return func(lsp *LineSenderPool) { + lsp.maxSenders = count + } +} + +// Acquire obtains a LineSender from the pool. If the pool is empty, a new +// LineSender will be instantiated using the pool's config string. +func (p *LineSenderPool) Acquire(ctx context.Context) (LineSender, error) { + p.mu.Lock() + defer p.mu.Unlock() + + if p.closed { + return nil, fmt.Errorf("cannot Acquire a LineSender from a closed LineSenderPool") + } + + if len(p.senders) > 0 { + // Pop sender off the slice and return it + s := p.senders[len(p.senders)-1] + p.senders = p.senders[0 : len(p.senders)-1] + return s, nil + } + + return LineSenderFromConf(ctx, p.conf) +} + +// Release flushes the LineSender and returns it back to the pool. If the pool +// is full, the sender is closed and discarded. In cases where the sender's +// flush fails, it is not added back to the pool. 
+func (p *LineSenderPool) Release(ctx context.Context, s LineSender) error { + // If there is an error on flush, do not add the sender back to the pool + if err := s.Flush(ctx); err != nil { + return err + } + + p.mu.Lock() + defer p.mu.Unlock() + + for i := range p.senders { + if p.senders[i] == s { + return fmt.Errorf("LineSender %p has already been released back to the pool", s) + } + } + + if p.closed || len(p.senders) >= p.maxSenders { + return s.Close(ctx) + } + + p.senders = append(p.senders, s) + + return nil +} + +// Close sets the pool's status to "closed" and closes all cached LineSenders. +// When LineSenders are released back into a closed pool, they will be closed and discarded. +func (p *LineSenderPool) Close(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + + p.closed = true + + var senderErrors []error + + for _, s := range p.senders { + senderErr := s.Close(ctx) + if senderErr != nil { + senderErrors = append(senderErrors, senderErr) + + } + } + + if len(senderErrors) == 0 { + return nil + } + + err := fmt.Errorf("error closing one or more LineSenders in the pool") + for _, senderErr := range senderErrors { + err = fmt.Errorf("%s %w", err, senderErr) + } + + return err +} + +// IsClosed will return true if the pool is closed. Once a pool is closed, +// you will not be able to Acquire any new LineSenders from it. When +// LineSenders are released back into a closed pool, they will be closed and +// discarded. +func (p *LineSenderPool) IsClosed() bool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.closed +} + +// Len returns the numbers of cached LineSenders in the pool. 
+func (p *LineSenderPool) Len() int { + p.mu.Lock() + defer p.mu.Unlock() + + return len(p.senders) +} diff --git a/sender_pool_test.go b/sender_pool_test.go new file mode 100644 index 0000000..4f37425 --- /dev/null +++ b/sender_pool_test.go @@ -0,0 +1,198 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + ******************************************************************************/ +package questdb_test + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/questdb/go-questdb-client/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBasicBehavior(t *testing.T) { + p, err := questdb.PoolFromConf("http::addr=localhost:1234") + require.NoError(t, err) + ctx := context.Background() + + // Start with an empty pool, allocate a new sender + s1, err := p.Acquire(ctx) + assert.NoError(t, err) + + // Release the sender and add it to the pool + assert.NoError(t, p.Release(ctx, s1)) + + // Acquiring a sender will return the initial one from the pool + s2, err := p.Acquire(ctx) + assert.NoError(t, err) + assert.Same(t, s1, s2) + + // Acquiring another sender will create a new one + s3, err := p.Acquire(ctx) + assert.NoError(t, err) + assert.NotSame(t, s1, s3) + + // Releasing the new sender will add it back to the pool + assert.NoError(t, p.Release(ctx, s3)) + + // Releasing the original sender will add it to the end of the pool slice + assert.NoError(t, p.Release(ctx, s2)) + + // Acquiring a new sender will pop the original one off the slice + s4, err := p.Acquire(ctx) + assert.NoError(t, err) + assert.Same(t, s1, s4) + + // Acquiring another sender will pop the second one off the slice + s5, err := p.Acquire(ctx) + assert.NoError(t, err) + assert.Same(t, s3, s5) +} + +func TestDoubleReleaseShouldFail(t *testing.T) { + p, err := questdb.PoolFromConf("http::addr=localhost:1234") + require.NoError(t, err) + + ctx := context.Background() + + // Start with an empty pool, allocate a new sender + s1, err := p.Acquire(ctx) + assert.NoError(t, err) + + // Release the sender + assert.NoError(t, p.Release(ctx, s1)) + + // Try to release the sender again. 
This should fail because it already exists in the slice + assert.Error(t, p.Release(ctx, s1)) +} + +func TestMaxPoolSize(t *testing.T) { + // Create a pool with 2 max senders + p, err := questdb.PoolFromConf("http::addr=localhost:1234", questdb.WithMaxSenders(2)) + require.NoError(t, err) + + ctx := context.Background() + + // Allocate 3 senders + s1, err := p.Acquire(ctx) + assert.NoError(t, err) + + s2, err := p.Acquire(ctx) + assert.NoError(t, err) + + s3, err := p.Acquire(ctx) + assert.NoError(t, err) + + // Release all senders in reverse order + // Internal slice will look like: [ s3 , s2 ] + assert.NoError(t, p.Release(ctx, s3)) + assert.NoError(t, p.Release(ctx, s2)) + assert.NoError(t, p.Release(ctx, s1)) + + // Acquire 3 more senders. + + // The first one will be s2 (senders get popped off the slice) + s, err := p.Acquire(ctx) + assert.NoError(t, err) + assert.Same(t, s, s2) + + // The next will be s3 + s, err = p.Acquire(ctx) + assert.NoError(t, err) + assert.Same(t, s, s3) + + // The final one will not be s1, s2, or s3 because the slice is empty + s, err = p.Acquire(ctx) + assert.NoError(t, err) + assert.NotSame(t, s, s1) + assert.NotSame(t, s, s2) + assert.NotSame(t, s, s3) +} + +func TestMultiThreadedPoolWritesOverHttp(t *testing.T) { + var ( + ctx = context.Background() + maxSenders = 2 + numThreads = 5 + ) + + srv, err := newTestHttpServer(sendToBackChannel) + assert.NoError(t, err) + defer srv.Close() + + wg := &sync.WaitGroup{} + + pool, err := questdb.PoolFromConf(fmt.Sprintf("http::addr=%s", srv.Addr()), questdb.WithMaxSenders(maxSenders)) + require.NoError(t, err) + + for i := 0; i < numThreads; i++ { + i := i + wg.Add(1) + go func() { + sender, err := pool.Acquire(ctx) + assert.NoError(t, err) + + sender.Table("test").Int64Column("thread", int64(i)).AtNow(ctx) + + assert.NoError(t, pool.Release(ctx, sender)) + + wg.Done() + }() + } + + wg.Wait() + + assert.NoError(t, pool.Close(ctx)) + + lines := []string{} + + go func() { + for { + select { + 
case msg := <-srv.BackCh: + lines = append(lines, msg) + case <-srv.closeCh: + return + default: + continue + } + } + }() + + assert.Eventually(t, func() bool { + return len(lines) == numThreads + }, time.Second, 100*time.Millisecond, "expected %d flushed lines but only received %d") +} + +func TestTcpNotSupported(t *testing.T) { + _, err := questdb.PoolFromConf("tcp::addr=localhost:9000") + assert.ErrorContains(t, err, "tcp/s not supported for pooled senders") + + _, err = questdb.PoolFromConf("tcps::addr=localhost:9000") + assert.ErrorContains(t, err, "tcp/s not supported for pooled senders") +}