From 62899537809de37035a5b2ed477d33b212a5504d Mon Sep 17 00:00:00 2001 From: Thomas Rasch Date: Thu, 11 May 2023 11:40:41 +0200 Subject: [PATCH 1/2] #5: Improved error handling when the pool was shutdown forcibly --- README.md | 2 +- .../Extensions/DoubleExtensions.swift | 1 + .../Extensions/EmptyTestable.swift | 2 + .../Extensions/FloatingPointExtensions.swift | 4 +- .../Extensions/IntExtensions.swift | 1 + .../Extensions/PSQLError+Description.swift | 9 +- .../Extensions/StringExtensions.swift | 2 + .../Extensions/TaskExtensions.swift | 1 + .../PostgresConnectionPool/PoolError.swift | 63 +++++++++++--- Sources/PostgresConnectionPool/PoolInfo.swift | 7 ++ .../PostgresConnectionPool.swift | 85 +++++++++++-------- .../ConnectionErrorTests.swift | 39 +++++++-- 12 files changed, 154 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index bf5613f..81bd492 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ This package requires Swift 5.7 or higher (at least Xcode 13), and compiles on m ```swift dependencies: [ - .package(url: "https://github.com/Outdooractive/PostgresConnectionPool.git", from: "0.5.5"), + .package(url: "https://github.com/Outdooractive/PostgresConnectionPool.git", from: "0.6.1"), ], targets: [ .target(name: "MyTarget", dependencies: [ diff --git a/Sources/PostgresConnectionPool/Extensions/DoubleExtensions.swift b/Sources/PostgresConnectionPool/Extensions/DoubleExtensions.swift index 364b1cc..b8e2c36 100644 --- a/Sources/PostgresConnectionPool/Extensions/DoubleExtensions.swift +++ b/Sources/PostgresConnectionPool/Extensions/DoubleExtensions.swift @@ -6,6 +6,7 @@ import Foundation extension Double { + /// Returns the maximum of `self` and the other value. func atLeast(_ minValue: Double) -> Double { Swift.max(minValue, self) } diff --git a/Sources/PostgresConnectionPool/Extensions/EmptyTestable.swift b/Sources/PostgresConnectionPool/Extensions/EmptyTestable.swift index d493e46..874dae1 100644 --- a/Sources/PostgresConnectionPool/Extensions/EmptyTestable.swift +++ b/Sources/PostgresConnectionPool/Extensions/EmptyTestable.swift @@ -10,6 +10,8 @@ import Foundation protocol EmptyTestable { var isEmpty: Bool { get } + + /// A Boolean value indicating whether the collection is **not** empty. var isNotEmpty: Bool { get } } diff --git a/Sources/PostgresConnectionPool/Extensions/FloatingPointExtensions.swift b/Sources/PostgresConnectionPool/Extensions/FloatingPointExtensions.swift index b28e9c9..3c50ec7 100644 --- a/Sources/PostgresConnectionPool/Extensions/FloatingPointExtensions.swift +++ b/Sources/PostgresConnectionPool/Extensions/FloatingPointExtensions.swift @@ -7,7 +7,7 @@ import Foundation extension FloatingPoint { - /// Returns rounded FloatingPoint to specified number of places + /// Returns rounded FloatingPoint to specified number of places. func rounded(toPlaces places: Int) -> Self { guard places >= 0 else { return self } var divisor: Self = 1 @@ -15,7 +15,7 @@ extension FloatingPoint { return (self * divisor).rounded() / divisor } - /// Rounds current FloatingPoint to specified number of places + /// Rounds current FloatingPoint to specified number of places. mutating func round(toPlaces places: Int) { self = rounded(toPlaces: places) } diff --git a/Sources/PostgresConnectionPool/Extensions/IntExtensions.swift b/Sources/PostgresConnectionPool/Extensions/IntExtensions.swift index b81f627..11b3916 100644 --- a/Sources/PostgresConnectionPool/Extensions/IntExtensions.swift +++ b/Sources/PostgresConnectionPool/Extensions/IntExtensions.swift @@ -6,6 +6,7 @@ import Foundation extension Int { + /// Returns the maximum of `self` and the other value. func atLeast(_ minValue: Int) -> Int { Swift.max(minValue, self) } diff --git a/Sources/PostgresConnectionPool/Extensions/PSQLError+Description.swift b/Sources/PostgresConnectionPool/Extensions/PSQLError+Description.swift index 0007892..57f47f3 100644 --- a/Sources/PostgresConnectionPool/Extensions/PSQLError+Description.swift +++ b/Sources/PostgresConnectionPool/Extensions/PSQLError+Description.swift @@ -9,6 +9,7 @@ import PostgresNIO extension PSQLError: CustomStringConvertible { + /// A short error description. public var description: String { if let serverInfo = self.serverInfo, let severity = serverInfo[.severity], @@ -26,12 +27,14 @@ extension PSQLError: CustomStringConvertible { extension PSQLError: CustomDebugStringConvertible { + /// A detailed error description suitable for debugging queries and other problems with the server. public var debugDescription: String { var messageElements: [String] = [ "code: \(self.code)" ] if let serverInfo = self.serverInfo { + // Field -> display name let fields: OrderedDictionary = [ .severity: "severity", .message: "message", @@ -48,9 +51,9 @@ extension PSQLError: CustomDebugStringConvertible { .sqlState: "sqlState", ] - let serverInfoELements = fields.compactMap({ field -> String? in - guard let value = serverInfo[field.0] else { return nil } - return "\(field.1): \(value)" + let serverInfoELements = fields.compactMap({ fieldAndName -> String? in + guard let value = serverInfo[fieldAndName.0] else { return nil } + return "\(fieldAndName.1): \(value)" }) messageElements.append("serverInfo: [\(serverInfoELements.joined(separator: ", "))]") diff --git a/Sources/PostgresConnectionPool/Extensions/StringExtensions.swift b/Sources/PostgresConnectionPool/Extensions/StringExtensions.swift index 5d7dbfe..274bfd2 100644 --- a/Sources/PostgresConnectionPool/Extensions/StringExtensions.swift +++ b/Sources/PostgresConnectionPool/Extensions/StringExtensions.swift @@ -8,6 +8,7 @@ import Foundation extension String { + /// A Boolean value indicating whether the string matches a regular expression. func matches( _ regexp: String, caseInsensitive: Bool = false) @@ -19,6 +20,7 @@ extension String { return self.range(of: regexp, options: options) != nil } + /// Returns a new string with the matches of the regular expression replaced with some other string. func replacingPattern( _ regexp: String, with replacement: String, diff --git a/Sources/PostgresConnectionPool/Extensions/TaskExtensions.swift b/Sources/PostgresConnectionPool/Extensions/TaskExtensions.swift index 580a8b6..08b1165 100644 --- a/Sources/PostgresConnectionPool/Extensions/TaskExtensions.swift +++ b/Sources/PostgresConnectionPool/Extensions/TaskExtensions.swift @@ -6,6 +6,7 @@ import Foundation extension Task where Failure == Error { + /// Perform a task after some time. @discardableResult static func after( seconds: TimeInterval, diff --git a/Sources/PostgresConnectionPool/PoolError.swift b/Sources/PostgresConnectionPool/PoolError.swift index d527e84..7d5a02f 100644 --- a/Sources/PostgresConnectionPool/PoolError.swift +++ b/Sources/PostgresConnectionPool/PoolError.swift @@ -12,8 +12,9 @@ public enum PoolError: Error { case cancelled /// The connection to the database was unexpectedly closed. case connectionFailed - /// The pool was already shut down. - case poolDestroyed + /// The pool was already shut down. Includes the original `PSQLError` + /// if the pool was shutdown due to a permanent server error. + case poolDestroyed(PSQLError?) /// Some PostgreSQL error. case postgresError(PSQLError) /// The query was cancelled by the server. @@ -27,14 +28,31 @@ public enum PoolError: Error { extension PoolError: CustomStringConvertible { + /// A short error description. public var description: String { switch self { - case .cancelled: return "" - case .connectionFailed: return "" - case .poolDestroyed: return "" - case .postgresError(let psqlError): return "" - case .queryCancelled: return "" - case .unknown: return "" + case .cancelled: + return "" + + case .connectionFailed: + return "" + + case .poolDestroyed(let psqlError): + if let psqlError { + return "" + } + else { + return "" + } + + case .postgresError(let psqlError): + return "" + + case .queryCancelled: + return "" + + case .unknown: + return "" } } @@ -44,14 +62,31 @@ extension PoolError: CustomStringConvertible { extension PoolError: CustomDebugStringConvertible { + /// A detailed error description suitable for debugging queries and other problems with the server. public var debugDescription: String { switch self { - case .cancelled: return "" - case .connectionFailed: return "" - case .poolDestroyed: return "" - case .postgresError(let psqlError): return "" - case .queryCancelled(query: let query, runtime: let runtime): return "" - case .unknown: return "" + case .cancelled: + return "" + + case .connectionFailed: + return "" + + case .poolDestroyed(let psqlError): + if let psqlError { + return "" + } + else { + return "" + } + + case .postgresError(let psqlError): + return "" + + case .queryCancelled(query: let query, runtime: let runtime): + return "" + + case .unknown: + return "" } } diff --git a/Sources/PostgresConnectionPool/PoolInfo.swift b/Sources/PostgresConnectionPool/PoolInfo.swift index c0f8494..fdb567b 100644 --- a/Sources/PostgresConnectionPool/PoolInfo.swift +++ b/Sources/PostgresConnectionPool/PoolInfo.swift @@ -3,6 +3,7 @@ // import Foundation +import PostgresNIO /// General information about the pool and its open connections. public struct PoolInfo { @@ -37,4 +38,10 @@ public struct PoolInfo { /// Information about individual open connections to the server. public let connections: [ConnectionInfo] + + /// Whether the pool is accepting connections or was shutdown. + public let isShutdown: Bool + /// The Postgres error If the pool was shutdown forcibly. + public let shutdownError: PSQLError? + } diff --git a/Sources/PostgresConnectionPool/PostgresConnectionPool.swift b/Sources/PostgresConnectionPool/PostgresConnectionPool.swift index 5346723..e7642e0 100644 --- a/Sources/PostgresConnectionPool/PostgresConnectionPool.swift +++ b/Sources/PostgresConnectionPool/PostgresConnectionPool.swift @@ -9,7 +9,7 @@ import PostgresNIO /// A simple connection pool for PostgreSQL. public actor PostgresConnectionPool { - private static let postgresMaxNameLength: Int = 32 // PostgreSQL allows 64 but we add some extra info + private static let postgresMaxNameLength = 32 // PostgreSQL allows 64 but we add some extra info private static let healthCheckInterval: TimeInterval = 5.0 private static let idleConnectionsCheckInterval: TimeInterval = 60.0 @@ -33,8 +33,9 @@ public actor PostgresConnectionPool { private var inUseConnectionCounts: Deque = [] private var didStartWatcherTask = false - public private(set) var didShutdown = false - public private(set) var shutdownError: PoolError? + + public private(set) var isShutdown = false + public private(set) var shutdownError: PSQLError? // MARK: - @@ -106,7 +107,7 @@ public actor PostgresConnectionPool { return result } catch { - if let poolConnection = poolConnection { + if let poolConnection { await releaseConnection(poolConnection) } @@ -116,8 +117,9 @@ public actor PostgresConnectionPool { } } - func getConnection() async throws -> PoolConnection { - guard !didShutdown else { throw PoolError.poolDestroyed } + /// Adds a connection placeholder to the list of waiting connections. + private func getConnection() async throws -> PoolConnection { + guard !isShutdown else { throw PoolError.poolDestroyed(shutdownError) } return try await withCheckedThrowingContinuation({ (continuation: PostgresCheckedContinuation) in self.continuations.append(PoolContinuation(continuation: continuation)) @@ -128,8 +130,10 @@ public actor PostgresConnectionPool { }) } - func releaseConnection(_ connection: PoolConnection) async { + /// Returns a connection to the pool of open connections. + private func releaseConnection(_ connection: PoolConnection) async { if connection.state == .closed { + // Shouldn't happen if let postgresConnection = connection.connection, !postgresConnection.isClosed { @@ -137,6 +141,7 @@ public actor PostgresConnectionPool { } } else if connection.state == .connecting { + // Must not happen assertionFailure("We shouldn't have connections in this state here") } else if case .active = connection.state { @@ -157,13 +162,15 @@ public actor PostgresConnectionPool { /// Releases all resources in the pool and shuts down the event loop. /// All further uses of the pool will throw an error. /// - /// Must be done here since Swift doesn't yet allow async deinit. + /// Must be done here because + /// a) Swift doesn't yet allow async deinit, + /// b) we call this internally on connection errors and need to give the client a chance to react to errors. public func shutdown() async { - guard !didShutdown else { return } + guard !isShutdown else { return } logger.debug("[\(poolName)] shutdown()") - didShutdown = true + isShutdown = true // Cancel all waiting continuations for poolContinuation in continuations { @@ -184,9 +191,9 @@ public actor PostgresConnectionPool { try? await eventLoopGroup.shutdownGracefully() } - // Shutdown due to an unrecoverable error - private func shutdown(withError error: PoolError) async { - guard !didShutdown else { return } + /// Shutdown due to an unrecoverable error + private func shutdown(withError error: PSQLError) async { + guard !isShutdown else { return } shutdownError = error @@ -194,9 +201,9 @@ public actor PostgresConnectionPool { } /// Information about the pool and its open connections. - func poolInfo() async -> PoolInfo { + public func poolInfo() async -> PoolInfo { let connections = connections.compactMap { connection -> PoolInfo.ConnectionInfo? in - return PoolInfo.ConnectionInfo( + PoolInfo.ConnectionInfo( id: connection.id, name: nameForConnection(id: connection.id), usageCounter: connection.usageCounter, @@ -211,17 +218,20 @@ public actor PostgresConnectionPool { activeConnections: connections.count - available.count, availableConnections: available.count, usageCounter: PoolConnection.globalUsageCounter, - connections: connections) + connections: connections, + isShutdown: isShutdown, + shutdownError: shutdownError) } // MARK: - Private + /// Close the connection, call the delegate. private func closeConnection(_ poolConnection: PoolConnection) async { poolConnection.state = .closed guard let connection = poolConnection.connection else { return } - if let onCloseConnection = onCloseConnection { + if let onCloseConnection { do { try await onCloseConnection(connection, logger) } @@ -238,6 +248,7 @@ public actor PostgresConnectionPool { } } + /// Check open connections, close idle connections. private func checkConnections() async { defer { Task.after( @@ -253,8 +264,6 @@ public actor PostgresConnectionPool { || (connection.state != .connecting && connection.connection?.isClosed ?? false) }) - // TODO: Kill self if too many stuck connections - await closeIdleConnections() let usageCounter = connections.reduce(0) { $0 + $1.usageCounter } @@ -262,7 +271,7 @@ public actor PostgresConnectionPool { // Check for waiting continuations and open a new connection if possible if connections.count < poolSize, - continuations.isNotEmpty + continuations.isNotEmpty { Task.detached { [weak self] in await self?.openConnection() @@ -271,19 +280,20 @@ public actor PostgresConnectionPool { } // TODO: This doesn't work well with short bursts of activity that fall between the 5 seconds check interval + /// CLose idle connections. private func closeIdleConnections() async { guard let maxIdleConnections else { return } // 60 seconds - let minArrayLength = Int(PostgresConnectionPool.idleConnectionsCheckInterval / PostgresConnectionPool.healthCheckInterval) - assert(minArrayLength >= 1, "idleConnectionsCheckInterval must be higher than healthCheckInterval") - if inUseConnectionCounts.count > minArrayLength { + let inUseWindowSize = Int(PostgresConnectionPool.idleConnectionsCheckInterval / PostgresConnectionPool.healthCheckInterval) + assert(inUseWindowSize >= 1, "idleConnectionsCheckInterval must be higher than healthCheckInterval") + if inUseConnectionCounts.count > inUseWindowSize { inUseConnectionCounts.removeFirst() } inUseConnectionCounts.append(connections.count - available.count) guard continuations.isEmpty, - inUseConnectionCounts.count >= minArrayLength, + inUseConnectionCounts.count >= inUseWindowSize, let maxInUse = inUseConnectionCounts.max() else { return } @@ -292,13 +302,14 @@ public actor PostgresConnectionPool { logger.debug("[\(poolName)] Closing \(toClose) idle connections") - for _ in 1...toClose { + for _ in 1 ... toClose { guard let poolConnection = available.popFirst() else { break } await closeConnection(poolConnection) } } + /// Take the next waiting connection request from the queue. private func handleNextContinuation() async { guard continuations.isNotEmpty else { logger.debug("[\(poolName)] No more continuations left, \(connections.count) connections, \(available.count) available") @@ -312,6 +323,7 @@ public actor PostgresConnectionPool { // TODO: Needs some code cleanup var poolConnection: PoolConnection? + // 1. See if we have a readily available connection while let next = available.popFirst() { guard next.state == .available else { // ? -> Force close @@ -325,7 +337,9 @@ public actor PostgresConnectionPool { break } - // No available connection, try to open a new one + // 2. No available connection + // a) Can we open a new connection: do it and exit + // b) Can't open a new connection: exit, another connection will pick up the continuation guard let poolConnection else { if connections.count < poolSize { Task.detached { [weak self] in @@ -335,7 +349,8 @@ public actor PostgresConnectionPool { return } - // Check that the Postgres connection is open, or try to open a new one + // 3. Check that the Postgres connection is usable, or try to open a new one + // (see 2.) guard let connection = poolConnection.connection else { poolConnection.state = .closed @@ -347,18 +362,17 @@ public actor PostgresConnectionPool { return } - // Get the next continuation from the list... + // 4. Get the next waiting continuation from the queue and run it guard let poolContinuation = continuations.popFirst() else { available.append(poolConnection) return } - // .. and work on it poolConnection.state = .active(Date()) do { // Connection check, etc. - if let onReturnConnection = onReturnConnection { + if let onReturnConnection { try await onReturnConnection(connection, logger) } @@ -371,10 +385,12 @@ public actor PostgresConnectionPool { } } + /// Display name for a connection private func nameForConnection(id: Int) -> String { "\(connectionName) - CONN:\(id)" } + /// Open a new connection to the Postgres server private func openConnection() async { if !didStartWatcherTask { didStartWatcherTask = true @@ -422,7 +438,7 @@ public actor PostgresConnectionPool { try await connection.query(PostgresQuery(stringLiteral: "SET statement_timeout=\(Int(queryTimeout * 1000))"), logger: logger) } - if let onOpenConnection = onOpenConnection { + if let onOpenConnection { try await onOpenConnection(connection, logger) } } @@ -458,17 +474,16 @@ public actor PostgresConnectionPool { .connectionClosed, .connectionError, .connectionQuiescing, - .sslUnsupported, .failedToAddSSLHandler, .invalidCommandTag, .messageDecodingFailure, .receivedUnencryptedDataAfterSSLRequest, .saslError, - .server, + .sslUnsupported, .unexpectedBackendMessage, .unsupportedAuthMechanism: logger.error(logMessage) - await shutdown(withError: PoolError.postgresError(psqlError)) + await shutdown(withError: psqlError) return case .queryCancelled, @@ -491,7 +506,7 @@ public actor PostgresConnectionPool { .invalidName, .invalidPassword: logger.error(logMessage) - await shutdown(withError: PoolError.postgresError(psqlError)) + await shutdown(withError: psqlError) return default: diff --git a/Tests/PostgresConnectionPoolTests/ConnectionErrorTests.swift b/Tests/PostgresConnectionPoolTests/ConnectionErrorTests.swift index fa7c3ac..cfca4a7 100644 --- a/Tests/PostgresConnectionPoolTests/ConnectionErrorTests.swift +++ b/Tests/PostgresConnectionPoolTests/ConnectionErrorTests.swift @@ -14,6 +14,30 @@ final class ConnectionErrorTests: XCTestCase { return logger }() + // MARK: - + + // Test that the pool can actually connect to the server. + + func testCanConnect() async throws { + let configuration = poolConfiguration() + let pool = PostgresConnectionPool(configuration: configuration, logger: logger) + + do { + try await pool.connection { connection in + try await connection.query("SELECT 1", logger: logger) + } + await pool.shutdown() + } + catch { + XCTFail("Is the cocker container running? (\(String(describing: (error as? PoolError)?.debugDescription))") + } + + let didShutdown = await pool.isShutdown + XCTAssertTrue(didShutdown) + } + + // MARK: - + // TODO: Clean up the error checking // TODO: Check that the Docker PostgreSQL server is actually up and available first or most tests will fail anyway @@ -29,7 +53,7 @@ final class ConnectionErrorTests: XCTestCase { } await pool.shutdown() - XCTFail("Can't connect, so we should have an exception") + XCTFail("Can't connect, so we should have an exception here") } catch { XCTAssertTrue(error is PoolError) @@ -37,24 +61,25 @@ final class ConnectionErrorTests: XCTestCase { let shutdownError = await pool.shutdownError XCTAssertEqual(shutdownError?.description, expectedErrorDescription) } - let didShutdown = await pool.didShutdown + + let didShutdown = await pool.isShutdown XCTAssertTrue(didShutdown) } func testConnectWrongHost() async throws { - try await withConfiguration(self.poolConfiguration(host: "notworking"), expectedErrorDescription: ">") + try await withConfiguration(self.poolConfiguration(host: "notworking"), expectedErrorDescription: "") } func testConnectWrongPort() async throws { - try await withConfiguration(self.poolConfiguration(port: 99999), expectedErrorDescription: ">") + try await withConfiguration(self.poolConfiguration(port: 99999), expectedErrorDescription: "") } func testConnectWrongUsername() async throws { - try await withConfiguration(self.poolConfiguration(username: "notworking"), expectedErrorDescription: ">") + try await withConfiguration(self.poolConfiguration(username: "notworking"), expectedErrorDescription: "") } func testConnectWrongPassword() async throws { - try await withConfiguration(self.poolConfiguration(password: "notworking"), expectedErrorDescription: ">") + try await withConfiguration(self.poolConfiguration(password: "notworking"), expectedErrorDescription: "") } func testConnectInvalidTLSConfig() async throws { @@ -62,7 +87,7 @@ final class ConnectionErrorTests: XCTestCase { tlsConfiguration.maximumTLSVersion = .tlsv1 // New Postgres versions want at least TLSv1.2 let tls: PostgresConnection.Configuration.TLS = .require(try .init(configuration: tlsConfiguration)) - try await withConfiguration(self.poolConfiguration(tls: tls), expectedErrorDescription: ">") + try await withConfiguration(self.poolConfiguration(tls: tls), expectedErrorDescription: "") } // MARK: - From 2be276f391f4aafd1134805abf3b52c643235a95 Mon Sep 17 00:00:00 2001 From: Thomas Rasch Date: Thu, 11 May 2023 13:09:55 +0200 Subject: [PATCH 2/2] Added more tests --- Sources/PostgresConnectionPool/PoolInfo.swift | 46 +++++++ .../PostgresConnectionPool.swift | 25 +++- .../ConnectionErrorTests.swift | 95 ++++--------- .../ConnectionTests.swift | 127 ++++++++++++++++++ .../PostgresHelpers.swift | 41 ++++++ 5 files changed, 259 insertions(+), 75 deletions(-) create mode 100644 Tests/PostgresConnectionPoolTests/ConnectionTests.swift create mode 100644 Tests/PostgresConnectionPoolTests/PostgresHelpers.swift diff --git a/Sources/PostgresConnectionPool/PoolInfo.swift b/Sources/PostgresConnectionPool/PoolInfo.swift index fdb567b..4846747 100644 --- a/Sources/PostgresConnectionPool/PoolInfo.swift +++ b/Sources/PostgresConnectionPool/PoolInfo.swift @@ -45,3 +45,49 @@ public struct PoolInfo { public let shutdownError: PSQLError? } + +// MARK: - CustomStringConvertible + +extension PoolInfo: CustomStringConvertible { + + public var description: String { + var lines: [String] = [ + "Pool: \(name)", + "Connections: \(openConnections)/\(activeConnections)/\(availableConnections) (open/active/available)", + "Usage: \(usageCounter)", + "Shutdown? \(isShutdown) \(shutdownError != nil ? "(\(shutdownError!.description))" : "")", + ] + + if connections.isNotEmpty { + lines.append("Connections:") + + for connection in connections.sorted(by: { $0.id < $1.id }) { + lines.append(contentsOf: connection.description.components(separatedBy: "\n").map({ " " + $0 })) + } + } + + return lines.joined(separator: "\n") + } + +} + +extension PoolInfo.ConnectionInfo: CustomStringConvertible { + + public var description: String { + var lines: [String] = [ + "Connection: \(id) (\(name))", + " State: \(state)", + " Usage: \(usageCounter)", + ] + + if let query { + lines.append(" Query: \(query)") + if let queryRuntime { + lines.append(" Runtime: \(queryRuntime.rounded(toPlaces: 3))s") + } + } + + return lines.joined(separator: "\n") + } + +} diff --git a/Sources/PostgresConnectionPool/PostgresConnectionPool.swift b/Sources/PostgresConnectionPool/PostgresConnectionPool.swift index e7642e0..fda1420 100644 --- a/Sources/PostgresConnectionPool/PostgresConnectionPool.swift +++ b/Sources/PostgresConnectionPool/PostgresConnectionPool.swift @@ -200,6 +200,25 @@ public actor PostgresConnectionPool { await shutdown() } + /// Forcibly close all idle connections. + public func closeIdleConnections() async { + guard !isShutdown else { return } + + let availableCopy = available + available.removeAll() + + logger.debug("[\(poolName)] Closing \(availableCopy.count) idle connections") + + for poolConnection in availableCopy { + await closeConnection(poolConnection) + } + + connections.removeAll(where: { connection in + connection.state == .closed + || (connection.state != .connecting && connection.connection?.isClosed ?? false) + }) + } + /// Information about the pool and its open connections. public func poolInfo() async -> PoolInfo { let connections = connections.compactMap { connection -> PoolInfo.ConnectionInfo? in @@ -259,13 +278,13 @@ public actor PostgresConnectionPool { }) } + await checkIdleConnections() + connections.removeAll(where: { connection in connection.state == .closed || (connection.state != .connecting && connection.connection?.isClosed ?? false) }) - await closeIdleConnections() - let usageCounter = connections.reduce(0) { $0 + $1.usageCounter } logger.debug("[\(poolName)] \(connections.count) connections (\(available.count) available, \(usageCounter) queries), \(continuations.count) continuations left") @@ -281,7 +300,7 @@ public actor PostgresConnectionPool { // TODO: This doesn't work well with short bursts of activity that fall between the 5 seconds check interval /// CLose idle connections. - private func closeIdleConnections() async { + private func checkIdleConnections() async { guard let maxIdleConnections else { return } // 60 seconds diff --git a/Tests/PostgresConnectionPoolTests/ConnectionErrorTests.swift b/Tests/PostgresConnectionPoolTests/ConnectionErrorTests.swift index cfca4a7..9b64cd6 100644 --- a/Tests/PostgresConnectionPoolTests/ConnectionErrorTests.swift +++ b/Tests/PostgresConnectionPoolTests/ConnectionErrorTests.swift @@ -16,30 +16,34 @@ final class ConnectionErrorTests: XCTestCase { // MARK: - - // Test that the pool can actually connect to the server. + // TODO: Clean up the error checking + // TODO: Check that the Docker PostgreSQL server is actually up and available first or most tests will fail anyway - func testCanConnect() async throws { - let configuration = poolConfiguration() - let pool = PostgresConnectionPool(configuration: configuration, logger: logger) + func testConnectWrongHost() async throws { + try await withConfiguration(PostgresHelpers.poolConfiguration(host: "notworking"), expectedErrorDescription: "") + } - do { - try await pool.connection { connection in - try await connection.query("SELECT 1", logger: logger) - } - await pool.shutdown() - } - catch { - XCTFail("Is the cocker container running? (\(String(describing: (error as? PoolError)?.debugDescription))") - } + func testConnectWrongPort() async throws { + try await withConfiguration(PostgresHelpers.poolConfiguration(port: 99999), expectedErrorDescription: "") + } - let didShutdown = await pool.isShutdown - XCTAssertTrue(didShutdown) + func testConnectWrongUsername() async throws { + try await withConfiguration(PostgresHelpers.poolConfiguration(username: "notworking"), expectedErrorDescription: "") } - // MARK: - + func testConnectWrongPassword() async throws { + try await withConfiguration(PostgresHelpers.poolConfiguration(password: "notworking"), expectedErrorDescription: "") + } - // TODO: Clean up the error checking - // TODO: Check that the Docker PostgreSQL server is actually up and available first or most tests will fail anyway + func testConnectInvalidTLSConfig() async throws { + var tlsConfiguration: TLSConfiguration = .clientDefault + tlsConfiguration.maximumTLSVersion = .tlsv1 // New Postgres versions want at least TLSv1.2 + + let tls: PostgresConnection.Configuration.TLS = .require(try .init(configuration: tlsConfiguration)) + try await withConfiguration(PostgresHelpers.poolConfiguration(tls: tls), expectedErrorDescription: "") + } + + // MARK: - private func withConfiguration( _ configuration: PoolConfiguration, @@ -47,6 +51,7 @@ final class ConnectionErrorTests: XCTestCase { async throws { let pool = PostgresConnectionPool(configuration: configuration, logger: logger) + do { try await pool.connection { connection in try await connection.query("SELECT 1", logger: logger) @@ -66,58 +71,4 @@ final class ConnectionErrorTests: XCTestCase { XCTAssertTrue(didShutdown) } - func testConnectWrongHost() async throws { - try await withConfiguration(self.poolConfiguration(host: "notworking"), expectedErrorDescription: "") - } - - func testConnectWrongPort() async throws { - try await withConfiguration(self.poolConfiguration(port: 99999), expectedErrorDescription: "") - } - - func testConnectWrongUsername() async throws { - try await withConfiguration(self.poolConfiguration(username: "notworking"), expectedErrorDescription: "") - } - - func testConnectWrongPassword() async throws { - try await withConfiguration(self.poolConfiguration(password: "notworking"), expectedErrorDescription: "") - } - - func testConnectInvalidTLSConfig() async throws { - var tlsConfiguration: TLSConfiguration = .clientDefault - tlsConfiguration.maximumTLSVersion = .tlsv1 // New Postgres versions want at least TLSv1.2 - - let tls: PostgresConnection.Configuration.TLS = .require(try .init(configuration: tlsConfiguration)) - try await withConfiguration(self.poolConfiguration(tls: tls), expectedErrorDescription: "") - } - - // MARK: - - - private func poolConfiguration( - host: String? = nil, - port: Int? = nil, - username: String? = nil, - password: String? = nil, - database: String? = nil, - tls: PostgresConnection.Configuration.TLS = .disable) - -> PoolConfiguration - { - let postgresConfiguration = PostgresConnection.Configuration( - host: host ?? env("POSTGRES_HOSTNAME") ?? "localhost", - port: port ?? env("POSTGRES_PORT").flatMap(Int.init(_:)) ?? 5432, - username: username ?? env("POSTGRES_USER") ?? "test_username", - password: password ?? env("POSTGRES_PASSWORD") ?? "test_password", - database: database ?? env("POSTGRES_DB") ?? "test_database", - tls: tls) - return PoolConfiguration( - applicationName: "ConnectionErrorTests", - postgresConfiguration: postgresConfiguration, - connectTimeout: 10.0, - queryTimeout: 10.0, - poolSize: 5, - maxIdleConnections: 1) - } - - private func env(_ name: String) -> String? { - getenv(name).flatMap { String(cString: $0) } - } } diff --git a/Tests/PostgresConnectionPoolTests/ConnectionTests.swift b/Tests/PostgresConnectionPoolTests/ConnectionTests.swift new file mode 100644 index 0000000..ef9e005 --- /dev/null +++ b/Tests/PostgresConnectionPoolTests/ConnectionTests.swift @@ -0,0 +1,127 @@ +// +// Created by Thomas Rasch on 11.05.23. +// + +@testable import PostgresConnectionPool +import PostgresNIO +import XCTest + +final class ConnectionTests: XCTestCase { + + private var logger: Logger = { + var logger = Logger(label: "ConnectionTests") + logger.logLevel = .info + return logger + }() + + // MARK: - + + // Test that the pool can actually connect to the server. + func testCanConnect() async throws { + let pool = PostgresConnectionPool(configuration: PostgresHelpers.poolConfiguration(), logger: logger) + + do { + try await pool.connection { connection in + try await connection.query("SELECT 1", logger: logger) + } + await pool.shutdown() + } + catch { + XCTFail("Is the cocker container running? (\(String(describing: (error as? PoolError)?.debugDescription))") + } + + let didShutdown = await pool.isShutdown + XCTAssertTrue(didShutdown) + } + + func testPoolInfo() async throws { + let initialUsageCounter = PoolConnection.globalUsageCounter + let pool = PostgresConnectionPool(configuration: PostgresHelpers.poolConfiguration(), logger: logger) + + let poolInfoBefore = await pool.poolInfo() + print(poolInfoBefore) + XCTAssertEqual(poolInfoBefore.activeConnections, 0) + XCTAssertEqual(poolInfoBefore.availableConnections, 0) + XCTAssertEqual(poolInfoBefore.openConnections, 0) + XCTAssertEqual(poolInfoBefore.usageCounter, initialUsageCounter) + XCTAssertEqual(poolInfoBefore.connections.count, poolInfoBefore.openConnections) + XCTAssertFalse(poolInfoBefore.isShutdown) + XCTAssertNil(poolInfoBefore.shutdownError) + + let start = 1 + let end = 1000 + + await withThrowingTaskGroup(of: Void.self) { taskGroup in + for _ in 1 ... 1000 { + taskGroup.addTask { + try await pool.connection { connection in + _ = try await connection.query("SELECT generate_series(\(start), \(end));", logger: self.logger) + } + } + } + } + + let poolInfo = await pool.poolInfo() + print(poolInfo) + XCTAssertEqual(poolInfo.activeConnections, 0) + XCTAssertGreaterThan(poolInfo.availableConnections, 0) + XCTAssertGreaterThan(poolInfo.openConnections, 0) + XCTAssertEqual(poolInfo.usageCounter, 1000 + initialUsageCounter) + XCTAssertEqual(poolInfo.connections.count, poolInfo.openConnections) + XCTAssertFalse(poolInfo.isShutdown) + XCTAssertNil(poolInfo.shutdownError) + + await pool.shutdown() + + let poolInfoAfterShutdown = await pool.poolInfo() + print(poolInfoAfterShutdown) + XCTAssertEqual(poolInfoAfterShutdown.activeConnections, 0) + XCTAssertEqual(poolInfoAfterShutdown.availableConnections, 0) + XCTAssertEqual(poolInfoAfterShutdown.openConnections, 0) + XCTAssertGreaterThan(poolInfoAfterShutdown.usageCounter, 0) + XCTAssertEqual(poolInfoAfterShutdown.connections.count, 0) + XCTAssertTrue(poolInfoAfterShutdown.isShutdown) + XCTAssertNil(poolInfoAfterShutdown.shutdownError) + } + + func testPoolSize100() async throws { + let initialUsageCounter = PoolConnection.globalUsageCounter + let pool = PostgresConnectionPool(configuration: PostgresHelpers.poolConfiguration(poolSize: 100), logger: logger) + + let start = 1 + let end = 100 + + await withThrowingTaskGroup(of: Void.self) { taskGroup in + for _ in 1 ... 10000 { + taskGroup.addTask { + try await pool.connection { connection in + _ = try await connection.query("SELECT generate_series(\(start), \(end));", logger: self.logger) + } + } + } + } + + let poolInfo = await pool.poolInfo() + XCTAssertEqual(poolInfo.activeConnections, 0) + XCTAssertGreaterThan(poolInfo.availableConnections, 0) + XCTAssertGreaterThan(poolInfo.openConnections, 0) + XCTAssertEqual(poolInfo.usageCounter, 10000 + initialUsageCounter) + XCTAssertEqual(poolInfo.connections.count, poolInfo.openConnections) + XCTAssertFalse(poolInfo.isShutdown) + XCTAssertNil(poolInfo.shutdownError) + + await pool.closeIdleConnections() + + let poolInfoIdleClosed = await pool.poolInfo() + XCTAssertEqual(poolInfoIdleClosed.activeConnections, 0) + XCTAssertEqual(poolInfoIdleClosed.availableConnections, 0) + XCTAssertEqual(poolInfoIdleClosed.openConnections, 0) + XCTAssertEqual(poolInfoIdleClosed.usageCounter, 10000 + initialUsageCounter) + XCTAssertEqual(poolInfoIdleClosed.connections.count, poolInfoIdleClosed.openConnections) + XCTAssertFalse(poolInfoIdleClosed.isShutdown) + XCTAssertNil(poolInfoIdleClosed.shutdownError) + + await pool.shutdown() + } + +} diff --git a/Tests/PostgresConnectionPoolTests/PostgresHelpers.swift b/Tests/PostgresConnectionPoolTests/PostgresHelpers.swift new file mode 100644 index 0000000..7df8bcf --- /dev/null +++ b/Tests/PostgresConnectionPoolTests/PostgresHelpers.swift @@ -0,0 +1,41 @@ +// +// Created by Thomas Rasch on 11.05.23. +// + +import Foundation +import PostgresConnectionPool +import PostgresNIO + +enum PostgresHelpers { + + static func poolConfiguration( + host: String? = nil, + port: Int? = nil, + username: String? = nil, + password: String? = nil, + database: String? = nil, + tls: PostgresConnection.Configuration.TLS = .disable, + poolSize: Int = 5) + -> PoolConfiguration + { + let postgresConfiguration = PostgresConnection.Configuration( + host: host ?? env("POSTGRES_HOSTNAME") ?? "localhost", + port: port ?? env("POSTGRES_PORT").flatMap(Int.init(_:)) ?? 5432, + username: username ?? env("POSTGRES_USER") ?? "test_username", + password: password ?? env("POSTGRES_PASSWORD") ?? "test_password", + database: database ?? env("POSTGRES_DB") ?? "test_database", + tls: tls) + return PoolConfiguration( + applicationName: "PoolTests", + postgresConfiguration: postgresConfiguration, + connectTimeout: 10.0, + queryTimeout: 10.0, + poolSize: poolSize, + maxIdleConnections: 1) + } + + private static func env(_ name: String) -> String? { + getenv(name).flatMap { String(cString: $0) } + } + +}