From 9c507ee35e08065e48e05bef75e2a187f507b679 Mon Sep 17 00:00:00 2001 From: Thomas Rasch Date: Tue, 25 Apr 2023 14:56:32 +0200 Subject: [PATCH] Added PoolConfiguration.maxIdleConnections --- README.md | 5 +- .../PoolConfiguration.swift | 20 ++- .../PostgresConnectionPool.swift | 130 +++++++++++------- 3 files changed, 100 insertions(+), 55 deletions(-) diff --git a/README.md b/README.md index 2f53f05..0f467ab 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ A simple connection pool on top of PostgresNIO. ```swift dependencies: [ - .package(url: "https://github.com/Outdooractive/PostgresConnectionPool.git", from: "0.3.2"), + .package(url: "https://github.com/Outdooractive/PostgresConnectionPool.git", from: "0.4.0"), ], targets: [ .target(name: "MyTarget", dependencies: [ @@ -38,7 +38,8 @@ let configuration = PoolConfiguration( connection: connection, connectTimeout: 10.0, queryTimeout: 60.0, - poolSize: 5) + poolSize: 5, + maxIdleConnections: 1) let pool = PostgresConnectionPool(configuration: configuration, logger: logger) // Fetch a connection from the pool and do something with it... diff --git a/Sources/PostgresConnectionPool/PoolConfiguration.swift b/Sources/PostgresConnectionPool/PoolConfiguration.swift index 50b331b..5b8e5cb 100644 --- a/Sources/PostgresConnectionPool/PoolConfiguration.swift +++ b/Sources/PostgresConnectionPool/PoolConfiguration.swift @@ -37,15 +37,19 @@ public struct PoolConfiguration { /// Connection parameters to the database. public let connection: Connection - /// Timeout for opening new connections to the PostgreSQL database, in seconds. + /// Timeout for opening new connections to the PostgreSQL database, in seconds (default: 5 seconds). public let connectTimeout: TimeInterval - /// TImeout for individual database queries, in seconds. - public let queryTimeout: TimeInterval + /// TImeout for individual database queries, in seconds (default: 10 seconds). + /// Can be disabled by setting to `nil`. + public let queryTimeout: TimeInterval? - /// The maximum number of open connections to the database. + /// The maximum number of open connections to the database (default: 10). public let poolSize: Int + /// The maximum number of idle connections (over a 60 seconds period). + public let maxIdleConnections: Int? + /// Called when new connections to the database are openend. /// /// Use this to set extra connection options or override the defaults. @@ -63,14 +67,16 @@ public struct PoolConfiguration { applicationName: String, connection: Connection, connectTimeout: TimeInterval = 5.0, - queryTimeout: TimeInterval = 10.0, - poolSize: Int = 10) + queryTimeout: TimeInterval? = 10.0, + poolSize: Int = 10, + maxIdleConnections: Int? = nil) { self.applicationName = applicationName self.connection = connection self.connectTimeout = connectTimeout.atLeast(1.0) - self.queryTimeout = queryTimeout.atLeast(1.0) + self.queryTimeout = queryTimeout?.atLeast(1.0) self.poolSize = poolSize.atLeast(1) + self.maxIdleConnections = maxIdleConnections?.atLeast(0) self.onReturnConnection = { connection, logger in try await connection.query("SELECT 1", logger: logger) diff --git a/Sources/PostgresConnectionPool/PostgresConnectionPool.swift b/Sources/PostgresConnectionPool/PostgresConnectionPool.swift index efe900a..8f113c9 100644 --- a/Sources/PostgresConnectionPool/PostgresConnectionPool.swift +++ b/Sources/PostgresConnectionPool/PostgresConnectionPool.swift @@ -10,6 +10,8 @@ import PostgresNIO public actor PostgresConnectionPool { private static let postgresMaxNameLength: Int = 32 // PostgreSQL allows 64 but we add some extra info + private static let healthCheckInterval: TimeInterval = 5.0 + private static let idleConnectionsCheckInterval: TimeInterval = 60.0 private let logger: Logger private let eventLoopGroup: EventLoopGroup @@ -18,7 +20,8 @@ public actor PostgresConnectionPool { private let connectionName: String private let poolName: String private let poolSize: Int - private let queryTimeout: TimeInterval + private let maxIdleConnections: Int? + private let queryTimeout: TimeInterval? private let onOpenConnection: ((PostgresConnection, Logger) async throws -> Void)? private let onReturnConnection: ((PostgresConnection, Logger) async throws -> Void)? @@ -27,6 +30,7 @@ public actor PostgresConnectionPool { private var connections: [PoolConnection] = [] private var available: Deque = [] private var continuations: Deque = [] + private var inUseConnectionCounts: Deque = [] private var didStartWatcherTask = false private var didShutdown = false @@ -46,6 +50,7 @@ public actor PostgresConnectionPool { self.connectionName = String(configuration.applicationName.replacingPattern("[^-\\w\\d\\s()]", with: "").prefix(PostgresConnectionPool.postgresMaxNameLength)) self.poolName = "\(configuration.connection.username)@\(configuration.connection.host):\(configuration.connection.port)/\(configuration.connection.database)" self.poolSize = configuration.poolSize + self.maxIdleConnections = configuration.maxIdleConnections self.queryTimeout = configuration.queryTimeout self.onOpenConnection = configuration.onOpenConnection @@ -63,9 +68,9 @@ public actor PostgresConnectionPool { self.postgresConfiguration = postgresConfiguration } - deinit { - assert(didShutdown, "Must call destroy() before releasing a PostgresConnectionPool") - } +// deinit { +// assert(didShutdown, "Must call shutdown() before releasing a PostgresConnectionPool") +// } /// Takes one connection from the pool and dishes it out to the caller. @discardableResult @@ -127,6 +132,9 @@ public actor PostgresConnectionPool { connection.state = .available available.append(connection) } + else { + assert(available.contains(connection)) + } Task.detached { [weak self] in await self?.handleNextContinuation() @@ -138,7 +146,7 @@ public actor PostgresConnectionPool { /// /// Must be done here since Swift doesn't yet allow async deinit. public func shutdown() async { - logger.debug("[\(poolName)] destroy()") + logger.debug("[\(poolName)] shutdown()") didShutdown = true @@ -146,28 +154,16 @@ public actor PostgresConnectionPool { for poolContinuation in continuations { poolContinuation.continuation.resume(throwing: PoolError.cancelled) } + continuations.removeAll() + + available.removeAll() // Close all open connections connections.forEach({ $0.state = .closed }) for poolConnection in connections { - if let connection = poolConnection.connection { - if let onCloseConnection = onCloseConnection { - do { - try await onCloseConnection(connection, logger) - } - catch { - logger.warning("[\(poolName)] onCloseConnection error: \(error)") - } - } - - do { - try await connection.close() - } - catch { - logger.warning("[\(poolName)] connection.close() error: \(error)") - } - } + await closeConnection(poolConnection) } + connections.removeAll() // Shut down the event loop. try? await eventLoopGroup.shutdownGracefully() @@ -175,10 +171,32 @@ public actor PostgresConnectionPool { // MARK: - Private + private func closeConnection(_ poolConnection: PoolConnection) async { + poolConnection.state = .closed + + guard let connection = poolConnection.connection else { return } + + if let onCloseConnection = onCloseConnection { + do { + try await onCloseConnection(connection, logger) + } + catch { + logger.warning("[\(poolName)] onCloseConnection error: \(error)") + } + } + + do { + try await connection.close() + } + catch { + logger.warning("[\(poolName)] connection.close() error: \(error)") + } + } + private func checkConnections() async { defer { Task.after( - seconds: 5.0, + seconds: PostgresConnectionPool.healthCheckInterval, priority: .low, operation: { [weak self] in await self?.checkConnections() @@ -192,6 +210,8 @@ public actor PostgresConnectionPool { // TODO: Kill self if too many stuck connections + await closeIdleConnections() + let usageCounter = connections.reduce(0) { $0 + $1.usageCounter } logger.debug("[\(poolName)] \(connections.count) connections (\(available.count) available, \(usageCounter) queries), \(continuations.count) continuations left") @@ -205,6 +225,35 @@ public actor PostgresConnectionPool { } } + // TODO: This doesn't work well with short bursts of activity that fall between the 5 seconds check interval + private func closeIdleConnections() async { + guard let maxIdleConnections else { return } + + // 60 seconds + let minArrayLength = Int(PostgresConnectionPool.idleConnectionsCheckInterval / PostgresConnectionPool.healthCheckInterval) + assert(minArrayLength >= 1, "idleConnectionsCheckInterval must be higher than healthCheckInterval") + if inUseConnectionCounts.count > minArrayLength { + inUseConnectionCounts.removeFirst() + } + inUseConnectionCounts.append(connections.count - available.count) + + guard continuations.isEmpty, + inUseConnectionCounts.count >= minArrayLength, + let maxInUse = inUseConnectionCounts.max() + else { return } + + let toClose = (available.count - maxIdleConnections) - maxInUse + guard toClose > 0 else { return } + + logger.debug("[\(poolName)] Closing \(toClose) idle connections") + + for _ in 1...toClose { + guard let poolConnection = available.popFirst() else { break } + + await closeConnection(poolConnection) + } + } + private func handleNextContinuation() async { guard continuations.isNotEmpty else { logger.debug("[\(poolName)] No more continuations left, \(connections.count) connections, \(available.count) available") @@ -237,39 +286,25 @@ public actor PostgresConnectionPool { catch { logger.warning("[\(poolName)] Health check for connection \(poolConnection.id) failed") - poolConnection.state = .closed - - if let connection = poolConnection.connection { - if let onCloseConnection = onCloseConnection { - do { - try await onCloseConnection(connection, logger) - } - catch { - logger.warning("[\(poolName)] onCloseConnection error: \(error)") - } - } - - do { - try await connection.close() - } - catch { - logger.warning("[\(poolName)] connection.close() error: \(error)") - } - } + await closeConnection(poolConnection) } } else { - poolConnection.state = .closed + await closeConnection(poolConnection) } } } + private func nameForConnection(id: Int) -> String { + "\(connectionName) - CONN:\(id)" + } + private func openConnection() async { if !didStartWatcherTask { didStartWatcherTask = true Task.after( - seconds: 5.0, + seconds: PostgresConnectionPool.healthCheckInterval, priority: .low, operation: { [weak self] in await self?.checkConnections() @@ -305,8 +340,11 @@ public actor PostgresConnectionPool { logger.debug("[\(poolName)] Connection \(poolConnection.id) established in \(connectionRuntime.rounded(toPlaces: 2))s") do { - try await connection.query(PostgresQuery(stringLiteral: "SET application_name='\(connectionName) - CONN:\(poolConnection.id)'"), logger: logger) - try await connection.query(PostgresQuery(stringLiteral: "SET statement_timeout=\(Int(queryTimeout * 1000))"), logger: logger) + try await connection.query(PostgresQuery(stringLiteral: "SET application_name='\(nameForConnection(id: poolConnection.id))'"), logger: logger) + + if let queryTimeout { + try await connection.query(PostgresQuery(stringLiteral: "SET statement_timeout=\(Int(queryTimeout * 1000))"), logger: logger) + } if let onOpenConnection = onOpenConnection { try await onOpenConnection(connection, logger)