Skip to content

Commit

Permalink
Azure SQL Data Warehouse support. (#351)
Browse files Browse the repository at this point in the history
* initial commit, added sqldatawarehouse support

* code review fixes: consistency, use const, async.

* some cleanup

* some more cleanup

* slightly more cleanup

* integration tests

* lewis comments

* version bump

* creds
  • Loading branch information
BenBirt authored and probot-auto-merge[bot] committed Aug 9, 2019
1 parent 58a5226 commit 38c149b
Show file tree
Hide file tree
Showing 40 changed files with 741 additions and 61 deletions.
1 change: 1 addition & 0 deletions api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ ts_library(
"@npm//cron-parser",
"@npm//glob",
"@npm//node-redshift",
"@npm//mssql",
"@npm//pretty-ms",
"@npm//promise-pool-executor",
"@npm//protobufjs",
Expand Down
1 change: 1 addition & 0 deletions api/api.package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"cron-parser": "^2.11.0",
"glob": "^7.1.3",
"node-redshift": "^0.1.5",
"mssql": "^5.1.0",
"pretty-ms": "^4.0.0",
"promise-pool-executor": "^1.1.1",
"snowflake-sdk": "^1.1.5",
Expand Down
10 changes: 9 additions & 1 deletion api/commands/credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import * as fs from "fs";

export const CREDENTIALS_FILENAME = ".df-credentials.json";

export type Credentials = dataform.IBigQuery | dataform.IJDBC | dataform.ISnowflake;
export type Credentials = dataform.IBigQuery | dataform.IJDBC | dataform.ISnowflake | dataform.ISQLDataWarehouse;

export function read(warehouse: string, credentialsPath: string): Credentials {
if (!fs.existsSync(credentialsPath)) {
Expand Down Expand Up @@ -41,6 +41,14 @@ export function coerce(warehouse: string, credentials: any): Credentials {
requiredWarehouseProps[warehouse]
);
}
case WarehouseType.SQLDATAWAREHOUSE: {
return validateAnyAsCredentials(
credentials,
dataform.SQLDataWarehouse.verify,
dataform.SQLDataWarehouse.create,
requiredWarehouseProps[warehouse]
)
}
default:
throw new Error(`Unrecognized warehouse: ${warehouse}`);
}
Expand Down
2 changes: 1 addition & 1 deletion api/commands/state.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IDbAdapter } from "@dataform/api/dbadapters";
import { dataform } from "@dataform/protos";
import { IDbAdapter } from "../dbadapters";

export function state(
compiledGraph: dataform.ICompiledGraph,
Expand Down
8 changes: 2 additions & 6 deletions api/dbadapters/bigquery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,7 @@ export class BigQueryDbAdapter implements IDbAdapter {
return rowsResult[0];
}
return this.execute(
`SELECT * FROM \`${metadata.tableReference.projectId}.${metadata.tableReference.datasetId}.${
metadata.tableReference.tableId
}\` LIMIT ${limitRows}`
`SELECT * FROM \`${metadata.tableReference.projectId}.${metadata.tableReference.datasetId}.${metadata.tableReference.tableId}\` LIMIT ${limitRows}`
);
}

Expand All @@ -175,9 +173,7 @@ export class BigQueryDbAdapter implements IDbAdapter {

if (metadata.location.toUpperCase() !== location.toUpperCase()) {
throw new Error(
`Cannot create dataset "${schema}" in location "${location}". It already exists in location "${
metadata.location
}". Change your default dataset location or delete the existing dataset.`
`Cannot create dataset "${schema}" in location "${location}". It already exists in location "${metadata.location}". Change your default dataset location or delete the existing dataset.`
);
}
}
Expand Down
2 changes: 2 additions & 0 deletions api/dbadapters/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Credentials } from "@dataform/api/commands/credentials";
import { BigQueryDbAdapter } from "@dataform/api/dbadapters/bigquery";
import { RedshiftDbAdapter } from "@dataform/api/dbadapters/redshift";
import { SnowflakeDbAdapter } from "@dataform/api/dbadapters/snowflake";
import { SQLDataWarehouseDBAdapter } from "@dataform/api/dbadapters/sqldatawarehouse";
import { dataform } from "@dataform/protos";

