Ask me when incremental (#558)
* Made ifIncremental take an optional second parameter, made ifIncremental and isIncremental no longer be hidden in the API reference, added info to incremental datasets page of the how to guides in the docs

* Added incremental_where to SQLX info, simplified additional features sections, added more links to the API reference

* Fixed typo

* Implemented when() and incremental(), updated tests to reflect

* Improved eloquence

* Added incPreOps implementation (to be removed)

* Added in-line incremental functionality within pre and post operation blocks

* Made proto and core changes suggested

* Removed incremental_where from docs, some minor tweaks

* Code and docs tweaks

* Migrated pre and post op to individual adapters

* Moved separate pre and post operations to general publish tasks

* Added test for BQ task publishing, progress with adding pre and post operations to example sqlx file

* Fixed issues with tests, and some underlying issues

* Progress with shortening code

* Made bigquery adapter and test more concise

* Added updated pre and post operation logic, with core backward-compatibility tests

* Added checks for example definition files successfully executing

* Updated adapter definition example files, bumped bazel version, fixed minor adapter issues

* Minor code tweaks

* Added essential if incremental logic to adapters, made tests execution status checks more concise

* Code style tweaks

* Updated integration tests to use new examples

* Narrowed scope of publish tasks test for pre and post ops

* Minor code style changes
Ekrekr authored Jan 23, 2020
1 parent 2c24296 commit 058b07a
Showing 27 changed files with 479 additions and 220 deletions.
10 changes: 2 additions & 8 deletions api/commands/build.ts
@@ -69,19 +69,13 @@ export class Builder {
}

public buildTable(t: dataform.ITable, tableMetadata: dataform.ITableMetadata) {
const emptyTasks = [] as dataform.IExecutionTask[];

if (t.protected && this.runConfig.fullRefresh) {
throw new Error("Protected datasets cannot be fully refreshed.");
}

const tasks = t.disabled
? emptyTasks
: emptyTasks.concat(
(t.preOps || []).map(pre => ({ statement: pre })),
this.adapter.publishTasks(t, this.runConfig, tableMetadata).build(),
(t.postOps || []).map(post => ({ statement: post }))
);
? ([] as dataform.IExecutionTask[])
: this.adapter.publishTasks(t, this.runConfig, tableMetadata).build();

return dataform.ExecutionAction.create({
name: t.name,
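Note on this hunk: `buildTable` no longer wraps the adapter's tasks with pre- and post-operations itself; it returns whatever `publishTasks(...).build()` yields. A minimal sketch of that control flow follows. All types and names here (`SketchTask`, `SketchBuildTable`, `SketchAdapter`, `buildTasks`, `isProtected`) are hypothetical stand-ins for illustration, not part of the Dataform API.

```typescript
// Hypothetical stand-ins for dataform.IExecutionTask and dataform.ITable.
interface SketchTask {
  statement: string;
}

interface SketchBuildTable {
  name: string;
  disabled?: boolean;
  isProtected?: boolean; // stands in for `t.protected` in the hunk above
}

// Hypothetical adapter: assumed to return pre-ops, main statements and post-ops.
interface SketchAdapter {
  publishTasks(table: SketchBuildTable, fullRefresh: boolean): SketchTask[];
}

function buildTasks(
  table: SketchBuildTable,
  fullRefresh: boolean,
  adapter: SketchAdapter
): SketchTask[] {
  // Protected datasets must never be rebuilt from scratch.
  if (table.isProtected && fullRefresh) {
    throw new Error("Protected datasets cannot be fully refreshed.");
  }
  // Disabled datasets produce no tasks; otherwise the adapter owns the full task list.
  return table.disabled ? [] : adapter.publishTasks(table, fullRefresh);
}
```

With this shape, the ordering of pre-operations, the main statement and post-operations becomes the adapter's responsibility rather than the builder's.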
16 changes: 5 additions & 11 deletions content/docs/how-to-guides/incremental-datasets.md
@@ -82,26 +82,22 @@ config { type: "incremental" }
SELECT timestamp, action
FROM weblogs.user_actions

incremental_where {
timestamp > (SELECT MAX(timestamp) FROM ${self()})
}
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
```
First the script sets the type of the dataset to `incremental`.
It then specifies a `WHERE` clause using an `incremental_where` block:
It then specifies a `WHERE` clause using the `when()` and `incremental()` functions:
```js
incremental_where {
timestamp > (SELECT MAX(timestamp) FROM ${self()})
}
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
```
This ensures that only rows from the source dataset with a <b>timestamp greater than the latest timestamp that has been processed so far</b> are selected in the incremental query.
Note that `self()` is used here in order to get the name of the current dataset. Thus the compiled `WHERE` clause will be expanded to:
```js
```sql
timestamp > (SELECT MAX(timestamp) FROM default_schema.example_incremental)
```
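Note on the documentation change above: a minimal sketch of how the `when(incremental(), ...)` expression might expand at compile time. The `when` function here is a hypothetical stand-in that returns an empty string when the condition is false (an assumption; the real signature is in the API reference). `compileQuery`, `isIncremental` and `selfName` are illustrative names that replace `incremental()` and `self()`.

```typescript
// Hypothetical stand-in for the SQLX `when()` helper, not the Dataform source.
// Assumption: the false case defaults to an empty string.
function when(condition: boolean, trueCase: string, falseCase: string = ""): string {
  return condition ? trueCase : falseCase;
}

// Hypothetical compile step for the example query in the guide.
// `isIncremental` stands in for incremental(), `selfName` for self().
function compileQuery(isIncremental: boolean, selfName: string): string {
  const base = "SELECT timestamp, action\nFROM weblogs.user_actions";
  const filter = when(
    isIncremental,
    `WHERE timestamp > (SELECT MAX(timestamp) FROM ${selfName})`
  );
  // On a non-incremental run the filter is empty, so the base query is used unchanged.
  return filter ? `${base}\n${filter}` : base;
}
```

Under these assumptions, a full refresh compiles to the plain `SELECT`, while an incremental run appends the `WHERE` clause that only picks up rows newer than those already stored.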
@@ -147,9 +143,7 @@ config { type: "incremental" }

SELECT CURRENT_DATE() AS snapshot_date, customer_id, name, account_settings FROM productiondb.customers

incremental_where {
snapshot_date > (SELECT MAX(snapshot_date) FROM ${self()})
}
${ when(incremental(), `WHERE snapshot_date > (SELECT MAX(snapshot_date) FROM ${self()})`) }
```
- By selecting the current date as `snapshot_date`, this effectively appends a dated snapshot of the `productiondb.customers` dataset to the output dataset each day.
24 changes: 12 additions & 12 deletions content/docs/how-to-guides/sqlx.md
@@ -20,7 +20,7 @@ SQLX contains the following components:

### Config

All config properties, and the config itself, are optional. See the API reference for exact options.
All config properties, and the config itself, are optional. See [`ITableConfig` in the API reference](/reference#ITableConfig) for exact options.

### SQL

@@ -52,32 +52,32 @@ In-line Javascript can be used anywhere SQL is written in order to dynamically m

### Built-in functions

Built in functions have special functionality should be executed within [in-line Javascript](#in-line-javascript).
Built in functions have special functionality and can be executed either within [in-line Javascript](#in-line-javascript) or [javascript blocks](#javascript-blocks).

For all built in functions, see [`ITableContext` in the API reference](/reference#ITableContext). Some useful examples can be found here:

#### `ref()`

`ref()` enables you to easily reference another dataset in your project without having to provide the full SQL dataset name. `ref()` also adds the referenced dataset to the set of dependencies for the query.

An example of `ref()` being used to add a dependency is [here](datasets/#referencing-other-datasets).
Some examples can be found [here](datasets/#referencing-other-datasets).

#### `resolve()`

`resolve()` works similarly to `ref()`, but doesn't add the dataset to the dependency list for the query.

#### `self()`

`self()` returns the name of the current dataset. If the default schema or dataset name is overridden in the `config{}` block, `self()` will return the full and correct dataset name.

An example of `self()` being used to set up incremental tables is [here](incremental-datasets/#a-simple-example).
`self()` returns the name of the current dataset. If the database, schema, or dataset name is overridden in the `config{}` block, `self()` will return the full and correct dataset name.

## Additional Features
[Here](incremental-datasets/#a-simple-example) is an example of an incremental table using the `self()` function.

- [Pre-operations and post-operations](#pre-operations-and-post-operations): Pre and post operations are only valid for some table types.
- [Retrieve the name of the current dataset with `self()`](incremental-datasets/#a-simple-example).

### Pre-operations and post-operations
- [Execute code only if the script is for an incremental dataset using `incremental()`](incremental-datasets/#conditional-code-if-incremental).

Pre-operation and post-operation blocks are defined in SQLX by writing `pre_operations { }` and `post_operations { }` respectively.
## Additional Features

SQL written in pre-operation blocks is executed before the central chunk of SQL, while post-operation blocks are executed afterwards.
- **Pre-operations**: defined in SQLX by writing `pre_operations { }`, SQL written inside will be executed before the main SQL. This can be useful for granting permissions, as can be seen in the [publishing datasets guide](/how-to-guides/datasets/#example-granting-dataset-access-with-post_operations). **Actions may only include pre_operations if they create a dataset**, for example with `type: "table"` or `type: "view"` or `type: "incremental"` in their config.

They are useful for purposes such as setting permissions. An example can be found in the [publishing datasets guide](datasets).
- **Post-operations**: the same as pre-operations, but defined with `post_operations { }`, and runs after the main SQL.
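
Note on the built-in functions described in this file: the difference between `ref()` and `resolve()` is whether a dependency is recorded. A minimal sketch under simplifying assumptions follows. `SketchContext` is a hypothetical class, names are qualified as `schema.name` only, and the real `ITableContext` has more members than shown.

```typescript
// Hypothetical, simplified context; not the Dataform implementation.
class SketchContext {
  public readonly dependencies: string[] = [];

  constructor(private readonly schema: string, private readonly name: string) {}

  // Returns the qualified name AND records the dataset as a dependency.
  public ref(dataset: string): string {
    if (this.dependencies.indexOf(dataset) === -1) {
      this.dependencies.push(dataset);
    }
    return this.resolve(dataset);
  }

  // Returns the qualified name without touching the dependency list.
  public resolve(dataset: string): string {
    return `${this.schema}.${dataset}`;
  }

  // Returns the qualified name of the dataset being defined.
  public self(): string {
    return `${this.schema}.${this.name}`;
  }
}
```

In this sketch, a query that mentions a dataset only through `resolve()` would not wait for that dataset to be built first, which is the behaviour the text above describes.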
59 changes: 52 additions & 7 deletions core/adapters/base.ts
@@ -1,6 +1,9 @@
import { Task, Tasks } from "@dataform/core/tasks";
import { dataform } from "@dataform/protos";

export abstract class Adapter {
constructor(protected readonly dataformCoreVersion: string) {}

public abstract resolveTarget(target: dataform.ITarget): string;

public normalizeIdentifier(identifier: string) {
@@ -9,7 +12,7 @@ export abstract class Adapter {

public dropIfExists(target: dataform.ITarget, type: string) {
return `drop ${this.baseTableType(type)} if exists ${this.resolveTarget(target)} ${
this.baseTableType(type) == "table" ? "cascade" : ""
this.baseTableType(type) === "table" ? "cascade" : ""
}`;
}

@@ -22,18 +25,60 @@ export abstract class Adapter {

protected insertInto(target: dataform.ITarget, columns: string[], query: string) {
return `
insert into ${this.resolveTarget(target)}
(${columns.join(",")})
select ${columns.join(",")}
from (${query}) as insertions`;
insert into ${this.resolveTarget(target)}
(${columns.join(",")})
select ${columns.join(",")}
from (${query}) as insertions`;
}

protected oppositeTableType(type: string) {
return this.baseTableType(type) === "table" ? "view" : "table";
}

protected where(query: string, where: string) {
return `select * from (${query}) as subquery
where ${where || "true"}`;
return where
? `
select * from (${query}) as subquery
where ${where}`
: query;
}

protected shouldWriteIncrementally(
runConfig: dataform.IRunConfig,
tableMetadata?: dataform.ITableMetadata
) {
return !runConfig.fullRefresh && tableMetadata && tableMetadata.type !== "view";
}

protected preOps(
table: dataform.ITable,
runConfig: dataform.IRunConfig,
tableMetadata: dataform.ITableMetadata
): Task[] {
let preOps = table.preOps;
if (
this.dataformCoreVersion > "1.4.8" &&
table.type === "incremental" &&
this.shouldWriteIncrementally(runConfig, tableMetadata)
) {
preOps = table.incrementalPreOps;
}
return (preOps || []).map(pre => Task.statement(pre));
}

protected postOps(
table: dataform.ITable,
runConfig: dataform.IRunConfig,
tableMetadata: dataform.ITableMetadata
): Task[] {
let postOps = table.postOps;
if (
this.dataformCoreVersion > "1.4.8" &&
table.type === "incremental" &&
this.shouldWriteIncrementally(runConfig, tableMetadata)
) {
postOps = table.incrementalPostOps;
}
return (postOps || []).map(post => Task.statement(post));
}
}
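Review note on the `this.dataformCoreVersion > "1.4.8"` guards above: `>` on two strings compares them lexicographically in JavaScript, so a version such as `"1.10.0"` would compare as smaller than `"1.4.8"`. A minimal sketch of a numeric comparison follows, assuming plain `major.minor.patch` strings with no pre-release tags. `parseVersion` and `isNewerThan` are hypothetical helpers; the `semver` package already imported in `redshift.ts` offers `semver.gt` as an alternative.

```typescript
// Hypothetical helper: splits "1.4.8" into [1, 4, 8].
function parseVersion(version: string): number[] {
  return version.split(".").map(part => parseInt(part, 10));
}

// Hypothetical helper: true when `version` is strictly newer than `baseline`,
// comparing each numeric component from left to right.
function isNewerThan(version: string, baseline: string): boolean {
  const a = parseVersion(version);
  const b = parseVersion(baseline);
  const length = Math.max(a.length, b.length);
  for (let i = 0; i < length; i++) {
    // Missing or non-numeric components are treated as 0.
    const left = a[i] || 0;
    const right = b[i] || 0;
    if (left !== right) {
      return left > right;
    }
  }
  return false; // identical versions are not "newer"
}
```

The string comparison happens to work while every component stays below 10, which may be why the guard behaves as intended for the versions current at the time of this commit.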
28 changes: 12 additions & 16 deletions core/adapters/bigquery.ts
@@ -4,8 +4,8 @@ import { Task, Tasks } from "@dataform/core/tasks";
import { dataform } from "@dataform/protos";

export class BigQueryAdapter extends Adapter implements IAdapter {
constructor(private project: dataform.IProjectConfig, private dataformCoreVersion: string) {
super();
constructor(private readonly project: dataform.IProjectConfig, dataformCoreVersion: string) {
super(dataformCoreVersion);
}

public resolveTarget(target: dataform.ITarget) {
@@ -19,14 +19,17 @@ export class BigQueryAdapter extends Adapter implements IAdapter {
tableMetadata: dataform.ITableMetadata
): Tasks {
const tasks = Tasks.create();
// Drop views/tables first if they exist.
if (tableMetadata && tableMetadata.type != this.baseTableType(table.type)) {

this.preOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement));

if (tableMetadata && tableMetadata.type !== this.baseTableType(table.type)) {
tasks.add(
Task.statement(this.dropIfExists(table.target, this.oppositeTableType(table.type)))
);
}
if (table.type == "incremental") {
if (runConfig.fullRefresh || !tableMetadata || tableMetadata.type == "view") {

if (table.type === "incremental") {
if (!this.shouldWriteIncrementally(runConfig, tableMetadata)) {
tasks.add(Task.statement(this.createOrReplace(table)));
} else {
tasks.add(
@@ -42,6 +45,9 @@ export class BigQueryAdapter extends Adapter implements IAdapter {
} else {
tasks.add(Task.statement(this.createOrReplace(table)));
}

this.postOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement));

return tasks;
}

@@ -61,16 +67,6 @@ export class BigQueryAdapter extends Adapter implements IAdapter {
return tasks;
}

public createEmptyIfNotExists(table: dataform.ITable) {
return `create ${this.baseTableType(table.type)} if not exists ${this.resolveTarget(
table.target
)} ${
table.bigquery && table.bigquery.partitionBy
? `partition by ${table.bigquery.partitionBy}`
: ""
} as ${this.where(table.query, "false")}`;
}

public createOrReplace(table: dataform.ITable) {
return `create or replace ${this.baseTableType(table.type)} ${this.resolveTarget(
table.target
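Note on `publishTasks`: after this commit the BigQuery, Redshift and Snowflake adapters appear to share one ordering: pre-operations, an optional drop when the stored type differs, then either an insert or a create-or-replace, then post-operations. A minimal sketch of that ordering follows, returning step labels instead of SQL. All names (`SketchTable`, `SketchMetadata`, `planTasks`) are hypothetical, `baseTableType` assumes incremental datasets are stored as tables, and the swap to incremental pre- and post-operations is left out for brevity.

```typescript
type SketchTableType = "table" | "view" | "incremental";

interface SketchTable {
  type: SketchTableType;
  preOps?: string[];
  postOps?: string[];
}

interface SketchMetadata {
  type: "table" | "view";
}

// Assumption: anything that is not a view is materialized as a table.
function baseTableType(type: SketchTableType): "table" | "view" {
  return type === "view" ? "view" : "table";
}

// Mirrors the helper added to base.ts: insert only when the table already
// exists as a table and no full refresh was requested.
function shouldWriteIncrementally(fullRefresh: boolean, metadata?: SketchMetadata): boolean {
  return !fullRefresh && metadata !== undefined && metadata.type !== "view";
}

function planTasks(table: SketchTable, fullRefresh: boolean, metadata?: SketchMetadata): string[] {
  const plan: string[] = [];
  (table.preOps || []).forEach(op => plan.push(`pre: ${op}`));

  // Drop the existing object when its stored type differs from the target type.
  if (metadata && metadata.type !== baseTableType(table.type)) {
    const opposite = baseTableType(table.type) === "table" ? "view" : "table";
    plan.push(`drop ${opposite}`);
  }

  if (table.type === "incremental" && shouldWriteIncrementally(fullRefresh, metadata)) {
    plan.push("insert new rows");
  } else {
    plan.push("create or replace");
  }

  (table.postOps || []).forEach(op => plan.push(`post: ${op}`));
  return plan;
}
```

For example, under these assumptions an incremental dataset whose stored object is currently a view is dropped and recreated rather than inserted into.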
15 changes: 10 additions & 5 deletions core/adapters/redshift.ts
@@ -5,8 +5,8 @@ import { dataform } from "@dataform/protos";
import * as semver from "semver";

export class RedshiftAdapter extends Adapter implements IAdapter {
constructor(private project: dataform.IProjectConfig, private dataformCoreVersion: string) {
super();
constructor(private readonly project: dataform.IProjectConfig, dataformCoreVersion: string) {
super(dataformCoreVersion);
}

public resolveTarget(target: dataform.ITarget) {
@@ -19,17 +19,19 @@ export class RedshiftAdapter extends Adapter implements IAdapter {
tableMetadata: dataform.ITableMetadata
): Tasks {
const tasks = Tasks.create();
// Drop the existing view or table if we are changing it's type.

this.preOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement));

if (tableMetadata && tableMetadata.type !== this.baseTableType(table.type)) {
tasks.add(
Task.statement(this.dropIfExists(table.target, this.oppositeTableType(table.type)))
);
}

if (table.type === "incremental") {
if (runConfig.fullRefresh || !tableMetadata || tableMetadata.type === "view") {
if (!this.shouldWriteIncrementally(runConfig, tableMetadata)) {
tasks.addAll(this.createOrReplace(table));
} else {
// The table exists, insert new rows.
tasks.add(
Task.statement(
this.insertInto(
@@ -43,6 +45,9 @@ export class RedshiftAdapter extends Adapter implements IAdapter {
} else {
tasks.addAll(this.createOrReplace(table));
}

this.postOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement));

return tasks;
}

19 changes: 12 additions & 7 deletions core/adapters/snowflake.ts
@@ -4,8 +4,8 @@ import { Task, Tasks } from "@dataform/core/tasks";
import { dataform } from "@dataform/protos";

export class SnowflakeAdapter extends Adapter implements IAdapter {
constructor(private project: dataform.IProjectConfig, private dataformCoreVersion: string) {
super();
constructor(private readonly project: dataform.IProjectConfig, dataformCoreVersion: string) {
super(dataformCoreVersion);
}

public resolveTarget(target: dataform.ITarget) {
@@ -22,17 +22,19 @@ export class SnowflakeAdapter extends Adapter implements IAdapter {
tableMetadata: dataform.ITableMetadata
): Tasks {
const tasks = Tasks.create();
// Drop the existing view or table if we are changing it's type.
if (tableMetadata && tableMetadata.type != this.baseTableType(table.type)) {

this.preOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement));

if (tableMetadata && tableMetadata.type !== this.baseTableType(table.type)) {
tasks.add(
Task.statement(this.dropIfExists(table.target, this.oppositeTableType(table.type)))
);
}
if (table.type == "incremental") {
if (runConfig.fullRefresh || !tableMetadata || tableMetadata.type == "view") {

if (table.type === "incremental") {
if (!this.shouldWriteIncrementally(runConfig, tableMetadata)) {
tasks.add(Task.statement(this.createOrReplace(table)));
} else {
// The table exists, insert new rows.
tasks.add(
Task.statement(
this.insertInto(
@@ -46,6 +48,9 @@ export class SnowflakeAdapter extends Adapter implements IAdapter {
} else {
tasks.add(Task.statement(this.createOrReplace(table)));
}

this.postOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement));

return tasks;
}
