From 53a2d35445526db127746d8946357d2eedd1c0e0 Mon Sep 17 00:00:00 2001 From: Steven Sklar Date: Mon, 12 Aug 2024 09:42:07 -0400 Subject: [PATCH] feat(sender): add PoolFromOptions pool constructor (#52) --- sender_pool.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 4 deletions(-) diff --git a/sender_pool.go b/sender_pool.go index 2c80a75..08dde79 100644 --- a/sender_pool.go +++ b/sender_pool.go @@ -37,13 +37,19 @@ import ( // // WARNING: This is an experimental API that is designed to work with HTTP senders ONLY. type LineSenderPool struct { + // options maxSenders int - conf string - closed bool + // presence of a non-empty conf takes precedence over opts + conf string + opts []LineSenderOption + // senders are stored here senders []LineSender - mu *sync.Mutex + + // plumbing fields + closed bool + mu *sync.Mutex } // LineSenderPoolOption defines line sender pool config option. @@ -74,6 +80,37 @@ func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, e return pool, nil } +// PoolFromOptions instantiates a new LineSenderPool using programmatic options. +// Any sender acquired from this pool will be initialized with the same options +// that were passed into the opts argument. +// +// Unlike [PoolFromConf], PoolFromOptions does not have the ability to customize +// the returned LineSenderPool. In this case, to add options (such as [WithMaxSenders]), +// you need manually apply these options after calling this method. +// +// // Create a PoolFromOptions with LineSender options +// p, err := PoolFromOptions( +// WithHttp(), +// WithAutoFlushRows(1000000), +// ) +// +// if err != nil { +// panic(err) +// } +// +// // Add Pool-level options manually +// WithMaxSenders(32)(p) +func PoolFromOptions(opts ...LineSenderOption) (*LineSenderPool, error) { + pool := &LineSenderPool{ + maxSenders: 64, + opts: opts, + senders: []LineSender{}, + mu: &sync.Mutex{}, + } + + return pool, nil +} + // WithMaxSenders sets the maximum number of senders in the pool. // The default maximum number of senders is 64. func WithMaxSenders(count int) LineSenderPoolOption { @@ -99,7 +136,19 @@ func (p *LineSenderPool) Acquire(ctx context.Context) (LineSender, error) { return s, nil } - return LineSenderFromConf(ctx, p.conf) + if p.conf != "" { + return LineSenderFromConf(ctx, p.conf) + } else { + conf := newLineSenderConfig(httpSenderType) + for _, opt := range p.opts { + opt(conf) + if conf.senderType == tcpSenderType { + return nil, errors.New("tcp/s not supported for pooled senders, use http/s only") + } + } + return newHttpLineSender(conf) + } + } // Release flushes the LineSender and returns it back to the pool. If the pool