export type OnCancel = (handleCancel: () => void) => void;
Expand Down Expand Up @@ -37,3 +38,4 @@ register("bigquery", BigQueryDbAdapter);
register("postgres", RedshiftDbAdapter);
register("redshift", RedshiftDbAdapter);
register("snowflake", SnowflakeDbAdapter);
register("sqldatawarehouse", SQLDataWarehouseDBAdapter);
104 changes: 104 additions & 0 deletions api/dbadapters/sqldatawarehouse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { Credentials } from "@dataform/api/commands/credentials";
import { IDbAdapter, OnCancel } from "@dataform/api/dbadapters/index";
import { dataform } from "@dataform/protos";
import { ConnectionPool } from "mssql";

// Column/schema identifiers used when querying the warehouse's information_schema
// metadata views. Result-set keys are lower-case, matching these constants.
const INFORMATION_SCHEMA_SCHEMA_NAME = "information_schema";
const TABLE_NAME_COL_NAME = "table_name";
const TABLE_SCHEMA_COL_NAME = "table_schema";
const TABLE_TYPE_COL_NAME = "table_type";
const COLUMN_NAME_COL_NAME = "column_name";
const DATA_TYPE_COL_NAME = "data_type";
const IS_NULLABLE_COL_NAME = "is_nullable";

/**
 * IDbAdapter implementation for Azure SQL Data Warehouse, backed by the
 * "mssql" driver. A single connection pool is opened on construction and
 * shared by all queries issued through this adapter.
 */
export class SQLDataWarehouseDBAdapter implements IDbAdapter {
  // Resolves once the pool has connected; every query awaits this promise.
  private pool: Promise<ConnectionPool>;

  constructor(credentials: Credentials) {
    const sqlDataWarehouseCredentials = credentials as dataform.ISQLDataWarehouse;
    // Chain directly on connect() instead of wrapping the already-thenable
    // promise in a `new Promise(...)` (explicit-construction anti-pattern).
    this.pool = new ConnectionPool({
      server: sqlDataWarehouseCredentials.server,
      port: sqlDataWarehouseCredentials.port,
      user: sqlDataWarehouseCredentials.username,
      password: sqlDataWarehouseCredentials.password,
      database: sqlDataWarehouseCredentials.database,
      options: {
        encrypt: true
      }
    })
      .connect()
      .then(pool => {
        // Surface asynchronous pool errors rather than swallowing them;
        // rethrow the original Error to preserve its stack trace.
        pool.on("error", err => {
          throw err instanceof Error ? err : new Error(String(err));
        });
        return pool;
      });
  }

  /**
   * Executes `statement` and returns its result rows.
   * If `onCancel` is supplied, registers a callback that cancels the
   * in-flight request when invoked.
   */
  public async execute(statement: string, onCancel?: OnCancel) {
    const request = (await this.pool).request();
    if (onCancel) {
      onCancel(() => request.cancel());
    }
    return (await request.query(statement)).recordset;
  }

  // Validates `statement` by asking the warehouse to plan it without running it.
  public async evaluate(statement: string) {
    await this.execute(`explain ${statement}`);
  }

  // Lists all relations (tables and views) visible in information_schema.
  public async tables(): Promise<dataform.ITarget[]> {
    const result = await this.execute(
      `select ${TABLE_SCHEMA_COL_NAME}, ${TABLE_NAME_COL_NAME} from ${INFORMATION_SCHEMA_SCHEMA_NAME}.tables`
    );
    return result.map(row => ({
      schema: row[TABLE_SCHEMA_COL_NAME],
      name: row[TABLE_NAME_COL_NAME]
    }));
  }

