Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compression and connection pooling #35

Closed
wants to merge 9 commits into from
1 change: 1 addition & 0 deletions doc/changes/changelog.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions doc/changes/changes_0.1.5.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Exasol Driver ts 0.1.5, released 2024-??-??

Code name: WIP

## Summary

This release ....

20 changes: 11 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@exasol/exasol-driver-ts",
"version": "0.1.4",
"version": "0.1.5",
"description": "Exasol SQL Driver",
"scripts": {
"build": "rollup -c",
Expand Down
14 changes: 12 additions & 2 deletions src/lib/pool/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@ export interface PoolItem {
active?: boolean;
name: string;
}

//TODO: multiple todos here,
//TBD: how do we want the pool to be filled? on request, on driver startup?
//TBD: a min pool size?
//TBD: a query time out parameter to 'unblock' the pool? also check connection for this
export class ConnectionPool<T extends PoolItem> {
private readonly pool = new Map<string, { claimed: boolean; connection: T }>();
constructor(private readonly max = 1, private readonly logger: ILogger) {}
constructor(
private readonly max = 1,
private readonly logger: ILogger,
) {}

async add(connection: T): Promise<void> {
this.logger.debug(`[Pool:${connection.name}] Add connection`);
Expand Down Expand Up @@ -46,6 +52,7 @@ export class ConnectionPool<T extends PoolItem> {

acquire(): T | undefined {
const keys = Array.from(this.pool.keys());

for (let index = 0; index < keys.length; index++) {
const key = keys[index];
const item = this.pool.get(key);
Expand All @@ -67,4 +74,7 @@ export class ConnectionPool<T extends PoolItem> {

return undefined;
}
atMaxSize() {
return this.pool.size == this.max;
}
}
96 changes: 63 additions & 33 deletions src/lib/sql-client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as forge from 'node-forge';


import { getURIScheme } from './utils';
import { CreatePreparedStatementResponse, PublicKeyResponse, SQLQueriesResponse, SQLResponse } from './types';
import { Statement } from './statement';
Expand Down Expand Up @@ -44,6 +43,7 @@ export interface Config {
interface InternalConfig {
apiVersion: number;
compression: boolean;
poolMaxConnections: number;
}

export const driverVersion = 'v1.0.0';
Expand All @@ -61,16 +61,21 @@ export class ExasolDriver implements IExasolDriver {
encryption: true,
compression: false,
apiVersion: 3,
poolMaxConnections: 1,
};
private readonly config: Config & InternalConfig & { websocketFactory: websocketFactory };
private readonly logger: ILogger;
private closed = false;

private readonly pool: ConnectionPool<Connection>;

constructor(websocketFactory: websocketFactory, config: Partial<Config>, logger: ILogger = new Logger(LogLevel.Debug)) {
constructor(
websocketFactory: websocketFactory,
config: Partial<Config> & Partial<InternalConfig>,
logger: ILogger = new Logger(LogLevel.Debug),
) {
// Used internally to avoid parallel execution
this.pool = new ConnectionPool<Connection>(1, logger);
this.pool = new ConnectionPool<Connection>(config.poolMaxConnections, logger);
this.config = {
...this.defaultConfig,
...config,
Expand All @@ -82,7 +87,14 @@ export class ExasolDriver implements IExasolDriver {
/**
* @inheritDoc
*/
//TODO: add a check against 'overloading' the pool?
//problem is this function is both used publically (see examples) and internally in acquire()
//solution would be to hollow out this function, maybe have it call acquire() instead and have acquire call a new function containing this logic
//a la "create connection and add to pool
public async connect(): Promise<void> {
await this.acquire();
}
private async createConnectionAndAddToPool(): Promise<void> {
let hasCredentials = false;
let isBasicAuth = false;
if (this.config.user && this.config.password) {
Expand Down Expand Up @@ -127,6 +139,7 @@ export class ExasolDriver implements IExasolDriver {
};
webSocket.onopen = () => {
this.logger.debug('[SQLClient] Login');
//add connection to pool
this.pool
.add(connection)
.then(() => {
Expand Down Expand Up @@ -184,16 +197,16 @@ export class ExasolDriver implements IExasolDriver {
if (this.closed) {
return Promise.reject(ErrClosed);
}
const connection = this.pool.acquire();
const connection = await this.acquire();
if (connection) {
return connection
.sendCommandWithNoResult(cmd)
.then(() => {
this.pool.release(connection);
this.release(connection);
return;
})
.catch((err) => {
this.pool.release(connection);
this.release(connection);
throw err;
});
}
Expand All @@ -207,25 +220,25 @@ export class ExasolDriver implements IExasolDriver {
async query(
sqlStatement: string,
attributes?: Partial<Attributes> | undefined,
getCancel?: CetCancelFunction | undefined
getCancel?: CetCancelFunction | undefined,
): Promise<QueryResult>;
async query(
sqlStatement: string,
attributes?: Partial<Attributes> | undefined,
getCancel?: CetCancelFunction | undefined,
responseType?: 'default' | undefined
responseType?: 'default' | undefined,
): Promise<QueryResult>;
async query(
sqlStatement: string,
attributes?: Partial<Attributes> | undefined,
getCancel?: CetCancelFunction | undefined,
responseType?: 'raw' | undefined
responseType?: 'raw' | undefined,
): Promise<SQLResponse<SQLQueriesResponse>>;
async query(
sqlStatement: string,
attributes?: Partial<Attributes> | undefined,
getCancel?: CetCancelFunction | undefined,
responseType?: 'default' | 'raw'
responseType?: 'default' | 'raw',
): Promise<QueryResult | SQLResponse<SQLQueriesResponse>> {
const connection = await this.acquire();
return connection
Expand All @@ -235,7 +248,7 @@ export class ExasolDriver implements IExasolDriver {
})
.then((data) => {
if (connection) {
this.pool.release(connection);
this.release(connection);
}
return data;
})
Expand All @@ -256,7 +269,7 @@ export class ExasolDriver implements IExasolDriver {
})
.catch((err) => {
if (connection) {
this.pool.release(connection);
this.release(connection);
}
throw err;
});
Expand All @@ -268,25 +281,25 @@ export class ExasolDriver implements IExasolDriver {
async execute(
sqlStatement: string,
attributes?: Partial<Attributes> | undefined,
getCancel?: CetCancelFunction | undefined
getCancel?: CetCancelFunction | undefined,
): Promise<number>;
async execute(
sqlStatement: string,
attributes?: Partial<Attributes> | undefined,
getCancel?: CetCancelFunction | undefined,
responseType?: 'default' | undefined
responseType?: 'default' | undefined,
): Promise<number>;
async execute(
sqlStatement: string,
attributes?: Partial<Attributes> | undefined,
getCancel?: CetCancelFunction | undefined,
responseType?: 'raw' | undefined
responseType?: 'raw' | undefined,
): Promise<SQLResponse<SQLQueriesResponse>>;
async execute(
sqlStatement: string,
attributes?: Partial<Attributes> | undefined,
getCancel?: CetCancelFunction | undefined,
responseType?: 'default' | 'raw'
responseType?: 'default' | 'raw',
): Promise<SQLResponse<SQLQueriesResponse> | number> {
const connection = await this.acquire();
return connection
Expand All @@ -296,7 +309,7 @@ export class ExasolDriver implements IExasolDriver {
})
.then((data) => {
if (connection) {
this.pool.release(connection);
this.release(connection);
}
return data;
})
Expand All @@ -317,7 +330,7 @@ export class ExasolDriver implements IExasolDriver {
})
.catch((err) => {
if (connection) {
this.pool.release(connection);
this.release(connection);
}
throw err;
});
Expand All @@ -329,7 +342,7 @@ export class ExasolDriver implements IExasolDriver {
public async executeBatch(
sqlStatements: string[],
attributes?: Partial<Attributes>,
getCancel?: CetCancelFunction
getCancel?: CetCancelFunction,
): Promise<SQLResponse<SQLQueriesResponse>> {
const connection = await this.acquire();

Expand All @@ -340,13 +353,13 @@ export class ExasolDriver implements IExasolDriver {
})
.then((data) => {
if (connection) {
this.pool.release(connection);
this.release(connection);
}
return data;
})
.catch((err) => {
if (connection) {
this.pool.release(connection);
this.release(connection);
}
throw err;
});
Expand All @@ -363,7 +376,7 @@ export class ExasolDriver implements IExasolDriver {
command: 'createPreparedStatement',
sqlText: sqlStatement,
},
getCancel
getCancel,
)
.then((response) => {
return new Statement(connection, this.pool, response.responseData.statementHandle, response.responseData.parameterData.columns);
Expand All @@ -380,41 +393,58 @@ export class ExasolDriver implements IExasolDriver {
.sendCommand<T>(cmd, getCancel)
.then((data) => {
if (connection) {
this.pool.release(connection);
this.release(connection);
}
return data;
})
.catch((err) => {
if (connection) {
this.pool.release(connection);
this.release(connection);
}
throw err;
});
}
// Attempts to acquire a connection from the pool.
// If there is one available, acquire the connection.
// If there isn't one:
// If the pool is at max size: wait until a connection gets released and then acquire it.
// If the pool is not at max size: Create a new connection using the connect() method and acquire the new connection from the pool.

private async acquire() {
if (this.closed) {
return Promise.reject(ErrClosed);
}

//acquire a connection.
let connection = this.pool.acquire();
//if acquiring a connection failed:
if (!connection) {
this.logger.debug("[SQLClient] Found no free connection and pool did not reach it's limit, will create new connection");
await this.connect();
connection = this.pool.acquire();
}
if (!connection) {
return Promise.reject(ErrInvalidConn);
//if the pool is at max size, wait until a connection opens up/gets released and acquire it.
if (this.pool.atMaxSize()) {
while (!connection) {
connection = this.pool.acquire();
}
} else {
//create a new connection if the pool is not at max size, add it to the pool and then acquire it.
this.logger.debug('[SQLClient] Found no free connection and pool did not reach its limit, will create new connection');
await this.createConnectionAndAddToPool();
connection = this.pool.acquire();
}
if (!connection) {
return Promise.reject(ErrInvalidConn);
}
}
return connection;
}


private async release(connection: Connection) {
this.pool.release(connection);
}

private async loginBasicAuth() {
return this.sendCommand<PublicKeyResponse>({
command: 'login',
protocolVersion: this.config.apiVersion,
}).then((response) => {

const n = new forge.jsbn.BigInteger(response.responseData.publicKeyModulus, 16);
const e = new forge.jsbn.BigInteger(response.responseData.publicKeyExponent, 16);

Expand Down
Loading