Skip to content

Commit

Permalink
Merge pull request #2 from Outdooractive/pool_info
Browse files Browse the repository at this point in the history
Added PostgresConnectionPool.poolInfo()
  • Loading branch information
trasch authored Apr 25, 2023
2 parents 6c33c29 + f236985 commit db27471
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 13 deletions.
23 changes: 20 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
# PostgresConnectionPool

A simple connection pool on top of PostgresNIO.
A simple connection pool on top of [PostgresNIO](https://github.com/vapor/postgres-nio) and [PostgresKit](https://github.com/vapor/postgres-kit).

[![](https://img.shields.io/endpoint?url=https%3A%2F%2Fswiftpackageindex.com%2Fapi%2Fpackages%2FOutdooractive%2FPostgresConnectionPool%2Fbadge%3Ftype%3Dswift-versions)](https://swiftpackageindex.com/Outdooractive/PostgresConnectionPool)
[![](https://img.shields.io/endpoint?url=https%3A%2F%2Fswiftpackageindex.com%2Fapi%2Fpackages%2FOutdooractive%2FPostgresConnectionPool%2Fbadge%3Ftype%3Dplatforms)](https://swiftpackageindex.com/Outdooractive/PostgresConnectionPool)

## Requirements

This package requires Swift 5.7 or higher (at least Xcode 13), and compiles on macOS (\>= macOS 10.15) and Linux.

## Installation with Swift Package Manager

```swift
dependencies: [
.package(url: "https://github.com/Outdooractive/PostgresConnectionPool.git", from: "0.4.0"),
.package(url: "https://github.com/Outdooractive/PostgresConnectionPool.git", from: "0.5.0"),
],
targets: [
.target(name: "MyTarget", dependencies: [
Expand Down Expand Up @@ -48,11 +52,24 @@ let pool = PostgresConnectionPool(configuration: configuration, logger: logger)
try await pool.connection(callback: { connection in
try await connection.query(PostgresQuery(stringLiteral: "SELECT 1"), logger: logger)
})

// Generic object loading
func fetchObjects<T: Decodable>(_ sql: String) async throws -> [T] {
try await pool.connection({ connection in
return try await connection.sql().raw(SQLQueryString(stringLiteral: sql)).all(decoding: T.self)
})
}

// Open connections, current SQL queries, etc.
await print(pool.info())

// Always call `shutdown()` before releasing a pool
await pool.shutdown()
```

## Contributing

Please create an issue or open a pull request with a fix.
Please create an issue or open a pull request with a fix or enhancement.

## License

Expand Down
20 changes: 12 additions & 8 deletions Sources/PostgresConnectionPool/PoolConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,29 @@ import PostgresNIO

final class PoolConnection: Identifiable, Equatable {

enum State: Equatable {
case active(Date)
case available
case closed
case connecting
}

private static var connectionId: Int = 0

private(set) var usageCounter = 0

let id: Int
var connection: PostgresConnection?
var state: State = .connecting {
var state: PoolConnectionState = .connecting {
didSet {
if case .active = state { usageCounter += 1 }
}
}

private var queryStartTimestamp: Date?
var query: String? {
didSet {
queryStartTimestamp = (query == nil ? nil : Date())
}
}
var queryRuntime: TimeInterval? {
guard let queryStartTimestamp else { return nil }
return Date().timeIntervalSince(queryStartTimestamp)
}

init() {
self.id = PoolConnection.connectionId

Expand Down
12 changes: 12 additions & 0 deletions Sources/PostgresConnectionPool/PoolConnectionState.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//
// Created by Thomas Rasch on 24.04.23.
//

import Foundation

public enum PoolConnectionState: Equatable {
case active(Date)
case available
case closed
case connecting
}
2 changes: 2 additions & 0 deletions Sources/PostgresConnectionPool/PoolError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@ public enum PoolError: Error {
case connectionFailed
/// The pool was already shut down.
case poolDestroyed
/// Something unexpected happened.
case unknown

}
27 changes: 27 additions & 0 deletions Sources/PostgresConnectionPool/PoolInfo.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//
// Created by Thomas Rasch on 24.04.23.
//

import Foundation

/// General information about the pool and its open connections.
public struct PoolInfo {

/// Information about an open connection.
public struct ConnectionInfo {
public var id: Int
public var name: String
public var usageCounter: Int
public var query: String?
public var queryRuntime: TimeInterval?
public var state: PoolConnectionState
}

public var name: String
public var openConnections: Int
public var activeConnections: Int
public var availableConnections: Int

public var connections: [ConnectionInfo]

}
24 changes: 22 additions & 2 deletions Sources/PostgresConnectionPool/PostgresConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public actor PostgresConnectionPool {
/// Takes one connection from the pool and dishes it out to the caller.
@discardableResult
public func connection<T>(
_ callback: (PostgresConnection) async throws -> T)
_ callback: (PostgresConnectionWrapper) async throws -> T)
async throws
-> T
{
Expand All @@ -89,7 +89,7 @@ public actor PostgresConnectionPool {
throw PoolError.cancelled
}

let result = try await callback(poolConnection!.connection!)
let result = try await PostgresConnectionWrapper.distribute(poolConnection: poolConnection, callback: callback)

await releaseConnection(poolConnection!)

Expand Down Expand Up @@ -169,6 +169,26 @@ public actor PostgresConnectionPool {
try? await eventLoopGroup.shutdownGracefully()
}

/// Information about the pool and its open connections.
func poolInfo() async -> PoolInfo {
let connections = connections.compactMap { connection -> PoolInfo.ConnectionInfo? in
return PoolInfo.ConnectionInfo(
id: connection.id,
name: nameForConnection(id: connection.id),
usageCounter: connection.usageCounter,
query: connection.query,
queryRuntime: connection.queryRuntime,
state: connection.state)
}

return PoolInfo(
name: poolName,
openConnections: connections.count,
activeConnections: connections.count - available.count,
availableConnections: available.count,
connections: connections)
}

// MARK: - Private

private func closeConnection(_ poolConnection: PoolConnection) async {
Expand Down
78 changes: 78 additions & 0 deletions Sources/PostgresConnectionPool/PostgresConnectionWrapper.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//
// Created by Thomas Rasch on 24.04.23.
//

import Foundation
import PostgresNIO

public final class PostgresConnectionWrapper {

private let poolConnection: PoolConnection
private let postgresConnection: PostgresConnection

static func distribute<T>(
poolConnection: PoolConnection?,
callback: (PostgresConnectionWrapper) async throws -> T)
async throws -> T
{
guard let poolConnection,
let connectionWrapper = PostgresConnectionWrapper(poolConnection)
else { throw PoolError.unknown }

connectionWrapper.poolConnection.query = nil
let result = try await callback(connectionWrapper)
connectionWrapper.poolConnection.query = nil

return result
}

init?(_ poolConnection: PoolConnection) {
guard let postgresConnection = poolConnection.connection else { return nil }

self.poolConnection = poolConnection
self.postgresConnection = postgresConnection
}

// MARK: - Public interface, from PostgresConnection

/// A logger to use in case
public var logger: Logger {
postgresConnection.logger
}

public var isClosed: Bool {
postgresConnection.isClosed
}

/// Run a query on the Postgres server the connection is connected to.
///
/// - Parameters:
/// - query: The ``PostgresQuery`` to run
/// - logger: The `Logger` to log into for the query
/// - file: The file, the query was started in. Used for better error reporting.
/// - line: The line, the query was started in. Used for better error reporting.
/// - Returns: A ``PostgresRowSequence`` containing the rows the server sent as the query result.
/// The sequence can be discarded.
@discardableResult
public func query(
_ query: PostgresQuery,
logger: Logger,
file: String = #fileID,
line: Int = #line)
async throws -> PostgresRowSequence
{
poolConnection.query = query.sql
return try await postgresConnection.query(query, logger: logger, file: file, line: line)
}

/// Add a handler for NotificationResponse messages on a certain channel. This is used in conjunction with PostgreSQL's `LISTEN`/`NOTIFY` support: to listen on a channel, you add a listener using this method to handle the NotificationResponse messages, then issue a `LISTEN` query to instruct PostgreSQL to begin sending NotificationResponse messages.
@discardableResult
public func addListener(
channel: String,
handler notificationHandler: @escaping (PostgresListenContext, PostgresMessage.NotificationResponse) -> Void)
-> PostgresListenContext
{
postgresConnection.addListener(channel: channel, handler: notificationHandler)
}

}

0 comments on commit db27471

Please sign in to comment.