  /**
   * Fetches metadata (type and column definitions) for a single relation.
   * Throws if the relation does not exist.
   * NOTE(review): schema/table names are interpolated directly into SQL;
   * assumes targets come from trusted project configuration — confirm.
   */
  public async table(target: dataform.ITarget): Promise<dataform.ITableMetadata> {
    // Issue the existence/type query and the column query in parallel.
    const [tableData, columnData] = await Promise.all([
      this.execute(
        `select ${TABLE_TYPE_COL_NAME} from ${INFORMATION_SCHEMA_SCHEMA_NAME}.tables
       where ${TABLE_SCHEMA_COL_NAME} = '${target.schema}' AND ${TABLE_NAME_COL_NAME} = '${target.name}'`
      ),
      this.execute(
        `select ${COLUMN_NAME_COL_NAME}, ${DATA_TYPE_COL_NAME}, ${IS_NULLABLE_COL_NAME}
     from ${INFORMATION_SCHEMA_SCHEMA_NAME}.columns
     where ${TABLE_SCHEMA_COL_NAME} = '${target.schema}' AND ${TABLE_NAME_COL_NAME} = '${target.name}'`
      )
    ]);

    if (tableData.length === 0) {
      throw new Error(`Could not find relation: ${target.schema}.${target.name}`);
    }

    // The table exists.
    return {
      target,
      type: tableData[0][TABLE_TYPE_COL_NAME] === "VIEW" ? "view" : "table",
      fields: columnData.map(row => ({
        name: row[COLUMN_NAME_COL_NAME],
        primitive: row[DATA_TYPE_COL_NAME],
        flags: row[IS_NULLABLE_COL_NAME] && row[IS_NULLABLE_COL_NAME] === "YES" ? ["nullable"] : []
      }))
    };
  }

  /**
   * Returns up to `limitRows` rows from the target relation.
   * T-SQL has no LIMIT clause — `SELECT TOP n` is the supported syntax,
   * so the previous `... LIMIT ${limitRows}` form would fail at runtime.
   */
  public async preview(target: dataform.ITarget, limitRows: number = 10): Promise<any[]> {
    return this.execute(`SELECT TOP ${limitRows} * FROM "${target.schema}"."${target.name}"`);
  }

  // Creates `schema` if it does not already exist (T-SQL requires CREATE SCHEMA
  // to be the only statement in its batch, hence the sp_executesql indirection).
  public async prepareSchema(schema: string): Promise<void> {
    await this.execute(
      `if not exists ( select schema_name from ${INFORMATION_SCHEMA_SCHEMA_NAME}.schemata where schema_name = '${schema}' )
            begin
              exec sp_executesql N'create schema ${schema}'
            end `
    );
  }
}
9 changes: 4 additions & 5 deletions assertion_utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ export class DatasetAssertion {
WITH base AS (
SELECT
${this.groupCols.map((field, i) => `${field} as c_${i}`).join(", ")},
${this.groupCols.join(", ")},
SUM(1) as row_count
FROM ${this.dataset}
GROUP BY
${this.groupCols.map((field, i) => `${i+1}`).join(", ")}
${this.groupCols.join(", ")}
)
SELECT
Expand All @@ -33,6 +32,6 @@ export class DatasetAssertion {
base
WHERE
row_count > 1
`
`;
}
}
}
16 changes: 16 additions & 0 deletions cli/credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ export function getRedshiftCredentials() {
);
}

/**
 * Interactively prompts the user for Azure SQL Data Warehouse connection
 * details and returns them as a credentials object. Prompt order matters:
 * object-literal properties evaluate left-to-right, preserving it.
 */
export function getSQLDataWarehouseCredentials(): dataform.ISQLDataWarehouse {
  return {
    server: question("Enter your server name (for example 'name.database.windows.net'):"),
    port: intQuestion("Enter your server port:", 1433),
    username: question("Enter your datawarehouse user:"),
    password: passwordQuestion("Enter your datawarehouse password:"),
    database: question("Enter the database name:")
  };
}

