Skip to content

Commit

Permalink
Add alpha support for SSH tunnel connections to Redshift/Postgres (#715)
Browse files Browse the repository at this point in the history
* Add alpha support for SSH tunnel connections to Redshift/Postgres

* add missing API dep

* Fix imports

* Review comments bump version
  • Loading branch information
lewish authored Apr 24, 2020
1 parent c679ba6 commit 00437a5
Show file tree
Hide file tree
Showing 24 changed files with 398 additions and 318 deletions.
9 changes: 6 additions & 3 deletions api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ ts_library(
"@npm//@google-cloud/bigquery",
"@npm//@types/glob",
"@npm//@types/js-beautify",
"@npm//@types/long",
"@npm//@types/lodash",
"@npm//@types/long",
"@npm//@types/mssql",
"@npm//@types/node",
"@npm//@types/object-hash",
"@npm//@types/pg",
"@npm//@types/ssh2",
"@npm//cron-parser",
"@npm//get-port",
"@npm//glob",
"@npm//js-beautify",
"@npm//lodash",
Expand All @@ -33,6 +35,7 @@ ts_library(
"@npm//protobufjs",
"@npm//snowflake-sdk",
"@npm//sql-formatter",
"@npm//ssh2",
"@npm//vm2",
],
)
Expand Down Expand Up @@ -71,20 +74,20 @@ load("@build_bazel_rules_nodejs//:index.bzl", "nodejs_binary")

nodejs_binary(
name = "vm/compile",
templated_args = ["--node_options=--require=source-map-support/register"],
data = [
":lib",
"@npm//source-map-support",
],
entry_point = ":vm/compile.ts",
templated_args = ["--node_options=--require=source-map-support/register"],
)

nodejs_binary(
name = "vm/query",
templated_args = ["--node_options=--require=source-map-support/register"],
data = [
":lib",
"@npm//source-map-support",
],
entry_point = ":vm/query.ts",
templated_args = ["--node_options=--require=source-map-support/register"],
)
2 changes: 2 additions & 0 deletions api/api.package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"@types/object-hash": "^1.3.1",
"bluebird": "^3.5.3",
"cron-parser": "^2.11.0",
"get-port": "^5.1.1",
"glob": "^7.1.3",
"js-beautify": "^1.10.2",
"pg": "^7.12.1",
Expand All @@ -25,6 +26,7 @@
"promise-pool-executor": "^1.1.1",
"snowflake-sdk": "^1.1.5",
"sql-formatter": "^2.3.3",
"ssh2": "^0.8.9",
"vm2": "^3.6.3"
}
}
11 changes: 3 additions & 8 deletions api/commands/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,11 @@ import { dataform } from "@dataform/protos";
export async function build(
compiledGraph: dataform.ICompiledGraph,
runConfig: dataform.IRunConfig,
credentials: Credentials
dbadapter: dbadapters.IDbAdapter
) {
const prunedGraph = prune(compiledGraph, runConfig);
const dbadapter = dbadapters.create(credentials, compiledGraph.projectConfig.warehouse);
try {
const stateResult = await state(prunedGraph, dbadapter);
return new Builder(prunedGraph, runConfig, stateResult).build();
} finally {
await dbadapter.close();
}
const stateResult = await state(prunedGraph, dbadapter);
return new Builder(prunedGraph, runConfig, stateResult).build();
}

