From 7af7a75c7d156e7ee61d0949d96712a783cb56dd Mon Sep 17 00:00:00 2001 From: Thomas Rasch Date: Fri, 12 May 2023 13:11:17 +0200 Subject: [PATCH 1/2] #9: Add a batchId to connections --- README.md | 6 +-- .../PoolConnection.swift | 2 + .../PoolContinuation.swift | 4 +- Sources/PostgresConnectionPool/PoolInfo.swift | 6 +++ .../PostgresConnectionPool.swift | 50 ++++++++++++++++--- 5 files changed, 58 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 81bd492..b90cd22 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ This package requires Swift 5.7 or higher (at least Xcode 13), and compiles on m ```swift dependencies: [ - .package(url: "https://github.com/Outdooractive/PostgresConnectionPool.git", from: "0.6.1"), + .package(url: "https://github.com/Outdooractive/PostgresConnectionPool.git", from: "0.7.0"), ], targets: [ .target(name: "MyTarget", dependencies: [ @@ -51,9 +51,9 @@ let configuration = PoolConfiguration( let pool = PostgresConnectionPool(configuration: configuration, logger: logger) // Fetch a connection from the pool and do something with it... -try await pool.connection(callback: { connection in +try await pool.connection { connection in try await connection.query(PostgresQuery(stringLiteral: "SELECT 1"), logger: logger) -}) +} // With PostgresKit func fetchObjects(_ sql: SQLQueryString) async throws -> [T] { diff --git a/Sources/PostgresConnectionPool/PoolConnection.swift b/Sources/PostgresConnectionPool/PoolConnection.swift index 62d2983..effff4d 100644 --- a/Sources/PostgresConnectionPool/PoolConnection.swift +++ b/Sources/PostgresConnectionPool/PoolConnection.swift @@ -14,6 +14,8 @@ final class PoolConnection: Identifiable, Equatable { private(set) var usageCounter = 0 let id: Int + var batchId: Int? + var connection: PostgresConnection? var state: PoolConnectionState = .connecting { didSet { diff --git a/Sources/PostgresConnectionPool/PoolContinuation.swift b/Sources/PostgresConnectionPool/PoolContinuation.swift index ea07a96..b30da05 100644 --- a/Sources/PostgresConnectionPool/PoolContinuation.swift +++ b/Sources/PostgresConnectionPool/PoolContinuation.swift @@ -10,10 +10,12 @@ typealias PostgresCheckedContinuation = CheckedContinuation( + batchId: Int? = nil, _ callback: (PostgresConnectionWrapper) async throws -> T) async throws -> T { var poolConnection: PoolConnection? do { - poolConnection = try await getConnection() + poolConnection = try await getConnection(batchId: batchId) if Task.isCancelled { await releaseConnection(poolConnection!) + + if let batchId { + await abortBatch(batchId) + } + throw PoolError.cancelled } - let result = try await PostgresConnectionWrapper.distribute(poolConnection: poolConnection, callback: callback) + let result = try await PostgresConnectionWrapper.distribute( + poolConnection: poolConnection, + callback: callback) await releaseConnection(poolConnection!) @@ -118,11 +130,11 @@ public actor PostgresConnectionPool { } /// Adds a connection placeholder to the list of waiting connections. - private func getConnection() async throws -> PoolConnection { + private func getConnection(batchId: Int? = nil) async throws -> PoolConnection { guard !isShutdown else { throw PoolError.poolDestroyed(shutdownError) } return try await withCheckedThrowingContinuation({ (continuation: PostgresCheckedContinuation) in - self.continuations.append(PoolContinuation(continuation: continuation)) + self.continuations.append(PoolContinuation(batchId: batchId, continuation: continuation)) Task.detached { [weak self] in await self?.handleNextContinuation() @@ -154,11 +166,31 @@ public actor PostgresConnectionPool { assert(available.contains(connection), "Connections in state 'available' should be available") } + connection.query = nil + connection.batchId = nil + Task.detached { [weak self] in await self?.handleNextContinuation() } } + /// Aborts all waiting queries with the given `batchId`. + public func abortBatch(_ batchId: Int) async { + let countBefore = continuations.count + + continuations.removeAll(where: { poolContinuation in + guard poolContinuation.batchId == batchId else { return false } + + poolContinuation.continuation.resume(throwing: PoolError.cancelled) + return true + }) + + let countRemoved = countBefore - continuations.count + if countRemoved > 0 { + logger.debug("[\(poolName)] Removed \(countRemoved) continuations for batch \(batchId)") + } + } + /// Releases all resources in the pool and shuts down the event loop. /// All further uses of the pool will throw an error. /// @@ -220,12 +252,17 @@ public actor PostgresConnectionPool { } /// Information about the pool and its open connections. - public func poolInfo() async -> PoolInfo { + public func poolInfo(batchId: Int? = nil) async -> PoolInfo { let connections = connections.compactMap { connection -> PoolInfo.ConnectionInfo? in - PoolInfo.ConnectionInfo( + if let batchId, connection.batchId != batchId { + return nil + } + + return PoolInfo.ConnectionInfo( id: connection.id, name: nameForConnection(id: connection.id), usageCounter: connection.usageCounter, + batchId: connection.batchId, query: connection.query, queryRuntime: connection.queryRuntime, state: connection.state) @@ -388,6 +425,7 @@ public actor PostgresConnectionPool { } poolConnection.state = .active(Date()) + poolConnection.batchId = poolContinuation.batchId do { // Connection check, etc. From cbe2c1ce86be8fe3e4bde0322748d3f9f6352a41 Mon Sep 17 00:00:00 2001 From: Thomas Rasch Date: Fri, 12 May 2023 14:51:25 +0200 Subject: [PATCH 2/2] Remove the global usage counter as it is misleading --- Sources/PostgresConnectionPool/PoolConnection.swift | 2 -- Sources/PostgresConnectionPool/PoolInfo.swift | 11 ++++------- .../PostgresConnectionPool.swift | 1 - .../PostgresConnectionPoolTests/ConnectionTests.swift | 7 ------- 4 files changed, 4 insertions(+), 17 deletions(-) diff --git a/Sources/PostgresConnectionPool/PoolConnection.swift b/Sources/PostgresConnectionPool/PoolConnection.swift index effff4d..f6c2be9 100644 --- a/Sources/PostgresConnectionPool/PoolConnection.swift +++ b/Sources/PostgresConnectionPool/PoolConnection.swift @@ -9,7 +9,6 @@ final class PoolConnection: Identifiable, Equatable { // TODO: Serialize access private(set) static var connectionId = 0 - private(set) static var globalUsageCounter = 0 private(set) var usageCounter = 0 @@ -21,7 +20,6 @@ final class PoolConnection: Identifiable, Equatable { didSet { if case .active = state { usageCounter += 1 - PoolConnection.globalUsageCounter += 1 } } } diff --git a/Sources/PostgresConnectionPool/PoolInfo.swift b/Sources/PostgresConnectionPool/PoolInfo.swift index b241078..53d4a46 100644 --- a/Sources/PostgresConnectionPool/PoolInfo.swift +++ b/Sources/PostgresConnectionPool/PoolInfo.swift @@ -34,8 +34,6 @@ public struct PoolInfo { public let activeConnections: Int /// The number of connections that are currently available. public let availableConnections: Int - /// The total number of queries that were sent to the server. - public let usageCounter: Int /// Information about individual open connections to the server. public let connections: [ConnectionInfo] @@ -55,16 +53,15 @@ extension PoolInfo: CustomStringConvertible { public var description: String { var lines: [String] = [ "Pool: \(name)", - "Connections: \(openConnections)/\(activeConnections)/\(availableConnections) (open/active/available)", - "Usage: \(usageCounter)", - "Shutdown? \(isShutdown) \(shutdownError != nil ? "(\(shutdownError!.description))" : "")", + " Connections: \(openConnections)/\(activeConnections)/\(availableConnections) (open/active/available)", + " Is shut down? \(isShutdown) \(shutdownError != nil ? "(\(shutdownError!.description))" : "")", ] if connections.isNotEmpty { - lines.append("Connections:") + lines.append(" Connections:") for connection in connections.sorted(by: { $0.id < $1.id }) { - lines.append(contentsOf: connection.description.components(separatedBy: "\n").map({ " " + $0 })) + lines.append(contentsOf: connection.description.components(separatedBy: "\n").map({ " " + $0 })) } } diff --git a/Sources/PostgresConnectionPool/PostgresConnectionPool.swift b/Sources/PostgresConnectionPool/PostgresConnectionPool.swift index 4cc16a0..3e72b0f 100644 --- a/Sources/PostgresConnectionPool/PostgresConnectionPool.swift +++ b/Sources/PostgresConnectionPool/PostgresConnectionPool.swift @@ -273,7 +273,6 @@ public actor PostgresConnectionPool { openConnections: connections.count, activeConnections: connections.count - available.count, availableConnections: available.count, - usageCounter: PoolConnection.globalUsageCounter, connections: connections, isShutdown: isShutdown, shutdownError: shutdownError) diff --git a/Tests/PostgresConnectionPoolTests/ConnectionTests.swift b/Tests/PostgresConnectionPoolTests/ConnectionTests.swift index ef9e005..f64bbb6 100644 --- a/Tests/PostgresConnectionPoolTests/ConnectionTests.swift +++ b/Tests/PostgresConnectionPoolTests/ConnectionTests.swift @@ -35,7 +35,6 @@ final class ConnectionTests: XCTestCase { } func testPoolInfo() async throws { - let initialUsageCounter = PoolConnection.globalUsageCounter let pool = PostgresConnectionPool(configuration: PostgresHelpers.poolConfiguration(), logger: logger) let poolInfoBefore = await pool.poolInfo() @@ -43,7 +42,6 @@ final class ConnectionTests: XCTestCase { XCTAssertEqual(poolInfoBefore.activeConnections, 0) XCTAssertEqual(poolInfoBefore.availableConnections, 0) XCTAssertEqual(poolInfoBefore.openConnections, 0) - XCTAssertEqual(poolInfoBefore.usageCounter, initialUsageCounter) XCTAssertEqual(poolInfoBefore.connections.count, poolInfoBefore.openConnections) XCTAssertFalse(poolInfoBefore.isShutdown) XCTAssertNil(poolInfoBefore.shutdownError) @@ -66,7 +64,6 @@ final class ConnectionTests: XCTestCase { XCTAssertEqual(poolInfo.activeConnections, 0) XCTAssertGreaterThan(poolInfo.availableConnections, 0) XCTAssertGreaterThan(poolInfo.openConnections, 0) - XCTAssertEqual(poolInfo.usageCounter, 1000 + initialUsageCounter) XCTAssertEqual(poolInfo.connections.count, poolInfo.openConnections) XCTAssertFalse(poolInfo.isShutdown) XCTAssertNil(poolInfo.shutdownError) @@ -78,14 +75,12 @@ final class ConnectionTests: XCTestCase { XCTAssertEqual(poolInfoAfterShutdown.activeConnections, 0) XCTAssertEqual(poolInfoAfterShutdown.availableConnections, 0) XCTAssertEqual(poolInfoAfterShutdown.openConnections, 0) - XCTAssertGreaterThan(poolInfoAfterShutdown.usageCounter, 0) XCTAssertEqual(poolInfoAfterShutdown.connections.count, 0) XCTAssertTrue(poolInfoAfterShutdown.isShutdown) XCTAssertNil(poolInfoAfterShutdown.shutdownError) } func testPoolSize100() async throws { - let initialUsageCounter = PoolConnection.globalUsageCounter let pool = PostgresConnectionPool(configuration: PostgresHelpers.poolConfiguration(poolSize: 100), logger: logger) let start = 1 @@ -105,7 +100,6 @@ final class ConnectionTests: XCTestCase { XCTAssertEqual(poolInfo.activeConnections, 0) XCTAssertGreaterThan(poolInfo.availableConnections, 0) XCTAssertGreaterThan(poolInfo.openConnections, 0) - XCTAssertEqual(poolInfo.usageCounter, 10000 + initialUsageCounter) XCTAssertEqual(poolInfo.connections.count, poolInfo.openConnections) XCTAssertFalse(poolInfo.isShutdown) XCTAssertNil(poolInfo.shutdownError) @@ -116,7 +110,6 @@ final class ConnectionTests: XCTestCase { XCTAssertEqual(poolInfoIdleClosed.activeConnections, 0) XCTAssertEqual(poolInfoIdleClosed.availableConnections, 0) XCTAssertEqual(poolInfoIdleClosed.openConnections, 0) - XCTAssertEqual(poolInfoIdleClosed.usageCounter, 10000 + initialUsageCounter) XCTAssertEqual(poolInfoIdleClosed.connections.count, poolInfoIdleClosed.openConnections) XCTAssertFalse(poolInfoIdleClosed.isShutdown) XCTAssertNil(poolInfoIdleClosed.shutdownError)