diff --git a/.golangci.yml b/.golangci.yml index 4021052..acd3b9c 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -21,6 +21,7 @@ linters-settings: - ifElseChain - octalLiteral - whyNoLint + - commentedOutCode gocyclo: min-complexity: 15 gofmt: diff --git a/utility/amqp/connection.go b/utility/amqp/connection.go index 1f559a4..fadfd31 100644 --- a/utility/amqp/connection.go +++ b/utility/amqp/connection.go @@ -2,191 +2,91 @@ package amqp import ( "context" - "time" + "sync" - "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/os/gtime" "github.com/rabbitmq/amqp091-go" ) -// IConnectionController 连接控制器接口 -type IConnectionController interface { - GetConnection() *amqp091.Connection // 获取原始连接 - Reconnect() error // 重新连接 - IsClosed() bool // 是否已经关闭 - IsError() bool // 是否有错误 - GetError() error // 获取错误 - Close() error // 关闭连接 - EstablishedTime() *gtime.Time // 获取连接建立时间 - IsIdle() bool // 检查连接是否空闲 - SetIdleTimeout(timeout time.Duration) // 设置空闲超时时间 - SetMaxRetries(retries int) // 设置重连最大尝试次数 - ResetIdleTime() // 重置空闲时间 - NotifyError(receiver chan error) <-chan error // 获取通知的管道 -} - -type ConnectionControllerInput struct { - URI string // AMQP URI - AMQPConfig *amqp091.Config - maxRetries int - Ctx context.Context -} - -type ConnectionController struct { - conn *amqp091.Connection - err error - noChan bool - errorReceivers []chan error - maxRetries int - initRetryWaitTime time.Duration - maxRetryWaitTime time.Duration - retryWaitTime time.Duration - Ctx context.Context -} - -func NewConnectionController(in *ConnectionControllerInput) IConnectionController { - var ( - conn *amqp091.Connection - err error - ) - if in == nil { - conn, err = nil, gerror.New("ConnectionControllerInput is nil") - } else if in.AMQPConfig != nil { - conn, err = amqp091.DialConfig(in.URI, *in.AMQPConfig) - } else { - conn, err = amqp091.Dial(in.URI) +type connectionController struct { + id string // 连接唯一标识 + conn *amqp091.Connection + pool Pool + lastUsed *gtime.Time + isUsed bool + mu sync.Mutex +} + +func NewConnectionController(conn *amqp091.Connection, pool Pool) ConnectionController { + controller := &connectionController{ + id: gtime.TimestampNanoStr(), + conn: conn, + pool: pool, + lastUsed: gtime.Now(), + isUsed: false, + mu: sync.Mutex{}, } - connectionController := &ConnectionController{ - conn: conn, - err: err, - noChan: false, - maxRetries: in.maxRetries, - } - go connectionController.StartMonitor() - return connectionController + controller.registerCloseHandler() + return controller } -func (c *ConnectionController) commitError(err error) { - c.err = err - if c.noChan || len(c.errorReceivers) == 0 { - return - } - for _, receiver := range c.errorReceivers { - receiver <- err +func (c *connectionController) GetConnection() (*amqp091.Connection, error) { + c.mu.Lock() + defer c.mu.Unlock() + if c.IsClosed() { + return nil, gerror.New("connection is closed") } + c.lastUsed = gtime.Now() + c.isUsed = true + return c.conn, nil } -// StartMonitor 启动连接的守护核心 -func (c *ConnectionController) StartMonitor() { - if c.conn == nil || c.err != nil { +func (c *connectionController) ReleaseConnection() { + c.mu.Lock() + defer c.mu.Unlock() + if c.IsClosed() { return } - go c.keepAlive() // 自动重连 - go c.monitorExceedIdleTimeoutAndMaxConnectionLiveliness() // 超时销毁,空闲销毁 + c.isUsed = false + c.lastUsed = gtime.Now() + _ = c.pool.(*pool).releaseConnectionController(c) } -func (c *ConnectionController) getRetryPolicy() time.Duration { - if c.retryWaitTime >= c.maxRetryWaitTime { - return c.maxRetryWaitTime - } - t, n := c.retryWaitTime, c.retryWaitTime*2 // double the wait time - if n > c.maxRetryWaitTime { - c.retryWaitTime = c.maxRetryWaitTime - } else { - c.retryWaitTime = n +func (c *connectionController) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + if c.IsClosed() { + return nil } - return t + err := c.conn.Close() + c.conn = nil + return err } -func (c *ConnectionController) resetRetryPolicy() { - c.retryWaitTime = c.initRetryWaitTime +func (c *connectionController) IsClosed() bool { + return c.conn == nil || c.conn.IsClosed() } -func (c *ConnectionController) keepAlive() { +func (c *connectionController) registerCloseHandler() { + ctx := context.Background() for err := range c.conn.NotifyClose(make(chan *amqp091.Error)) { - g.Log().Errorf(c.Ctx, "[RabbitMQ] AMQP 连接丢失,错误信息: %s", err.Error()) - maxRetries := c.maxRetries - for i := 1; i <= maxRetries; i++ { //nolint:staticcheck - retryPolicy := c.getRetryPolicy() - g.Log().Debugf(c.Ctx, "[RabbitMQ] AMQP 将在等待 %s 后,尝试第 %d 次重连……", retryPolicy.String(), i) - time.Sleep(retryPolicy) - e := c.Reconnect() - if e == nil { - g.Log().Debugf(c.Ctx, "[RabbitMQ] AMQP 重连成功") - c.resetRetryPolicy() - continue - } else { - g.Log().Errorf(c.Ctx, "[RabbitMQ] AMQP 重连失败,错误信息: %s", e.Error()) - } - if i == c.maxRetries { - g.Log().Errorf(c.Ctx, "[RabbitMQ] AMQP 重连失败,已达最大重连次数") - c.commitError(e) - break - } + if err != nil { + g.Log().Debugf(ctx, "amqp connection[%s] closed: %s", c.id, err.Error()) + c.conn = nil + _ = c.pool.(*pool).removeConnectionController(c) + return } } } -func (c *ConnectionController) monitorExceedIdleTimeoutAndMaxConnectionLiveliness() {} - -func (c *ConnectionController) NotifyError(receiver chan error) <-chan error { - // TODO implement me - panic("implement me") -} - -func (c *ConnectionController) Reconnect() error { - // TODO implement me - panic("implement me") -} - -func (c *ConnectionController) IsClosed() bool { - if c.conn == nil { - return true +func dial(config *ConnectionConfig) (*amqp091.Connection, error) { + if config.config != nil { + return amqp091.DialConfig(config.url, *config.config) + } else { + return amqp091.Dial(config.url) } - return c.conn.IsClosed() -} - -func (c *ConnectionController) GetError() error { - // TODO implement me - panic("implement me") -} - -func (c *ConnectionController) Close() error { - // TODO implement me - panic("implement me") -} - -func (c *ConnectionController) EstablishedTime() *gtime.Time { - // TODO implement me - panic("implement me") -} - -func (c *ConnectionController) IsIdle() bool { - // TODO implement me - panic("implement me") -} - -func (c *ConnectionController) SetIdleTimeout(timeout time.Duration) { - // TODO implement me - panic("implement me") -} - -func (c *ConnectionController) SetMaxRetries(retries int) { - // TODO implement me - panic("implement me") -} - -func (c *ConnectionController) ResetIdleTime() { - // TODO implement me - panic("implement me") -} - -func (c *ConnectionController) GetConnection() *amqp091.Connection { - return c.conn -} - -func (c *ConnectionController) IsError() bool { - return c.err != nil } diff --git a/utility/amqp/pool.go b/utility/amqp/pool.go index 0e8d02a..dc2fd1b 100644 --- a/utility/amqp/pool.go +++ b/utility/amqp/pool.go @@ -1 +1,220 @@ package amqp + +import ( + "sync" + "time" + + "github.com/gogf/gf/v2/errors/gerror" + + "github.com/gogf/gf/v2/os/gtime" +) + +type PoolConfig struct { + MaxOpenConnections int // 最大连接数 + MaxLifetime time.Duration // 连接最大生命周期 + IdleTimeout time.Duration // 空闲连接超时时间 + MaxIdleConnections int // 最大空闲连接数 + MinIdleConnections int // 最小空闲连接数 + Strategy PoolStrategy // 连接池返回策略 + CheckInterval time.Duration // 健康检查间隔 + WaitInterval time.Duration // 等待间隔 + WaitingTimeout time.Duration // 最大等待时间 +} + +type pool struct { + config *PoolConfig + connectionConfig *ConnectionConfig + container ConnectionContainer + size int // 当前连接池大小 + exited bool + mu sync.Mutex +} + +func DefaultPoolConfig() *PoolConfig { + return &PoolConfig{ + MaxOpenConnections: 100, + MaxLifetime: time.Minute * 5, + IdleTimeout: time.Second * 30, + MaxIdleConnections: 10, + MinIdleConnections: 5, + Strategy: PoolStrategyDefault, + CheckInterval: time.Second * 5, // 5s 检查一次 + WaitInterval: 20 * time.Millisecond, + WaitingTimeout: 200 * time.Millisecond, + } +} + +func NewPool(config *PoolConfig, connectionConfig *ConnectionConfig) (Pool, error) { + if config == nil || connectionConfig == nil { + return nil, gerror.New("config or connection config is nil") + } + p := &pool{ + config: config, + connectionConfig: connectionConfig, + container: NewPoolContainerQueueWithCap(config.MaxOpenConnections), + size: 0, + exited: false, + mu: sync.Mutex{}, + } + var err error + for i := 0; i < config.MinIdleConnections; i++ { + var controller ConnectionController + controller, err = p.newConnectionController() + if err != nil { + break + } + _ = p.container.Push(controller) + } + if err != nil { + return nil, err + } + go p.healthCheck() + return p, nil +} + +func (p *pool) GetConnectionController() (ConnectionController, error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.container.Len() == 0 { + if p.size < p.config.MaxOpenConnections { + return p.newConnectionController() + } + waitStart := gtime.Now() + for p.container.Len() == 0 { + if p.config.WaitingTimeout > 0 && waitStart.Add(p.config.WaitingTimeout).Before(gtime.Now()) { + return nil, gerror.New("failed to acquire connection from pool: waiting timeout") + } + time.Sleep(p.config.WaitInterval) + } + } + conn, err := p.container.Pop() + if err != nil { + return nil, err + } + return conn, nil +} + +func (p *pool) SetMaxOpenConnections(maxOpenConnections int) { + p.config.MaxOpenConnections = maxOpenConnections +} + +func (p *pool) SetMaxLifetime(maxLifetime time.Duration) { + p.config.MaxLifetime = maxLifetime +} + +func (p *pool) SetMaxIdleConnections(maxIdleConnections int) { + p.config.MaxIdleConnections = maxIdleConnections +} + +func (p *pool) SetMinIdleConnections(minIdleConnections int) { + p.config.MinIdleConnections = minIdleConnections +} + +func (p *pool) SetIdleTimeout(idleTimeout time.Duration) { + p.config.IdleTimeout = idleTimeout +} + +func (p *pool) Close() error { + err := p.container.Close() + p.exited = true + return err +} + +func (p *pool) IsExisted() bool { + return p.exited +} + +// shouldDoRemove 判断是否需要移除连接 +func (p *pool) shouldDoRemove(cc ConnectionController) bool { + if cc.IsClosed() { + return true + } + if p.config.MaxLifetime > 0 && cc.(*connectionController).lastUsed.Add(p.config.MaxLifetime).Before(gtime.Now()) { + return true + } + if p.config.MaxIdleConnections > 0 && p.container.Len() > p.config.MaxIdleConnections { + return true + } + if p.config.IdleTimeout > 0 && cc.(*connectionController).lastUsed.Add(p.config.IdleTimeout).Before(gtime.Now()) { + return true + } + return false +} + +func (p *pool) releaseConnectionController(controller ConnectionController) error { + cc := controller.(*connectionController) + cc.mu.Lock() + defer cc.mu.Unlock() + if !cc.IsClosed() { + if p.shouldDoRemove(cc) { + err := cc.Close() + if err != nil { + return err + } + p.size-- + return nil + } + return p.container.Push(cc) + } + return nil +} + +func (p *pool) Len() int { + return p.size +} + +func (p *pool) newConnectionController() (ConnectionController, error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.size >= p.config.MaxOpenConnections { + return nil, gerror.New("connection pool is full") + } + conn, err := dial(p.connectionConfig) + if err != nil { + return nil, err + } + p.size++ + return NewConnectionController(conn, p), nil +} + +// healthCheck 健康检查 +func (p *pool) healthCheck() { + for { + p.mu.Lock() + if p.IsExisted() { + return + } + if p.size < p.config.MinIdleConnections { // 维护最低连接数 + for i := 0; i < p.config.MinIdleConnections-p.size; i++ { + conn, err := p.newConnectionController() + if err != nil { + break + } + _ = p.container.Push(conn) + } + } + if p.config.MaxIdleConnections > 0 && p.container.Len() > p.config.MaxIdleConnections { // 维护最大空闲连接数 + for i := 0; i < p.container.Len()-p.config.MaxIdleConnections; i++ { + conn, err := p.container.Pop() + if err != nil { + break + } + _ = conn.Close() + p.size-- + } + } + p.mu.Unlock() + time.Sleep(p.config.CheckInterval) + } +} + +func (p *pool) removeConnectionController(cc ConnectionController) error { + p.mu.Lock() + defer p.mu.Unlock() + err := p.container.Remove(cc) + if err != nil { + return err + } + p.size-- + return nil +} diff --git a/utility/amqp/pool_container_queue.go b/utility/amqp/pool_container_queue.go new file mode 100644 index 0000000..65458ca --- /dev/null +++ b/utility/amqp/pool_container_queue.go @@ -0,0 +1,86 @@ +package amqp + +import ( + "sync" + + "github.com/gogf/gf/v2/errors/gerror" +) + +// PoolContainerQueue 使用 channel 实现队列 +type PoolContainerQueue struct { + connections chan ConnectionController + mu sync.Mutex +} + +func NewPoolContainerQueueWithCap(capacity int) *PoolContainerQueue { + return &PoolContainerQueue{ + connections: make(chan ConnectionController, capacity), + } +} + +func NewPoolContainerQueue() *PoolContainerQueue { + return &PoolContainerQueue{ + connections: make(chan ConnectionController), + } +} + +func (p *PoolContainerQueue) Pop() (ConnectionController, error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.Len() == 0 { + return nil, gerror.New("connection pool is empty") + } + return <-p.connections, nil +} + +func (p *PoolContainerQueue) Push(c ConnectionController) error { + p.mu.Lock() + defer p.mu.Unlock() + p.connections <- c + return nil +} + +func (p *PoolContainerQueue) Len() int { + return len(p.connections) +} + +func (p *PoolContainerQueue) Remove(c ConnectionController) error { + p.mu.Lock() + defer p.mu.Unlock() + lens := p.Len() + for i := 0; i < lens; i++ { + conn, _ := p.Pop() + if conn.(*connectionController).id == c.(*connectionController).id { + return nil // 不需要再放回队列 + } + _ = p.Push(conn) // 重新放回队列 + } + return gerror.New("connection not found") +} + +func (p *PoolContainerQueue) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + lens := p.Len() + for i := 0; i < lens; i++ { + conn, _ := p.Pop() + _ = conn.Close() + } + close(p.connections) + return nil +} + +func (p *PoolContainerQueue) ReCap(newCap int) error { + p.mu.Lock() + defer p.mu.Unlock() + lens := p.Len() + // 重新分配容量 + newChan := make(chan ConnectionController, newCap) + for i := 0; i < lens; i++ { + conn, _ := p.Pop() + newChan <- conn + } + close(p.connections) + p.connections = newChan + return nil +} diff --git a/utility/amqp/type.go b/utility/amqp/type.go new file mode 100644 index 0000000..5ba463b --- /dev/null +++ b/utility/amqp/type.go @@ -0,0 +1,58 @@ +package amqp + +import ( + "time" + + "github.com/rabbitmq/amqp091-go" +) + +type PoolStrategy int + +const ( + PoolStrategyLRU PoolStrategy = iota // 最近最少使用 + PoolStrategyDefault = PoolStrategyLRU +) + +// ------------------------------------------------------------ +// +// Pool Definition +// +// ------------------------------------------------------------ + +type Pool interface { + GetConnectionController() (ConnectionController, error) + SetMaxOpenConnections(maxOpenConnections int) // 设置最大连接数 + SetMaxLifetime(maxLifetime time.Duration) // 设置连接最大生命周期 + SetMaxIdleConnections(maxIdleConnections int) // 设置最大空闲连接数 + SetMinIdleConnections(minIdleConnections int) // 设置最小空闲连接数 + SetIdleTimeout(idleTimeout time.Duration) // 设置空闲连接超时时间 + Len() int // 获取连接池大小 + Close() error // 关闭连接池 + IsExisted() bool // 判断连接池是否已经关闭 +} + +type ConnectionContainer interface { + Pop() (ConnectionController, error) // 弹出一个连接 + Push(ConnectionController) error // 推入一个连接 + Len() int // 获取连接池长度 + Remove(ConnectionController) error // 移除一个连接 + Close() error // 销毁整个容器 +} + +// ------------------------------------------------------------ +// +// Connections Definition +// +// ------------------------------------------------------------ + +type ConnectionController interface { + GetConnection() (*amqp091.Connection, error) + ReleaseConnection() + Close() error + IsClosed() bool +} + +type ConnectionConfig struct { + config *amqp091.Config + url string +}