export class Builder {
Expand Down
8 changes: 2 additions & 6 deletions api/commands/credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,15 @@ export interface ITestResult {
}

export async function test(
credentials: Credentials,
warehouse: string,
dbadapter: dbadapters.IDbAdapter,
timeoutMs: number = 10000
): Promise<ITestResult> {
let timer;
try {
const timeout = new Promise<TestResultStatus>(
resolve => (timer = setTimeout(() => resolve(TestResultStatus.TIMED_OUT), timeoutMs))
);
const executeQuery = dbadapters
.create(credentials, warehouse)
.execute("SELECT 1 AS x")
.then(() => TestResultStatus.SUCCESSFUL);
const executeQuery = dbadapter.execute("SELECT 1 AS x").then(() => TestResultStatus.SUCCESSFUL);
return {
status: await Promise.race([executeQuery, timeout])
};
Expand Down
17 changes: 3 additions & 14 deletions api/commands/query.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
import { CompileChildProcess } from "@dataform/api/commands/compile";
import { Credentials } from "@dataform/api/commands/credentials";
import * as dbadapters from "@dataform/api/dbadapters";
import { CancellablePromise } from "@dataform/api/utils/cancellable_promise";
import { dataform } from "@dataform/protos";
import * as path from "path";

export function run(
credentials: Credentials,
warehouse: string,
dbadapter: dbadapters.IDbAdapter,
query: string,
options?: {
compileConfig?: dataform.ICompileConfig;
maxResults?: number;
}
): CancellablePromise<any[]> {
return new CancellablePromise(async (resolve, reject, onCancel) => {
const dbadapter = dbadapters.create(credentials, warehouse);
try {
const compiledQuery = await compile(query, options && options.compileConfig);
const results = await dbadapter.execute(compiledQuery, {
Expand All @@ -26,25 +23,17 @@ export function run(
resolve(results.rows);
} catch (e) {
reject(e);
} finally {
await dbadapter.close();
}
});
}

export async function evaluate(
credentials: Credentials,
warehouse: string,
dbadapter: dbadapters.IDbAdapter,
query: string,
compileConfig?: dataform.ICompileConfig
): Promise<dataform.IQueryEvaluationResponse> {
const compiledQuery = await compile(query, compileConfig);
const dbadapter = dbadapters.create(credentials, warehouse);
try {
return await dbadapter.evaluate(compiledQuery);
} finally {
await dbadapter.close();
}
return await dbadapter.evaluate(compiledQuery);
}

export async function compile(
Expand Down
6 changes: 1 addition & 5 deletions api/commands/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ const isSuccessfulAction = (actionResult: dataform.IActionResult) =>
actionResult.status === dataform.ActionResult.ExecutionStatus.CACHE_SKIPPED ||
actionResult.status === dataform.ActionResult.ExecutionStatus.DISABLED;

export function run(graph: dataform.IExecutionGraph, credentials: Credentials): Runner {
const dbadapter = dbadapters.create(credentials, graph.projectConfig.warehouse);
export function run(graph: dataform.IExecutionGraph, dbadapter: dbadapters.IDbAdapter): Runner {
return Runner.create(dbadapter, graph).execute();
}

Expand Down Expand Up @@ -82,9 +81,6 @@ export class Runner {
if (!!this.timeout) {
clearTimeout(this.timeout);
}
if (!!this.adapter) {
await this.adapter.close();
}
}
}

Expand Down
33 changes: 6 additions & 27 deletions api/commands/table.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,21 @@
import { Credentials } from "@dataform/api/commands/credentials";
import * as dbadapters from "@dataform/api/dbadapters";
import { dataform } from "@dataform/protos";

export async function list(
credentials: Credentials,
warehouse: string
): Promise<dataform.ITarget[]> {
const dbadapter = dbadapters.create(credentials, warehouse);
try {
return await dbadapter.tables();
} finally {
await dbadapter.close();
}
export async function list(dbadapter: dbadapters.IDbAdapter): Promise<dataform.ITarget[]> {
return await dbadapter.tables();
}

export async function get(
credentials: Credentials,
warehouse: string,
dbadapter: dbadapters.IDbAdapter,
target: dataform.ITarget
): Promise<dataform.ITableMetadata> {
const dbadapter = dbadapters.create(credentials, warehouse);
try {
return await dbadapter.table(target);
} finally {
await dbadapter.close();
}
return await dbadapter.table(target);
}

export async function preview(
credentials: Credentials,
warehouse: string,
dbadapter: dbadapters.IDbAdapter,
target: dataform.ITarget,
limitRows?: number
): Promise<any[]> {
const dbadapter = dbadapters.create(credentials, warehouse);
try {
return await dbadapter.preview(target, limitRows);
} finally {
await dbadapter.close();
}
return await dbadapter.preview(target, limitRows);
}
11 changes: 2 additions & 9 deletions api/commands/test.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
import { Credentials } from "@dataform/api/commands/credentials";
import * as dbadapters from "@dataform/api/dbadapters";
import { dataform } from "@dataform/protos";

export async function test(
credentials: Credentials,
warehouse: string,
dbadapter: dbadapters.IDbAdapter,
tests: dataform.ITest[]
): Promise<dataform.ITestResult[]> {
const dbadapter = dbadapters.create(credentials, warehouse);
try {
return await Promise.all(tests.map(testCase => runTest(dbadapter, testCase)));
} finally {
await dbadapter.close();
}
return await Promise.all(tests.map(testCase => runTest(dbadapter, testCase)));
}

async function runTest(
Expand Down
6 changes: 5 additions & 1 deletion api/dbadapters/bigquery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ interface IBigQueryFieldMetadata {
}

export class BigQueryDbAdapter implements IDbAdapter {
public static async create(credentials: Credentials) {
return new BigQueryDbAdapter(credentials);
}

private bigQueryCredentials: dataform.IBigQuery;
private pool: PromisePool.PromisePoolExecutor;

private readonly clients = new Map<string, BigQuery>();

constructor(credentials: Credentials) {
private constructor(credentials: Credentials) {
this.bigQueryCredentials = credentials as dataform.IBigQuery;
// Bigquery allows 50 concurrent queries, and a rate limit of 100/user/second by default.
// These limits should be safely low enough for most projects.
Expand Down
12 changes: 7 additions & 5 deletions api/dbadapters/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,21 @@ export interface IDbAdapter {
close(): Promise<void>;
}

export type DbAdapterConstructor<T extends IDbAdapter> = new (credentials: Credentials) => T;
export interface IDbAdapterClass<T extends IDbAdapter> {
create: (credentials: Credentials) => Promise<T>;
}

const registry: { [warehouseType: string]: DbAdapterConstructor<IDbAdapter> } = {};
const registry: { [warehouseType: string]: IDbAdapterClass<IDbAdapter> } = {};

export function register(warehouseType: string, c: DbAdapterConstructor<IDbAdapter>) {
export function register(warehouseType: string, c: IDbAdapterClass<IDbAdapter>) {
registry[warehouseType] = c;
}

export function create(credentials: Credentials, warehouseType: string): IDbAdapter {
export async function create(credentials: Credentials, warehouseType: string): Promise<IDbAdapter> {
if (!registry[warehouseType]) {
throw new Error(`Unsupported warehouse: ${warehouseType}`);
}
return new registry[warehouseType](credentials);
return await registry[warehouseType].create(credentials);
}

register("bigquery", BigQueryDbAdapter);
Expand Down
35 changes: 28 additions & 7 deletions api/dbadapters/redshift.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Credentials } from "@dataform/api/commands/credentials";
import { IDbAdapter } from "@dataform/api/dbadapters/index";
import { SSHTunnelProxy } from "@dataform/api/ssh_tunnel_proxy";
import { parseRedshiftEvalError } from "@dataform/api/utils/error_parsing";
import { dataform } from "@dataform/protos";
import * as pg from "pg";
Expand All @@ -11,21 +12,38 @@ interface ICursor {
}

export class RedshiftDbAdapter implements IDbAdapter {
private queryExecutor: PgPoolExecutor;

constructor(credentials: Credentials) {
public static async create(credentials: Credentials) {
const jdbcCredentials = credentials as dataform.IJDBC;
const clientConfig: pg.ClientConfig = {
host: jdbcCredentials.host,
port: jdbcCredentials.port,
const baseClientConfig: Partial<pg.ClientConfig> = {
user: jdbcCredentials.username,
password: jdbcCredentials.password,
database: jdbcCredentials.databaseName,
ssl: true
};
this.queryExecutor = new PgPoolExecutor(clientConfig);
if (jdbcCredentials.sshTunnel) {
const sshTunnel = await SSHTunnelProxy.create(jdbcCredentials.sshTunnel, {
host: jdbcCredentials.host,
port: jdbcCredentials.port
});
const queryExecutor = new PgPoolExecutor({
...baseClientConfig,
host: "127.0.0.1",
port: sshTunnel.localPort
});
return new RedshiftDbAdapter(queryExecutor, sshTunnel);
} else {
const clientConfig: pg.ClientConfig = {
...baseClientConfig,
host: jdbcCredentials.host,
port: jdbcCredentials.port
};
const queryExecutor = new PgPoolExecutor(clientConfig);
return new RedshiftDbAdapter(queryExecutor);
}
}

private constructor(private queryExecutor: PgPoolExecutor, private sshTunnel?: SSHTunnelProxy) {}

public async execute(
statement: string,
options: {
Expand Down Expand Up @@ -138,6 +156,9 @@ export class RedshiftDbAdapter implements IDbAdapter {

public async close() {
await this.queryExecutor.close();
if (this.sshTunnel) {
await this.sshTunnel.close();
}
}

public async prepareStateMetadataTable(): Promise<void> {
Expand Down
Loading

0 comments on commit 00437a5

Please sign in to comment.