Skip to content

Commit

Permalink
Action retries (#487)
Browse files Browse the repository at this point in the history
* Update error message when a task errors because of a warehouse

* adds retry promise func and unit test

* Refactors the retry promise function and adds tests

* Fixes comments and refactors

* Adds test for operations case, minor refactoring

* Upgrade package
  • Loading branch information
George McGowan authored and probot-auto-merge[bot] committed Nov 21, 2019
1 parent 05856fb commit 988479e
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 11 deletions.
16 changes: 11 additions & 5 deletions api/commands/run.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Credentials } from "@dataform/api/commands/credentials";
import * as dbadapters from "@dataform/api/dbadapters";
import { retry } from "@dataform/api/utils/retry";
import { dataform } from "@dataform/protos";
import * as EventEmitter from "events";
import * as Long from "long";
Expand Down Expand Up @@ -247,10 +248,15 @@ export class Runner {
parentAction.tasks.push(taskResult);
await this.triggerChange();
try {
const rows = await this.adapter.execute(task.statement, {
onCancel: handleCancel => this.eEmitter.on(CANCEL_EVENT, handleCancel),
maxResults: 1
});
// Retry this function a given number of times, configurable by user
const rows = await retry(
() =>
this.adapter.execute(task.statement, {
onCancel: handleCancel => this.eEmitter.on(CANCEL_EVENT, handleCancel),
maxResults: 1
}),
task.type === "operation" ? 0 : this.graph.projectConfig.idempotentActionRetries || 0
);
if (task.type === "assertion") {
// We expect that an assertion query returns 1 row, with 1 field that is the row count.
// We don't really care what that field/column is called.
Expand All @@ -264,7 +270,7 @@ export class Runner {
taskResult.status = this.cancelled
? dataform.TaskResult.ExecutionStatus.CANCELLED
: dataform.TaskResult.ExecutionStatus.FAILED;
taskResult.errorMessage = e.message;
taskResult.errorMessage = `${this.graph.projectConfig.warehouse} error: ${e.message}`;
}
taskResult.timing = timer.end();
await this.triggerChange();
Expand Down
10 changes: 10 additions & 0 deletions api/utils/retry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export async function retry<T>(func: () => Promise<T>, retries: number): Promise<T> {
try {
return await func();
} catch (e) {
if (retries === 0) {
throw e;
}
return await retry(func, retries - 1);
}
}
1 change: 1 addition & 0 deletions protos/core.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ message ProjectConfig {
string warehouse = 1;
string default_schema = 2;
string assertion_schema = 5;
int32 idempotent_action_retries = 8;

string schema_suffix = 7;

Expand Down
175 changes: 174 additions & 1 deletion tests/api/api.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ describe("@dataform/api", () => {
tasks: [
{
status: dataform.TaskResult.ExecutionStatus.FAILED,
errorMessage: "bad statement"
errorMessage: "bigquery error: bad statement"
}
],
status: dataform.ActionResult.ExecutionStatus.FAILED
Expand All @@ -723,6 +723,179 @@ describe("@dataform/api", () => {
);
});

context("execute with retry", () => {
it("should fail when execution fails too many times for the retry setting", async () => {
const mockedDbAdapter = mock(BigQueryDbAdapter);
const NEW_TEST_GRAPH = {
...TEST_GRAPH,
projectConfig: { ...TEST_GRAPH.projectConfig, idempotentActionRetries: 1 }
};
when(mockedDbAdapter.prepareSchema(anyString())).thenResolve(null);
when(
mockedDbAdapter.execute(NEW_TEST_GRAPH.actions[0].tasks[0].statement, anything())
).thenResolve([]);
when(mockedDbAdapter.execute(NEW_TEST_GRAPH.actions[1].tasks[0].statement, anything()))
.thenReject(new Error("bad statement"))
.thenReject(new Error("bad statement"))
.thenResolve([]);

const runner = new Runner(instance(mockedDbAdapter), NEW_TEST_GRAPH);
const result = await runner.execute();

delete result.timing;
result.actions.forEach(actionResult => {
delete actionResult.timing;
actionResult.tasks.forEach(taskResult => {
delete taskResult.timing;
});
});

expect(dataform.RunResult.create(result)).to.deep.equal(
dataform.RunResult.create({
status: dataform.RunResult.ExecutionStatus.FAILED,
actions: [
{
name: NEW_TEST_GRAPH.actions[0].name,
tasks: [
{
status: dataform.TaskResult.ExecutionStatus.SUCCESSFUL
}
],
status: dataform.ActionResult.ExecutionStatus.SUCCESSFUL
},
{
name: TEST_GRAPH.actions[1].name,
tasks: [
{
status: dataform.TaskResult.ExecutionStatus.FAILED,
errorMessage: "bigquery error: bad statement"
}
],
status: dataform.ActionResult.ExecutionStatus.FAILED
}
]
})
);
});

it("should pass when execution fails initially, then passes with the number of allowed retries", async () => {
const mockedDbAdapter = mock(BigQueryDbAdapter);
const NEW_TEST_GRAPH = {
...TEST_GRAPH,
projectConfig: { ...TEST_GRAPH.projectConfig, idempotentActionRetries: 2 }
};
when(mockedDbAdapter.prepareSchema(anyString())).thenResolve(null);
when(
mockedDbAdapter.execute(NEW_TEST_GRAPH.actions[0].tasks[0].statement, anything())
).thenResolve([]);
when(mockedDbAdapter.execute(NEW_TEST_GRAPH.actions[1].tasks[0].statement, anything()))
.thenReject(new Error("bad statement"))
.thenReject(new Error("bad statement"))
.thenResolve([]);

const runner = new Runner(instance(mockedDbAdapter), NEW_TEST_GRAPH);
const result = await runner.execute();

delete result.timing;
result.actions.forEach(actionResult => {
delete actionResult.timing;
actionResult.tasks.forEach(taskResult => {
delete taskResult.timing;
});
});

expect(dataform.RunResult.create(result)).to.deep.equal(
dataform.RunResult.create({
status: dataform.RunResult.ExecutionStatus.SUCCESSFUL,
actions: [
{
name: NEW_TEST_GRAPH.actions[0].name,
tasks: [
{
status: dataform.TaskResult.ExecutionStatus.SUCCESSFUL
}
],
status: dataform.ActionResult.ExecutionStatus.SUCCESSFUL
},
{
name: NEW_TEST_GRAPH.actions[1].name,
tasks: [
{
status: dataform.TaskResult.ExecutionStatus.SUCCESSFUL
}
],
status: dataform.ActionResult.ExecutionStatus.SUCCESSFUL
}
]
})
);
});

it("should not retry when the task is an operation", async () => {
const mockedDbAdapter = mock(BigQueryDbAdapter);
const NEW_TEST_GRAPH_WITH_OPERATION = {
...TEST_GRAPH,
projectConfig: { ...TEST_GRAPH.projectConfig, idempotentActionRetries: 3 }
};
NEW_TEST_GRAPH_WITH_OPERATION.actions[1].tasks[0].type = "operation";

when(mockedDbAdapter.prepareSchema(anyString())).thenResolve(null);
when(
mockedDbAdapter.execute(
NEW_TEST_GRAPH_WITH_OPERATION.actions[0].tasks[0].statement,
anything()
)
).thenResolve([]);
when(
mockedDbAdapter.execute(
NEW_TEST_GRAPH_WITH_OPERATION.actions[1].tasks[0].statement,
anything()
)
)
.thenReject(new Error("bad statement"))
.thenReject(new Error("bad statement"))
.thenResolve([]);

const runner = new Runner(instance(mockedDbAdapter), NEW_TEST_GRAPH_WITH_OPERATION);
const result = await runner.execute();

delete result.timing;
result.actions.forEach(actionResult => {
delete actionResult.timing;
actionResult.tasks.forEach(taskResult => {
delete taskResult.timing;
});
});

expect(dataform.RunResult.create(result)).to.deep.equal(
dataform.RunResult.create({
status: dataform.RunResult.ExecutionStatus.FAILED,
actions: [
{
name: NEW_TEST_GRAPH_WITH_OPERATION.actions[0].name,
tasks: [
{
status: dataform.TaskResult.ExecutionStatus.SUCCESSFUL
}
],
status: dataform.ActionResult.ExecutionStatus.SUCCESSFUL
},
{
name: NEW_TEST_GRAPH_WITH_OPERATION.actions[1].name,
tasks: [
{
status: dataform.TaskResult.ExecutionStatus.FAILED,
errorMessage: "bigquery error: bad statement"
}
],
status: dataform.ActionResult.ExecutionStatus.FAILED
}
]
})
);
});
});

it("execute_with_cancel", async () => {
const TEST_GRAPH: dataform.IExecutionGraph = dataform.ExecutionGraph.create({
projectConfig: {
Expand Down
56 changes: 56 additions & 0 deletions tests/api/utils/retry.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { retry } from "@dataform/api/utils/retry";
import { expect } from "chai";

describe("retry", () => {
it("doesn't retry if the function succeeds", async () => {
let calledTimes = 0;
const succeedingFunc = async () => {
calledTimes += 1;
return "success";
};
const result = await retry(succeedingFunc, 2);
expect(result).to.eq("success");
expect(calledTimes).eq(1);
});

it("doesn't retry if the function fails and retries is 0", async () => {
let calledTimes = 0;
const failingFunc = async () => {
calledTimes += 1;
throw new Error("an error");
};
try {
await retry(failingFunc, 0);
} catch (e) {
expect(e.toString()).to.eq("Error: an error");
}
expect(calledTimes).eq(1);
});

it("calls the function three times if the function fails and retries is 2", async () => {
let calledTimes = 0;
const failingFunc = async () => {
calledTimes += 1;
throw new Error("an error");
};
try {
await retry(failingFunc, 2);
} catch (e) {}
expect(calledTimes).eq(3);
});

it("will eventually return a success if the function has failed before", async () => {
let calledTimes = 0;
const failingFunc = async () => {
if (calledTimes > 1) {
return "success";
}
calledTimes += 1;
throw new Error("an error");
};
try {
await retry(failingFunc, 2);
} catch (e) {}
expect(calledTimes).eq(2);
});
});
2 changes: 1 addition & 1 deletion tests/integration/bigquery.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ describe("@dataform/integration/bigquery", () => {
expect(
actionMap["df_integration_test_assertions.example_assertion_uniqueness_fail"].tasks[1]
.errorMessage
).to.eql("Assertion failed: query returned 1 row(s).");
).to.eql("bigquery error: Assertion failed: query returned 1 row(s).");
expect(
actionMap["df_integration_test_assertions.example_assertion_uniqueness_pass"].status
).equals(dataform.ActionResult.ExecutionStatus.SUCCESSFUL);
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/redshift.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ describe("@dataform/integration/redshift", () => {
expect(
actionMap["df_integration_test_assertions.example_assertion_uniqueness_fail"].tasks[1]
.errorMessage
).to.eql("Assertion failed: query returned 1 row(s).");
).to.eql("redshift error: Assertion failed: query returned 1 row(s).");
expect(
actionMap["df_integration_test_assertions.example_assertion_uniqueness_pass"].status
).equals(dataform.ActionResult.ExecutionStatus.SUCCESSFUL);
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/snowflake.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ describe("@dataform/integration/snowflake", () => {
expect(
actionMap["DF_INTEGRATION_TEST_ASSERTIONS.EXAMPLE_ASSERTION_UNIQUENESS_FAIL"].tasks[1]
.errorMessage
).to.eql("Assertion failed: query returned 1 row(s).");
).to.eql("snowflake error: Assertion failed: query returned 1 row(s).");
expect(
actionMap["DF_INTEGRATION_TEST_ASSERTIONS.EXAMPLE_ASSERTION_UNIQUENESS_PASS"].status
).equals(dataform.ActionResult.ExecutionStatus.SUCCESSFUL);
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/sqldatawarehouse.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ describe("@dataform/integration/sqldatawarehouse", () => {
expect(
actionMap["df_integration_test_assertions.example_assertion_uniqueness_fail"].tasks[2]
.errorMessage
).to.eql("Assertion failed: query returned 1 row(s).");
).to.eql("sqldatawarehouse error: Assertion failed: query returned 1 row(s).");
expect(
actionMap["df_integration_test_assertions.example_assertion_uniqueness_pass"].status
).equals(dataform.ActionResult.ExecutionStatus.SUCCESSFUL);
Expand Down
2 changes: 1 addition & 1 deletion version.bzl
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# NOTE: If you change the format of this line, you must change the bash command
# in /scripts/publish to extract the version string correctly.
DF_VERSION = "1.3.9"
DF_VERSION = "1.4.0"

0 comments on commit 988479e

Please sign in to comment.