diff --git a/api/commands/run.ts b/api/commands/run.ts index ce3107d98..2db602e4a 100644 --- a/api/commands/run.ts +++ b/api/commands/run.ts @@ -1,5 +1,6 @@ import { Credentials } from "@dataform/api/commands/credentials"; import * as dbadapters from "@dataform/api/dbadapters"; +import { retry } from "@dataform/api/utils/retry"; import { dataform } from "@dataform/protos"; import * as EventEmitter from "events"; import * as Long from "long"; @@ -247,10 +248,15 @@ export class Runner { parentAction.tasks.push(taskResult); await this.triggerChange(); try { - const rows = await this.adapter.execute(task.statement, { - onCancel: handleCancel => this.eEmitter.on(CANCEL_EVENT, handleCancel), - maxResults: 1 - }); + // Retry this function a given number of times, configurable by user + const rows = await retry( + () => + this.adapter.execute(task.statement, { + onCancel: handleCancel => this.eEmitter.on(CANCEL_EVENT, handleCancel), + maxResults: 1 + }), + task.type === "operation" ? 0 : this.graph.projectConfig.idempotentActionRetries || 0 + ); if (task.type === "assertion") { // We expect that an assertion query returns 1 row, with 1 field that is the row count. // We don't really care what that field/column is called. @@ -264,7 +270,7 @@ export class Runner { taskResult.status = this.cancelled ? 
dataform.TaskResult.ExecutionStatus.CANCELLED : dataform.TaskResult.ExecutionStatus.FAILED; - taskResult.errorMessage = e.message; + taskResult.errorMessage = `${this.graph.projectConfig.warehouse} error: ${e.message}`; } taskResult.timing = timer.end(); await this.triggerChange(); diff --git a/api/utils/retry.ts b/api/utils/retry.ts new file mode 100644 index 000000000..30f39f973 --- /dev/null +++ b/api/utils/retry.ts @@ -0,0 +1,19 @@ +/** + * Calls `func` and, if it rejects, calls it again until it resolves or the allowed retries are used up. + * + * @param func Async operation to attempt; it is invoked once, plus once more for each retry consumed. + * @param retries Number of additional attempts allowed after a failure; zero or a negative value means no retry. + * @returns The value resolved by the first successful attempt. + * @throws The error from the last failed attempt once no retries remain. + */ +export async function retry<T>(func: () => Promise<T>, retries: number): Promise<T> { + try { + return await func(); + } catch (e) { + // Compare with <= rather than ===: a negative count would never reach 0 and would retry without bound. + if (retries <= 0) { + throw e; + } + return await retry(func, retries - 1); + } +} diff --git a/protos/core.proto b/protos/core.proto index 7b7e3ab2e..48bf83644 100644 --- a/protos/core.proto +++ b/protos/core.proto @@ -10,6 +10,7 @@ message ProjectConfig { string warehouse = 1; string default_schema = 2; string assertion_schema = 5; + int32 idempotent_action_retries = 8; string schema_suffix = 7; diff --git a/tests/api/api.spec.ts b/tests/api/api.spec.ts index b806a3a2e..1639c24a9 100644 --- a/tests/api/api.spec.ts +++ b/tests/api/api.spec.ts @@ -713,7 +713,7 @@ describe("@dataform/api", () => { tasks: [ { status: dataform.TaskResult.ExecutionStatus.FAILED, - errorMessage: "bad statement" + errorMessage: "bigquery error: bad statement" } ], status: dataform.ActionResult.ExecutionStatus.FAILED @@ -723,6 +723,179 @@ describe("@dataform/api", () => { ); }); + context("execute with retry", () => { + it("should fail when execution fails too many times for the retry setting", async () => { + const mockedDbAdapter = mock(BigQueryDbAdapter); + const NEW_TEST_GRAPH = { + ...TEST_GRAPH, + projectConfig: { ...TEST_GRAPH.projectConfig, idempotentActionRetries: 1 } + }; + when(mockedDbAdapter.prepareSchema(anyString())).thenResolve(null); + when( + mockedDbAdapter.execute(NEW_TEST_GRAPH.actions[0].tasks[0].statement, anything()) + ).thenResolve([]); + 
when(mockedDbAdapter.execute(NEW_TEST_GRAPH.actions[1].tasks[0].statement, anything())) + .thenReject(new Error("bad statement")) + .thenReject(new Error("bad statement")) + .thenResolve([]); + + const runner = new Runner(instance(mockedDbAdapter), NEW_TEST_GRAPH); + const result = await runner.execute(); + + delete result.timing; + result.actions.forEach(actionResult => { + delete actionResult.timing; + actionResult.tasks.forEach(taskResult => { + delete taskResult.timing; + }); + }); + + expect(dataform.RunResult.create(result)).to.deep.equal( + dataform.RunResult.create({ + status: dataform.RunResult.ExecutionStatus.FAILED, + actions: [ + { + name: NEW_TEST_GRAPH.actions[0].name, + tasks: [ + { + status: dataform.TaskResult.ExecutionStatus.SUCCESSFUL + } + ], + status: dataform.ActionResult.ExecutionStatus.SUCCESSFUL + }, + { + name: TEST_GRAPH.actions[1].name, + tasks: [ + { + status: dataform.TaskResult.ExecutionStatus.FAILED, + errorMessage: "bigquery error: bad statement" + } + ], + status: dataform.ActionResult.ExecutionStatus.FAILED + } + ] + }) + ); + }); + + it("should pass when execution fails initially, then passes with the number of allowed retries", async () => { + const mockedDbAdapter = mock(BigQueryDbAdapter); + const NEW_TEST_GRAPH = { + ...TEST_GRAPH, + projectConfig: { ...TEST_GRAPH.projectConfig, idempotentActionRetries: 2 } + }; + when(mockedDbAdapter.prepareSchema(anyString())).thenResolve(null); + when( + mockedDbAdapter.execute(NEW_TEST_GRAPH.actions[0].tasks[0].statement, anything()) + ).thenResolve([]); + when(mockedDbAdapter.execute(NEW_TEST_GRAPH.actions[1].tasks[0].statement, anything())) + .thenReject(new Error("bad statement")) + .thenReject(new Error("bad statement")) + .thenResolve([]); + + const runner = new Runner(instance(mockedDbAdapter), NEW_TEST_GRAPH); + const result = await runner.execute(); + + delete result.timing; + result.actions.forEach(actionResult => { + delete actionResult.timing; + 
actionResult.tasks.forEach(taskResult => { + delete taskResult.timing; + }); + }); + + expect(dataform.RunResult.create(result)).to.deep.equal( + dataform.RunResult.create({ + status: dataform.RunResult.ExecutionStatus.SUCCESSFUL, + actions: [ + { + name: NEW_TEST_GRAPH.actions[0].name, + tasks: [ + { + status: dataform.TaskResult.ExecutionStatus.SUCCESSFUL + } + ], + status: dataform.ActionResult.ExecutionStatus.SUCCESSFUL + }, + { + name: NEW_TEST_GRAPH.actions[1].name, + tasks: [ + { + status: dataform.TaskResult.ExecutionStatus.SUCCESSFUL + } + ], + status: dataform.ActionResult.ExecutionStatus.SUCCESSFUL + } + ] + }) + ); + }); + + it("should not retry when the task is an operation", async () => { + const mockedDbAdapter = mock(BigQueryDbAdapter); + const NEW_TEST_GRAPH_WITH_OPERATION = { + ...TEST_GRAPH, + projectConfig: { ...TEST_GRAPH.projectConfig, idempotentActionRetries: 3 } + }; + NEW_TEST_GRAPH_WITH_OPERATION.actions[1].tasks[0].type = "operation"; + + when(mockedDbAdapter.prepareSchema(anyString())).thenResolve(null); + when( + mockedDbAdapter.execute( + NEW_TEST_GRAPH_WITH_OPERATION.actions[0].tasks[0].statement, + anything() + ) + ).thenResolve([]); + when( + mockedDbAdapter.execute( + NEW_TEST_GRAPH_WITH_OPERATION.actions[1].tasks[0].statement, + anything() + ) + ) + .thenReject(new Error("bad statement")) + .thenReject(new Error("bad statement")) + .thenResolve([]); + + const runner = new Runner(instance(mockedDbAdapter), NEW_TEST_GRAPH_WITH_OPERATION); + const result = await runner.execute(); + + delete result.timing; + result.actions.forEach(actionResult => { + delete actionResult.timing; + actionResult.tasks.forEach(taskResult => { + delete taskResult.timing; + }); + }); + + expect(dataform.RunResult.create(result)).to.deep.equal( + dataform.RunResult.create({ + status: dataform.RunResult.ExecutionStatus.FAILED, + actions: [ + { + name: NEW_TEST_GRAPH_WITH_OPERATION.actions[0].name, + tasks: [ + { + status: 
dataform.TaskResult.ExecutionStatus.SUCCESSFUL + } + ], + status: dataform.ActionResult.ExecutionStatus.SUCCESSFUL + }, + { + name: NEW_TEST_GRAPH_WITH_OPERATION.actions[1].name, + tasks: [ + { + status: dataform.TaskResult.ExecutionStatus.FAILED, + errorMessage: "bigquery error: bad statement" + } + ], + status: dataform.ActionResult.ExecutionStatus.FAILED + } + ] + }) + ); + }); + }); + it("execute_with_cancel", async () => { const TEST_GRAPH: dataform.IExecutionGraph = dataform.ExecutionGraph.create({ projectConfig: { diff --git a/tests/api/utils/retry.spec.ts b/tests/api/utils/retry.spec.ts new file mode 100644 index 000000000..99891ac76 --- /dev/null +++ b/tests/api/utils/retry.spec.ts @@ -0,0 +1,56 @@ +import { retry } from "@dataform/api/utils/retry"; +import { expect } from "chai"; + +describe("retry", () => { + it("doesn't retry if the function succeeds", async () => { + let calledTimes = 0; + const succeedingFunc = async () => { + calledTimes += 1; + return "success"; + }; + const result = await retry(succeedingFunc, 2); + expect(result).to.eq("success"); + expect(calledTimes).eq(1); + }); + + it("doesn't retry if the function fails and retries is 0", async () => { + let calledTimes = 0; + const failingFunc = async () => { + calledTimes += 1; + throw new Error("an error"); + }; + try { + await retry(failingFunc, 0); + } catch (e) { + expect(e.toString()).to.eq("Error: an error"); + } + expect(calledTimes).eq(1); + }); + + it("calls the function three times if the function fails and retries is 2", async () => { + let calledTimes = 0; + const failingFunc = async () => { + calledTimes += 1; + throw new Error("an error"); + }; + try { + await retry(failingFunc, 2); + } catch (e) {} + expect(calledTimes).eq(3); + }); + + it("will eventually return a success if the function has failed before", async () => { + let calledTimes = 0; + const failingFunc = async () => { + if (calledTimes > 1) { + return "success"; + } + calledTimes += 1; + throw new Error("an 
error"); + }; + try { + await retry(failingFunc, 2); + } catch (e) {} + expect(calledTimes).eq(2); + }); +}); diff --git a/tests/integration/bigquery.spec.ts b/tests/integration/bigquery.spec.ts index a618b8b0b..b8f7b02f8 100644 --- a/tests/integration/bigquery.spec.ts +++ b/tests/integration/bigquery.spec.ts @@ -73,7 +73,7 @@ describe("@dataform/integration/bigquery", () => { expect( actionMap["df_integration_test_assertions.example_assertion_uniqueness_fail"].tasks[1] .errorMessage - ).to.eql("Assertion failed: query returned 1 row(s)."); + ).to.eql("bigquery error: Assertion failed: query returned 1 row(s)."); expect( actionMap["df_integration_test_assertions.example_assertion_uniqueness_pass"].status ).equals(dataform.ActionResult.ExecutionStatus.SUCCESSFUL); diff --git a/tests/integration/redshift.spec.ts b/tests/integration/redshift.spec.ts index d6fa900cb..371fbf0c9 100644 --- a/tests/integration/redshift.spec.ts +++ b/tests/integration/redshift.spec.ts @@ -94,7 +94,7 @@ describe("@dataform/integration/redshift", () => { expect( actionMap["df_integration_test_assertions.example_assertion_uniqueness_fail"].tasks[1] .errorMessage - ).to.eql("Assertion failed: query returned 1 row(s)."); + ).to.eql("redshift error: Assertion failed: query returned 1 row(s)."); expect( actionMap["df_integration_test_assertions.example_assertion_uniqueness_pass"].status ).equals(dataform.ActionResult.ExecutionStatus.SUCCESSFUL); diff --git a/tests/integration/snowflake.spec.ts b/tests/integration/snowflake.spec.ts index bbf0d927c..79dc7d9ad 100644 --- a/tests/integration/snowflake.spec.ts +++ b/tests/integration/snowflake.spec.ts @@ -86,7 +86,7 @@ describe("@dataform/integration/snowflake", () => { expect( actionMap["DF_INTEGRATION_TEST_ASSERTIONS.EXAMPLE_ASSERTION_UNIQUENESS_FAIL"].tasks[1] .errorMessage - ).to.eql("Assertion failed: query returned 1 row(s)."); + ).to.eql("snowflake error: Assertion failed: query returned 1 row(s)."); expect( 
actionMap["DF_INTEGRATION_TEST_ASSERTIONS.EXAMPLE_ASSERTION_UNIQUENESS_PASS"].status ).equals(dataform.ActionResult.ExecutionStatus.SUCCESSFUL); diff --git a/tests/integration/sqldatawarehouse.spec.ts b/tests/integration/sqldatawarehouse.spec.ts index 9a4ef5495..29716b1a9 100644 --- a/tests/integration/sqldatawarehouse.spec.ts +++ b/tests/integration/sqldatawarehouse.spec.ts @@ -76,7 +76,7 @@ describe("@dataform/integration/sqldatawarehouse", () => { expect( actionMap["df_integration_test_assertions.example_assertion_uniqueness_fail"].tasks[2] .errorMessage - ).to.eql("Assertion failed: query returned 1 row(s)."); + ).to.eql("sqldatawarehouse error: Assertion failed: query returned 1 row(s)."); expect( actionMap["df_integration_test_assertions.example_assertion_uniqueness_pass"].status ).equals(dataform.ActionResult.ExecutionStatus.SUCCESSFUL); diff --git a/version.bzl b/version.bzl index 521fff8f1..d984b6fc8 100644 --- a/version.bzl +++ b/version.bzl @@ -1,3 +1,3 @@ # NOTE: If you change the format of this line, you must change the bash command # in /scripts/publish to extract the version string correctly. -DF_VERSION = "1.3.9" +DF_VERSION = "1.4.0"