export function getSnowflakeCredentials(): dataform.ISnowflake {
const accountId = question(
"Enter your Snowflake account identifier, including region (for example 'myaccount.us-east-1'):"
Expand Down
14 changes: 7 additions & 7 deletions cli/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import {
getBigQueryCredentials,
getPostgresCredentials,
getRedshiftCredentials,
getSnowflakeCredentials
getSnowflakeCredentials,
getSQLDataWarehouseCredentials
} from "@dataform/cli/credentials";
import { actuallyResolve, assertPathExists, compiledGraphHasErrors } from "@dataform/cli/util";
import { createYargsCli, INamedOption } from "@dataform/cli/yargswrapper";
Expand Down Expand Up @@ -49,9 +50,7 @@ const projectDirMustExistOption = {
assertPathExists(path.resolve(argv["project-dir"], "dataform.json"));
} catch (e) {
throw new Error(
`${
argv["project-dir"]
} does not appear to be a dataform directory (missing dataform.json file).`
`${argv["project-dir"]} does not appear to be a dataform directory (missing dataform.json file).`
);
}
}
Expand Down Expand Up @@ -206,9 +205,7 @@ const builtYargs = createYargsCli({
},
{
format: "init-creds <warehouse> [project-dir]",
description: `Create a ${
credentials.CREDENTIALS_FILENAME
} file for dataform to use when accessing your warehouse.`,
description: `Create a ${credentials.CREDENTIALS_FILENAME} file for Dataform to use when accessing your warehouse.`,
positionalOptions: [warehouseOption, projectDirMustExistOption],
options: [
{
Expand All @@ -232,6 +229,9 @@ const builtYargs = createYargsCli({
case "redshift": {
return getRedshiftCredentials();
}
case "sqldatawarehouse": {
return getSQLDataWarehouseCredentials();
}
case "snowflake": {
return getSnowflakeCredentials();
}
Expand Down
5 changes: 2 additions & 3 deletions core/adapters/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ import { dataform } from "@dataform/protos";

export abstract class Adapter {
public where(query: string, where: string) {
return `select * from (
${query})
return `select * from (${query}) as subquery
where ${where}`;
}

Expand All @@ -23,7 +22,7 @@ export abstract class Adapter {
insert into ${this.resolveTarget(target)}
(${columns.join(",")})
select ${columns.join(",")}
from (${query})`;
from (${query}) as insertions`;
}

public dropIfExists(target: dataform.ITarget, type: string) {
Expand Down
29 changes: 22 additions & 7 deletions core/adapters/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { BigQueryAdapter } from "@dataform/core/adapters/bigquery";
import { RedshiftAdapter } from "@dataform/core/adapters/redshift";
import { SnowflakeAdapter } from "@dataform/core/adapters/snowflake";
import { SQLDataWarehouseAdapter } from "@dataform/core/adapters/sqldatawarehouse";
import { Tasks } from "@dataform/core/tasks";
import { dataform } from "@dataform/protos";
import { Tasks } from "../tasks";
import { BigQueryAdapter } from "./bigquery";
import { RedshiftAdapter } from "./redshift";
import { SnowflakeAdapter } from "./snowflake";

export interface IAdapter {
resolveTarget(target: dataform.ITarget): string;
Expand All @@ -26,11 +27,16 @@ export enum WarehouseType {
BIGQUERY = "bigquery",
POSTGRES = "postgres",
REDSHIFT = "redshift",
SNOWFLAKE = "snowflake"
SNOWFLAKE = "snowflake",
SQLDATAWAREHOUSE = "sqldatawarehouse"
}

const CANCELLATION_SUPPORTED = [WarehouseType.BIGQUERY, WarehouseType.SQLDATAWAREHOUSE];

/**
 * Returns true if the given warehouse's db adapter supports cancelling
 * in-flight queries (see CANCELLATION_SUPPORTED above).
 */
export function supportsCancel(warehouseType: WarehouseType) {
  // Array.includes reads better than .some with an identity comparison.
  return CANCELLATION_SUPPORTED.includes(warehouseType);
}

const requiredBigQueryWarehouseProps: Array<keyof dataform.IBigQuery> = [
Expand All @@ -52,12 +58,20 @@ const requiredSnowflakeWarehouseProps: Array<keyof dataform.ISnowflake> = [
"databaseName",
"warehouse"
];
const requiredSQLDataWarehouseProps: Array<keyof dataform.ISQLDataWarehouse> = [
"server",
"port",
"username",
"password",
"database"
];

export const requiredWarehouseProps = {
[WarehouseType.BIGQUERY]: requiredBigQueryWarehouseProps,
[WarehouseType.POSTGRES]: requiredJdbcWarehouseProps,
[WarehouseType.REDSHIFT]: requiredJdbcWarehouseProps,
[WarehouseType.SNOWFLAKE]: requiredSnowflakeWarehouseProps
[WarehouseType.SNOWFLAKE]: requiredSnowflakeWarehouseProps,
[WarehouseType.SQLDATAWAREHOUSE]: requiredSQLDataWarehouseProps
};

const registry: { [warehouseType: string]: AdapterConstructor<IAdapter> } = {};
Expand All @@ -80,3 +94,4 @@ register("bigquery", BigQueryAdapter);
register("postgres", RedshiftAdapter);
register("redshift", RedshiftAdapter);
register("snowflake", SnowflakeAdapter);
register("sqldatawarehouse", SQLDataWarehouseAdapter);
Loading

0 comments on commit 38c149b

Please sign in to comment.