Skip to content

Commit

Permalink
Merge pull request #10 from Outdooractive/9_batch_id
Browse files Browse the repository at this point in the history
#9: Add a batchId to connections
  • Loading branch information
trasch authored May 12, 2023
2 parents ccec1c2 + cbe2c1c commit e8c65e5
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 27 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ This package requires Swift 5.7 or higher (at least Xcode 13), and compiles on m

```swift
dependencies: [
.package(url: "https://github.com/Outdooractive/PostgresConnectionPool.git", from: "0.6.1"),
.package(url: "https://github.com/Outdooractive/PostgresConnectionPool.git", from: "0.7.0"),
],
targets: [
.target(name: "MyTarget", dependencies: [
Expand Down Expand Up @@ -51,9 +51,9 @@ let configuration = PoolConfiguration(
let pool = PostgresConnectionPool(configuration: configuration, logger: logger)

// Fetch a connection from the pool and do something with it...
try await pool.connection(callback: { connection in
try await pool.connection { connection in
try await connection.query(PostgresQuery(stringLiteral: "SELECT 1"), logger: logger)
})
}

// With PostgresKit
func fetchObjects<T: Decodable>(_ sql: SQLQueryString) async throws -> [T] {
Expand Down
4 changes: 2 additions & 2 deletions Sources/PostgresConnectionPool/PoolConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ final class PoolConnection: Identifiable, Equatable {

// TODO: Serialize access
private(set) static var connectionId = 0
private(set) static var globalUsageCounter = 0

private(set) var usageCounter = 0

let id: Int
var batchId: Int?

var connection: PostgresConnection?
var state: PoolConnectionState = .connecting {
didSet {
if case .active = state {
usageCounter += 1
PoolConnection.globalUsageCounter += 1
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion Sources/PostgresConnectionPool/PoolContinuation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ typealias PostgresCheckedContinuation = CheckedContinuation<PoolConnection, Erro
final class PoolContinuation {

let added: Date
let batchId: Int?
let continuation: PostgresCheckedContinuation

init(continuation: PostgresCheckedContinuation) {
init(batchId: Int?, continuation: PostgresCheckedContinuation) {
self.added = Date()
self.batchId = batchId
self.continuation = continuation
}

Expand Down
17 changes: 10 additions & 7 deletions Sources/PostgresConnectionPool/PoolInfo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public struct PoolInfo {
public let name: String
/// Total number of queries that were sent over this connection.
public let usageCounter: Int
/// The connection's batch Id.
public var batchId: Int?
/// The current query, if available.
public let query: String?
/// The current time for the query, if available.
Expand All @@ -32,8 +34,6 @@ public struct PoolInfo {
public let activeConnections: Int
/// The number of connections that are currently available.
public let availableConnections: Int
/// The total number of queries that were sent to the server.
public let usageCounter: Int

/// Information about individual open connections to the server.
public let connections: [ConnectionInfo]
Expand All @@ -53,16 +53,15 @@ extension PoolInfo: CustomStringConvertible {
public var description: String {
var lines: [String] = [
"Pool: \(name)",
"Connections: \(openConnections)/\(activeConnections)/\(availableConnections) (open/active/available)",
"Usage: \(usageCounter)",
"Shutdown? \(isShutdown) \(shutdownError != nil ? "(\(shutdownError!.description))" : "")",
" Connections: \(openConnections)/\(activeConnections)/\(availableConnections) (open/active/available)",
" Is shut down? \(isShutdown) \(shutdownError != nil ? "(\(shutdownError!.description))" : "")",
]

if connections.isNotEmpty {
lines.append("Connections:")
lines.append(" Connections:")

for connection in connections.sorted(by: { $0.id < $1.id }) {
lines.append(contentsOf: connection.description.components(separatedBy: "\n").map({ " " + $0 }))
lines.append(contentsOf: connection.description.components(separatedBy: "\n").map({ " " + $0 }))
}
}

Expand All @@ -87,6 +86,10 @@ extension PoolInfo.ConnectionInfo: CustomStringConvertible {
}
}

if let batchId {
lines.append(" BatchId: \(batchId)")
}

return lines.joined(separator: "\n")
}

Expand Down
51 changes: 44 additions & 7 deletions Sources/PostgresConnectionPool/PostgresConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,34 @@ public actor PostgresConnectionPool {
// }

/// Takes one connection from the pool and dishes it out to the caller.
///
/// - Parameters:
/// - batchId: An optional integer value associated with the connection. See also ``abortBatch(_:)``.
/// - callback: A closure with a connection to the database server.
@discardableResult
public func connection<T>(
batchId: Int? = nil,
_ callback: (PostgresConnectionWrapper) async throws -> T)
async throws -> T
{
var poolConnection: PoolConnection?

do {
poolConnection = try await getConnection()
poolConnection = try await getConnection(batchId: batchId)

if Task.isCancelled {
await releaseConnection(poolConnection!)

if let batchId {
await abortBatch(batchId)
}

throw PoolError.cancelled
}

let result = try await PostgresConnectionWrapper.distribute(poolConnection: poolConnection, callback: callback)
let result = try await PostgresConnectionWrapper.distribute(
poolConnection: poolConnection,
callback: callback)

await releaseConnection(poolConnection!)

Expand All @@ -118,11 +130,11 @@ public actor PostgresConnectionPool {
}

/// Adds a connection placeholder to the list of waiting connections.
private func getConnection() async throws -> PoolConnection {
private func getConnection(batchId: Int? = nil) async throws -> PoolConnection {
guard !isShutdown else { throw PoolError.poolDestroyed(shutdownError) }

return try await withCheckedThrowingContinuation({ (continuation: PostgresCheckedContinuation) in
self.continuations.append(PoolContinuation(continuation: continuation))
self.continuations.append(PoolContinuation(batchId: batchId, continuation: continuation))

Task.detached { [weak self] in
await self?.handleNextContinuation()
Expand Down Expand Up @@ -154,11 +166,31 @@ public actor PostgresConnectionPool {
assert(available.contains(connection), "Connections in state 'available' should be available")
}

connection.query = nil
connection.batchId = nil

Task.detached { [weak self] in
await self?.handleNextContinuation()
}
}

/// Aborts all waiting queries with the given `batchId`.
public func abortBatch(_ batchId: Int) async {
let countBefore = continuations.count

continuations.removeAll(where: { poolContinuation in
guard poolContinuation.batchId == batchId else { return false }

poolContinuation.continuation.resume(throwing: PoolError.cancelled)
return true
})

let countRemoved = countBefore - continuations.count
if countRemoved > 0 {
logger.debug("[\(poolName)] Removed \(countRemoved) continuations for batch \(batchId)")
}
}

/// Releases all resources in the pool and shuts down the event loop.
/// All further uses of the pool will throw an error.
///
Expand Down Expand Up @@ -220,12 +252,17 @@ public actor PostgresConnectionPool {
}

/// Information about the pool and its open connections.
public func poolInfo() async -> PoolInfo {
public func poolInfo(batchId: Int? = nil) async -> PoolInfo {
let connections = connections.compactMap { connection -> PoolInfo.ConnectionInfo? in
PoolInfo.ConnectionInfo(
if let batchId, connection.batchId != batchId {
return nil
}

return PoolInfo.ConnectionInfo(
id: connection.id,
name: nameForConnection(id: connection.id),
usageCounter: connection.usageCounter,
batchId: connection.batchId,
query: connection.query,
queryRuntime: connection.queryRuntime,
state: connection.state)
Expand All @@ -236,7 +273,6 @@ public actor PostgresConnectionPool {
openConnections: connections.count,
activeConnections: connections.count - available.count,
availableConnections: available.count,
usageCounter: PoolConnection.globalUsageCounter,
connections: connections,
isShutdown: isShutdown,
shutdownError: shutdownError)
Expand Down Expand Up @@ -388,6 +424,7 @@ public actor PostgresConnectionPool {
}

poolConnection.state = .active(Date())
poolConnection.batchId = poolContinuation.batchId

do {
// Connection check, etc.
Expand Down
7 changes: 0 additions & 7 deletions Tests/PostgresConnectionPoolTests/ConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ final class ConnectionTests: XCTestCase {
}

func testPoolInfo() async throws {
let initialUsageCounter = PoolConnection.globalUsageCounter
let pool = PostgresConnectionPool(configuration: PostgresHelpers.poolConfiguration(), logger: logger)

let poolInfoBefore = await pool.poolInfo()
print(poolInfoBefore)
XCTAssertEqual(poolInfoBefore.activeConnections, 0)
XCTAssertEqual(poolInfoBefore.availableConnections, 0)
XCTAssertEqual(poolInfoBefore.openConnections, 0)
XCTAssertEqual(poolInfoBefore.usageCounter, initialUsageCounter)
XCTAssertEqual(poolInfoBefore.connections.count, poolInfoBefore.openConnections)
XCTAssertFalse(poolInfoBefore.isShutdown)
XCTAssertNil(poolInfoBefore.shutdownError)
Expand All @@ -66,7 +64,6 @@ final class ConnectionTests: XCTestCase {
XCTAssertEqual(poolInfo.activeConnections, 0)
XCTAssertGreaterThan(poolInfo.availableConnections, 0)
XCTAssertGreaterThan(poolInfo.openConnections, 0)
XCTAssertEqual(poolInfo.usageCounter, 1000 + initialUsageCounter)
XCTAssertEqual(poolInfo.connections.count, poolInfo.openConnections)
XCTAssertFalse(poolInfo.isShutdown)
XCTAssertNil(poolInfo.shutdownError)
Expand All @@ -78,14 +75,12 @@ final class ConnectionTests: XCTestCase {
XCTAssertEqual(poolInfoAfterShutdown.activeConnections, 0)
XCTAssertEqual(poolInfoAfterShutdown.availableConnections, 0)
XCTAssertEqual(poolInfoAfterShutdown.openConnections, 0)
XCTAssertGreaterThan(poolInfoAfterShutdown.usageCounter, 0)
XCTAssertEqual(poolInfoAfterShutdown.connections.count, 0)
XCTAssertTrue(poolInfoAfterShutdown.isShutdown)
XCTAssertNil(poolInfoAfterShutdown.shutdownError)
}

func testPoolSize100() async throws {
let initialUsageCounter = PoolConnection.globalUsageCounter
let pool = PostgresConnectionPool(configuration: PostgresHelpers.poolConfiguration(poolSize: 100), logger: logger)

let start = 1
Expand All @@ -105,7 +100,6 @@ final class ConnectionTests: XCTestCase {
XCTAssertEqual(poolInfo.activeConnections, 0)
XCTAssertGreaterThan(poolInfo.availableConnections, 0)
XCTAssertGreaterThan(poolInfo.openConnections, 0)
XCTAssertEqual(poolInfo.usageCounter, 10000 + initialUsageCounter)
XCTAssertEqual(poolInfo.connections.count, poolInfo.openConnections)
XCTAssertFalse(poolInfo.isShutdown)
XCTAssertNil(poolInfo.shutdownError)
Expand All @@ -116,7 +110,6 @@ final class ConnectionTests: XCTestCase {
XCTAssertEqual(poolInfoIdleClosed.activeConnections, 0)
XCTAssertEqual(poolInfoIdleClosed.availableConnections, 0)
XCTAssertEqual(poolInfoIdleClosed.openConnections, 0)
XCTAssertEqual(poolInfoIdleClosed.usageCounter, 10000 + initialUsageCounter)
XCTAssertEqual(poolInfoIdleClosed.connections.count, poolInfoIdleClosed.openConnections)
XCTAssertFalse(poolInfoIdleClosed.isShutdown)
XCTAssertNil(poolInfoIdleClosed.shutdownError)
Expand Down

0 comments on commit e8c65e5

Please sign in to comment.