diff --git a/backend/.sqlx/query-0a6a89e6ab3037f02c3c4c84ee02138d5fded1e6360bb992046fe9711b5ea213.json b/backend/.sqlx/query-0a6a89e6ab3037f02c3c4c84ee02138d5fded1e6360bb992046fe9711b5ea213.json new file mode 100644 index 0000000000000..460ce2bf8dbd9 --- /dev/null +++ b/backend/.sqlx/query-0a6a89e6ab3037f02c3c4c84ee02138d5fded1e6360bb992046fe9711b5ea213.json @@ -0,0 +1,67 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n job_kind AS \"job_kind: JobKind\",\n script_hash AS \"script_hash: ScriptHash\",\n flow_status AS \"flow_status!: Json>\",\n raw_flow AS \"raw_flow: Json>\"\n FROM queue WHERE id = $1 AND workspace_id = $2 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "job_kind: JobKind", + "type_info": { + "Custom": { + "name": "job_kind", + "kind": { + "Enum": [ + "script", + "preview", + "flow", + "dependencies", + "flowpreview", + "script_hub", + "identity", + "flowdependencies", + "http", + "graphql", + "postgresql", + "noop", + "appdependencies", + "deploymentcallback", + "singlescriptflow", + "flowscript", + "flownode", + "appscript" + ] + } + } + } + }, + { + "ordinal": 1, + "name": "script_hash: ScriptHash", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "flow_status!: Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "raw_flow: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + false, + true, + true, + true + ] + }, + "hash": "0a6a89e6ab3037f02c3c4c84ee02138d5fded1e6360bb992046fe9711b5ea213" +} diff --git a/backend/.sqlx/query-10cd655d3d2916721b530c7faa7652fb8fae25383b58f6b9e8dc431b76947315.json b/backend/.sqlx/query-10cd655d3d2916721b530c7faa7652fb8fae25383b58f6b9e8dc431b76947315.json new file mode 100644 index 0000000000000..4d9c9a8e8ce60 --- /dev/null +++ b/backend/.sqlx/query-10cd655d3d2916721b530c7faa7652fb8fae25383b58f6b9e8dc431b76947315.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT value AS \"value!: Json>\"\n FROM flow_version WHERE id = $1 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "value!: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "10cd655d3d2916721b530c7faa7652fb8fae25383b58f6b9e8dc431b76947315" +} diff --git a/backend/.sqlx/query-2b2ee874dbd90beec26713d2effdc5d011d9f1091a13761642d064220add7b41.json b/backend/.sqlx/query-2b2ee874dbd90beec26713d2effdc5d011d9f1091a13761642d064220add7b41.json new file mode 100644 index 0000000000000..cc91b1795a2d9 --- /dev/null +++ b/backend/.sqlx/query-2b2ee874dbd90beec26713d2effdc5d011d9f1091a13761642d064220add7b41.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT flow.versions[array_upper(flow.versions, 1)] AS \"version!: i64\"\n FROM flow WHERE path = $1 AND workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "version!: i64", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "2b2ee874dbd90beec26713d2effdc5d011d9f1091a13761642d064220add7b41" +} diff --git a/backend/.sqlx/query-3df03ec2345c905f03450e1e3f0c3ce7f41a22e72c5180ef8cc910305e4d0fce.json b/backend/.sqlx/query-3df03ec2345c905f03450e1e3f0c3ce7f41a22e72c5180ef8cc910305e4d0fce.json new file mode 100644 index 0000000000000..3bfcb0a9f30e3 --- /dev/null +++ b/backend/.sqlx/query-3df03ec2345c905f03450e1e3f0c3ce7f41a22e72c5180ef8cc910305e4d0fce.json @@ -0,0 +1,73 @@ +{ + "db_name": "PostgreSQL", + 
"query": "SELECT\n script_path, script_hash AS \"script_hash: ScriptHash\",\n job_kind AS \"job_kind: JobKind\",\n flow_status AS \"flow_status: Json>\",\n raw_flow AS \"raw_flow: Json>\"\n FROM completed_job WHERE id = $1 and workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "script_path", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "script_hash: ScriptHash", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "job_kind: JobKind", + "type_info": { + "Custom": { + "name": "job_kind", + "kind": { + "Enum": [ + "script", + "preview", + "flow", + "dependencies", + "flowpreview", + "script_hub", + "identity", + "flowdependencies", + "http", + "graphql", + "postgresql", + "noop", + "appdependencies", + "deploymentcallback", + "singlescriptflow", + "flowscript", + "flownode", + "appscript" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "flow_status: Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 4, + "name": "raw_flow: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + true, + false, + true, + true + ] + }, + "hash": "3df03ec2345c905f03450e1e3f0c3ce7f41a22e72c5180ef8cc910305e4d0fce" +} diff --git a/backend/.sqlx/query-700f987ec9b2a17c9b8304d598269f8fb58a432934163588af5ea84481c1c087.json b/backend/.sqlx/query-700f987ec9b2a17c9b8304d598269f8fb58a432934163588af5ea84481c1c087.json new file mode 100644 index 0000000000000..868c6a8756c6f --- /dev/null +++ b/backend/.sqlx/query-700f987ec9b2a17c9b8304d598269f8fb58a432934163588af5ea84481c1c087.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT hash FROM script WHERE path = $1 AND workspace_id = $2 AND\n deleted = false AND lock IS not NULL AND lock_error_logs IS NULL", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "hash", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "700f987ec9b2a17c9b8304d598269f8fb58a432934163588af5ea84481c1c087" +} diff --git a/backend/.sqlx/query-85085cdc08c262c4750566a0a0b9754017b890ea8c1161a5892b6c29e663ee0e.json b/backend/.sqlx/query-85085cdc08c262c4750566a0a0b9754017b890ea8c1161a5892b6c29e663ee0e.json new file mode 100644 index 0000000000000..7c86fa85a7177 --- /dev/null +++ b/backend/.sqlx/query-85085cdc08c262c4750566a0a0b9754017b890ea8c1161a5892b6c29e663ee0e.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT value AS \"value!: Json>\"\n FROM flow_version_lite WHERE id = $1 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "value!: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + true + ] + }, + "hash": "85085cdc08c262c4750566a0a0b9754017b890ea8c1161a5892b6c29e663ee0e" +} diff --git a/backend/.sqlx/query-9c1a1d0feb79f750c7143fabb0cfa7dab8dd683cd294c27d0549bd8d78ab60a0.json b/backend/.sqlx/query-9c1a1d0feb79f750c7143fabb0cfa7dab8dd683cd294c27d0549bd8d78ab60a0.json new file mode 100644 index 0000000000000..fcce977ba492d --- /dev/null +++ b/backend/.sqlx/query-9c1a1d0feb79f750c7143fabb0cfa7dab8dd683cd294c27d0549bd8d78ab60a0.json @@ -0,0 +1,71 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT lock AS \"lock: String\", content AS \"code!: String\",\n language AS \"language: Option\", envs AS \"envs: Vec\", codebase AS \"codebase: String\" FROM script WHERE hash = $1 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "lock: String", + "type_info": "Text" + }, + { + 
"ordinal": 1, + "name": "code!: String", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "language: Option", + "type_info": { + "Custom": { + "name": "script_lang", + "kind": { + "Enum": [ + "python3", + "deno", + "go", + "bash", + "postgresql", + "nativets", + "bun", + "mysql", + "bigquery", + "snowflake", + "graphql", + "powershell", + "mssql", + "php", + "bunnative", + "rust", + "ansible" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "envs: Vec", + "type_info": "VarcharArray" + }, + { + "ordinal": 4, + "name": "codebase: String", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + true, + false, + false, + true, + true + ] + }, + "hash": "9c1a1d0feb79f750c7143fabb0cfa7dab8dd683cd294c27d0549bd8d78ab60a0" +} diff --git a/backend/.sqlx/query-9ee2f67042c1bed1e7d13eb7d07e78991e5d7cf01fc7993531dbedf33dac2e0b.json b/backend/.sqlx/query-9ee2f67042c1bed1e7d13eb7d07e78991e5d7cf01fc7993531dbedf33dac2e0b.json new file mode 100644 index 0000000000000..8f88e98b80f3f --- /dev/null +++ b/backend/.sqlx/query-9ee2f67042c1bed1e7d13eb7d07e78991e5d7cf01fc7993531dbedf33dac2e0b.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT lock AS \"lock: String\", code AS \"code: String\", flow AS \"flow: Json>\" FROM flow_node WHERE id = $1 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "lock: String", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "code: String", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "flow: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + true, + true, + true + ] + }, + "hash": "9ee2f67042c1bed1e7d13eb7d07e78991e5d7cf01fc7993531dbedf33dac2e0b" +} diff --git a/backend/.sqlx/query-bc8ac03254669951654cda4bcfa12491341e745aef5e0e5090c2f4e4a4dc54fb.json b/backend/.sqlx/query-bc8ac03254669951654cda4bcfa12491341e745aef5e0e5090c2f4e4a4dc54fb.json new file mode 100644 index 0000000000000..fc27de30d0ebe --- /dev/null +++ b/backend/.sqlx/query-bc8ac03254669951654cda4bcfa12491341e745aef5e0e5090c2f4e4a4dc54fb.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json>\" FROM job WHERE id = $1 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "raw_code", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "raw_lock", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "raw_flow: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + true, + true, + true + ] + }, + "hash": "bc8ac03254669951654cda4bcfa12491341e745aef5e0e5090c2f4e4a4dc54fb" +} diff --git a/backend/Cargo.lock b/backend/Cargo.lock index a117b2d748f88..445350f51065a 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -10851,6 +10851,7 @@ dependencies = [ "crc", "cron", "croner", + "futures", "futures-core", "gethostname", "git-version", diff --git a/backend/src/main.rs b/backend/src/main.rs index a8c9829340fe5..2428c57fd4ce0 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -142,7 +142,7 @@ async fn cache_hub_scripts(file_path: Option) -> anyhow::Result<()> { for path in paths.values() { tracing::info!("Caching hub script at {path}"); - let res = get_hub_script_content_and_requirements(Some(path.to_string()), None).await?; + let res = get_hub_script_content_and_requirements(Some(path), None).await?; if res .language .as_ref() @@ -189,7 +189,7 @@ async fn cache_hub_scripts(file_path: Option) -> 
anyhow::Result<()> { if let Err(e) = windmill_worker::prebundle_bun_script( &res.content, - Some(lockfile), + Some(&lockfile), &path, &job_id, "admins", diff --git a/backend/tests/fixtures/hello.sql b/backend/tests/fixtures/hello.sql new file mode 100644 index 0000000000000..31490ce087f3a --- /dev/null +++ b/backend/tests/fixtures/hello.sql @@ -0,0 +1,54 @@ +INSERT INTO public.script(workspace_id, created_by, content, schema, summary, description, path, hash, language, lock) VALUES ( +'test-workspace', +'system', +' +export function main(world: string) { + const greet = `Hello ${world}!`; + console.log(greet) + return greet +} +', +'{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{"world":{"default":"world","description":"","type":"string"}},"required":[],"type":"object"}', +'', +'', +'f/system/hello', 123412, 'deno', ''); + +INSERT INTO public.flow(workspace_id, summary, description, path, versions, schema, value, edited_by) VALUES ( +'test-workspace', +'', +'', +'f/system/hello_flow', +'{1443253234253453}', +'{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{"world":{"default":"world","description":"","type":"string"}},"required":[],"type":"object"}', +'{"modules": [{"id": "a", "value": {"path": "f/system/hello", "type": "script", "input_transforms": {"world": {"expr": "flow_input.world", "type": "javascript"}}}}]}', +'system' +); + +INSERT INTO public.flow_version(id, workspace_id, path, schema, value, created_by) VALUES ( +1443253234253453, +'test-workspace', +'f/system/hello_flow', +'{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{"world":{"default":"world","description":"","type":"string"}},"required":[],"type":"object"}', +'{"modules": [{"id": "a", "value": {"path": "f/system/failing_script", "type": "script", "input_transforms": {"fail": {"expr": "flow_input.fail", "type": "javascript"}}}}]}', +'system' +); + +INSERT INTO public.flow(workspace_id, summary, description, path, versions, schema, value, edited_by) VALUES ( +'test-workspace', +'', +'', +'f/system/hello_with_nodes_flow', +'{1443253234253454}', +'{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{"world":{"default":"world","description":"","type":"string"}},"required":[],"type":"object"}', +E'{"modules":[{"id":"a","value":{"type":"forloopflow","modules":[{"id":"b","value":{"type":"rawscript","content":"export function main(world: string) {\\n const greet = `Hello ${world}!`;\\n console.log(greet)\\n return greet\\n}\\n","language":"deno","input_transforms":{"world":{"type":"javascript","expr":"flow_input.iter.value"}},"is_trigger":false}},{"id":"c","value":{"type":"rawscript","content":"export function main(hello: string) {\\n const dareyou = `Did you just say \\"${hello}\\"??!`;\\n console.log(dareyou)\\n return dareyou\\n}","language":"deno","input_transforms":{"hello":{"type":"javascript","value":"${results.b}","expr":"`${results.b}`"}},"is_trigger":false}}],"iterator":{"type":"javascript","expr":"[\'foo\', \'bar\', \'baz\']"},"skip_failures":true,"parallel":false}}],"same_worker":false}', +'system' +); + +INSERT INTO public.flow_version(id, workspace_id, path, schema, value, created_by) VALUES ( +1443253234253454, +'test-workspace', +'f/system/hello_with_nodes_flow', +'{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{"world":{"default":"world","description":"","type":"string"}},"required":[],"type":"object"}', 
+E'{"modules":[{"id":"a","value":{"type":"forloopflow","modules":[{"id":"b","value":{"type":"rawscript","content":"export function main(world: string) {\\n const greet = `Hello ${world}!`;\\n console.log(greet)\\n return greet\\n}\\n","language":"deno","input_transforms":{"world":{"type":"javascript","expr":"flow_input.iter.value"}},"is_trigger":false}},{"id":"c","value":{"type":"rawscript","content":"export function main(hello: string) {\\n const dareyou = `Did you just say \\"${hello}\\"??!`;\\n console.log(dareyou)\\n return dareyou\\n}","language":"deno","input_transforms":{"hello":{"type":"javascript","value":"${results.b}","expr":"`${results.b}`"}},"is_trigger":false}}],"iterator":{"type":"javascript","expr":"[\'foo\', \'bar\', \'baz\']"},"skip_failures":true,"parallel":false}}],"same_worker":false}', +'system' +); diff --git a/backend/tests/fixtures/relative_bun.sql b/backend/tests/fixtures/relative_bun.sql index fdc2676661655..1b4ce1fec3bd9 100644 --- a/backend/tests/fixtures/relative_bun.sql +++ b/backend/tests/fixtures/relative_bun.sql @@ -9,7 +9,7 @@ export function main() { '{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{},"required":[],"type":"object"}', '', '', -'f/system/same_folder_script', -28028598712388162, 'bun', ''); +'f/system/same_folder_script', 12340, 'bun', ''); INSERT INTO public.script(workspace_id, created_by, content, schema, summary, description, path, hash, language, lock) VALUES ( 'test-workspace', @@ -22,7 +22,7 @@ export function main() { '{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{},"required":[],"type":"object"}', '', '', -'f/system_relative/different_folder_script', -28028598712388161, 'bun', ''); +'f/system_relative/different_folder_script', 12341, 'bun', ''); INSERT INTO public.script(workspace_id, created_by, content, schema, summary, description, path, hash, language, lock) VALUES ( @@ -41,4 +41,4 @@ export function main() { '{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{},"required":[],"type":"object"}', '', '', -'f/system_relative/nested_script', -28028598712388160, 'bun', ''); \ No newline at end of file +'f/system_relative/nested_script', 12342, 'bun', ''); \ No newline at end of file diff --git a/backend/tests/fixtures/relative_deno.sql b/backend/tests/fixtures/relative_deno.sql index 4dff7151105ad..d038ff4af9774 100644 --- a/backend/tests/fixtures/relative_deno.sql +++ b/backend/tests/fixtures/relative_deno.sql @@ -9,7 +9,7 @@ export function main() { '{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{},"required":[],"type":"object"}', '', '', -'f/system/same_folder_script', -28028598712388162, 'deno', ''); +'f/system/same_folder_script', 12343, 'deno', ''); INSERT INTO public.script(workspace_id, created_by, content, schema, summary, description, path, hash, language, lock) VALUES ( 'test-workspace', @@ -22,7 +22,7 @@ export function main() { '{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{},"required":[],"type":"object"}', '', '', -'f/system_relative/different_folder_script', -28028598712388161, 'deno', ''); +'f/system_relative/different_folder_script', 12344, 'deno', ''); INSERT INTO public.script(workspace_id, created_by, content, schema, summary, description, path, hash, language, lock) VALUES ( @@ -41,4 +41,4 @@ export function main() { '{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{},"required":[],"type":"object"}', '', '', -'f/system_relative/nested_script', -28028598712388160, 
'deno', ''); \ No newline at end of file +'f/system_relative/nested_script', 12345, 'deno', ''); \ No newline at end of file diff --git a/backend/tests/fixtures/relative_python.sql b/backend/tests/fixtures/relative_python.sql index ea8e5798a1d2b..05e8453ccce7a 100644 --- a/backend/tests/fixtures/relative_python.sql +++ b/backend/tests/fixtures/relative_python.sql @@ -8,7 +8,7 @@ def main(): '{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{},"required":[],"type":"object"}', '', '', -'f/system/same_folder_script', -28028598712388162, 'python3', ''); +'f/system/same_folder_script', 12346, 'python3', ''); INSERT INTO public.script(workspace_id, created_by, content, schema, summary, description, path, hash, language, lock) VALUES ( 'test-workspace', @@ -20,7 +20,7 @@ def main(): '{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{},"required":[],"type":"object"}', '', '', -'f/system_relative/different_folder_script', -28028598712388161, 'python3', ''); +'f/system_relative/different_folder_script', 12347, 'python3', ''); INSERT INTO public.script(workspace_id, created_by, content, schema, summary, description, path, hash, language, lock) VALUES ( @@ -38,4 +38,4 @@ def main(): '{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{},"required":[],"type":"object"}', '', '', -'f/system_relative/nested_script', -28028598712388160, 'python3', ''); \ No newline at end of file +'f/system_relative/nested_script', 12348, 'python3', ''); \ No newline at end of file diff --git a/backend/tests/fixtures/schedule.sql b/backend/tests/fixtures/schedule.sql index cfaa195c8b411..8d851369b02c5 100644 --- a/backend/tests/fixtures/schedule.sql +++ b/backend/tests/fixtures/schedule.sql @@ -13,7 +13,7 @@ export async function main(fail: boolean = true) { '{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{"fail":{"default":true,"description":"","type":"boolean"}},"required":[],"type":"object"}', '', '', -'f/system/failing_script', -28028598712388162, 'deno', ''); +'f/system/failing_script', 12349, 'deno', ''); INSERT INTO public.script(workspace_id, created_by, content, schema, summary, description, path, hash, language, lock) VALUES ( 'test-workspace', @@ -26,7 +26,7 @@ export async function main() { '{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{"path":{"default":null,"description":"","type":"string"},"schedule_path":{"default":null,"description":"","type":"string"},"error":{"default":null,"description":"","properties":{},"type":"object"}},"required":["path","schedule_path","error"],"type":"object"}', '', '', -'f/system/schedule_error_handler', -28028598712388161, 'deno', ''); +'f/system/schedule_error_handler', 123410, 'deno', ''); INSERT INTO public.script(workspace_id, created_by, content, schema, summary, description, path, hash, language, lock) VALUES ( 'test-workspace', @@ -39,21 +39,21 @@ export async function main() { '{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{"path":{"default":null,"description":"","type":"string"},"schedule_path":{"default":null,"description":"","type":"string"},"previous_job_error":{"default":null,"description":"","type":"string"},"result":{"default":null,"description":"","type":"string"}},"required":["path","schedule_path","previous_job_error","result"],"type":"object"}', '', '', -'f/system/schedule_recovery_handler', -28028598712388160, 'deno', ''); +'f/system/schedule_recovery_handler', 123411, 'deno', ''); INSERT INTO public.flow(workspace_id, 
summary, description, path, versions, schema, value, edited_by) VALUES ( 'test-workspace', '', '', 'f/system/failing_flow', -'{1}', +'{1443253234253452}', '{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{"fail":{"default":true,"description":"","type":"boolean","format":""}},"required":[],"type":"object"}', '{"modules": [{"id": "a", "value": {"path": "f/system/failing_script", "type": "script", "input_transforms": {"fail": {"expr": "flow_input.fail", "type": "javascript"}}}}]}', 'system' ); INSERT INTO public.flow_version(id, workspace_id, path, schema, value, created_by) VALUES ( -1, +1443253234253452, 'test-workspace', 'f/system/failing_flow', '{"$schema":"https://json-schema.org/draft/2020-12/schema","properties":{"fail":{"default":true,"description":"","type":"boolean","format":""}},"required":[],"type":"object"}', diff --git a/backend/tests/worker.rs b/backend/tests/worker.rs index c983f2b7f80e7..559f82ecc77b2 100644 --- a/backend/tests/worker.rs +++ b/backend/tests/worker.rs @@ -1,5 +1,6 @@ use serde::de::DeserializeOwned; -use std::str::FromStr; +use std::future::Future; +use std::{str::FromStr, sync::Arc}; use windmill_api_client::types::{NewScript, NewScriptLanguage}; #[cfg(feature = "enterprise")] @@ -11,6 +12,7 @@ use serde::Deserialize; use serde_json::json; use sqlx::{postgres::PgListener, types::Uuid, Pool, Postgres}; +use tokio::sync::RwLock; #[cfg(feature = "enterprise")] use tokio::time::{timeout, Duration}; @@ -29,6 +31,9 @@ use windmill_common::{ flows::{FlowModule, FlowModuleValue, FlowValue, InputTransform}, jobs::{JobKind, JobPayload, RawCode}, scripts::{ScriptHash, ScriptLang}, + worker::{ + MIN_VERSION_IS_AT_LEAST_1_427, MIN_VERSION_IS_AT_LEAST_1_432, MIN_VERSION_IS_AT_LEAST_1_440, + }, }; use windmill_queue::PushIsolationLevel; @@ -3732,3 +3737,472 @@ async fn test_result_format(db: Pool) { .unwrap(); assert_eq!(result.get(), correct_result); } + +async fn test_for_versions>( + version_flags: impl Iterator>>, + test: impl Fn() -> F, +) { + for version_flag in version_flags { + *version_flag.write().await = true; + test().await; + } +} + +mod job_payload { + use super::*; + + use lazy_static::lazy_static; + + use windmill_common::cache; + use windmill_common::flows::FlowNodeId; + + lazy_static! { + static ref VERSION_FLAGS: [Arc>; 3] = [ + MIN_VERSION_IS_AT_LEAST_1_427.clone(), + MIN_VERSION_IS_AT_LEAST_1_432.clone(), + MIN_VERSION_IS_AT_LEAST_1_440.clone(), + ]; + } + + #[sqlx::test(fixtures("base", "hello"))] + async fn test_script_hash_payload(db: Pool) { + initialize_tracing().await; + let server = ApiServer::start(db.clone()).await; + let port = server.addr.port(); + + let test = || async { + let result = RunJob::from(JobPayload::ScriptHash { + hash: ScriptHash(123412), + path: "f/system/hello".to_string(), + custom_concurrency_key: None, + concurrent_limit: None, + concurrency_time_window_s: None, + cache_ttl: None, + dedicated_worker: None, + language: ScriptLang::Deno, + priority: None, + apply_preprocessor: false, + }) + .arg("world", json!("foo")) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + + assert_eq!(result, json!("Hello foo!")); + }; + test_for_versions(VERSION_FLAGS.iter().cloned(), test).await; + } + + #[sqlx::test(fixtures("base", "hello"))] + async fn test_flow_script_payload(db: Pool) { + initialize_tracing().await; + let server = ApiServer::start(db.clone()).await; + let port = server.addr.port(); + + // Deploy the flow to produce the "lite" version. 
+ let _ = RunJob::from(JobPayload::FlowDependencies { + path: "f/system/hello_with_nodes_flow".to_string(), + dedicated_worker: None, + version: 1443253234253454, + }) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + + let flow_data = cache::flow::fetch_version_lite(&db, 1443253234253454) + .await + .unwrap(); + let flow_value = flow_data.value().unwrap(); + let flow_scripts = { + async fn load(db: &Pool, modules: &[FlowModule]) -> Vec { + let mut res = vec![]; + for module in modules { + let value = + serde_json::from_str::(module.value.get()).unwrap(); + match value { + FlowModuleValue::FlowScript { id, .. } => res.push(id), + FlowModuleValue::ForloopFlow { modules_node: Some(flow_node), .. } => { + let flow_data = cache::flow::fetch_flow(db, flow_node).await.unwrap(); + res.extend( + Box::pin(load(db, &flow_data.value().unwrap().modules)).await, + ); + } + _ => {} + } + } + res + } + + load(&db, &flow_value.modules).await + }; + assert_eq!(flow_scripts.len(), 2); + + let test = || async { + let result = RunJob::from(JobPayload::FlowScript { + id: flow_scripts[0], + language: ScriptLang::Deno, + custom_concurrency_key: None, + concurrent_limit: None, + concurrency_time_window_s: None, + cache_ttl: None, + dedicated_worker: None, + }) + .arg("world", json!("foo")) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + + assert_eq!(result, json!("Hello foo!")); + }; + test_for_versions(VERSION_FLAGS.iter().cloned(), test).await; + let test = || async { + let result = RunJob::from(JobPayload::FlowScript { + id: flow_scripts[1], + language: ScriptLang::Deno, + custom_concurrency_key: None, + concurrent_limit: None, + concurrency_time_window_s: None, + cache_ttl: None, + dedicated_worker: None, + }) + .arg("hello", json!("You know nothing Jean Neige")) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + + assert_eq!( + result, + json!("Did you just say \"You know nothing Jean Neige\"??!") + ); + }; + test_for_versions(VERSION_FLAGS.iter().cloned(), test).await; + } + + #[sqlx::test(fixtures("base", "hello"))] + async fn test_flow_node_payload(db: Pool) { + initialize_tracing().await; + let server = ApiServer::start(db.clone()).await; + let port = server.addr.port(); + + // Deploy the flow to produce the "lite" version. + let _ = RunJob::from(JobPayload::FlowDependencies { + path: "f/system/hello_with_nodes_flow".to_string(), + dedicated_worker: None, + version: 1443253234253454, + }) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + + let flow_data = cache::flow::fetch_version_lite(&db, 1443253234253454) + .await + .unwrap(); + let flow_value = flow_data.value().unwrap(); + let forloop_module = + serde_json::from_str::(flow_value.modules[0].value.get()).unwrap(); + let FlowModuleValue::ForloopFlow { modules_node: Some(id), .. 
} = forloop_module else { + panic!("Expected a forloop module with a flow node"); + }; + + let test = || async { + let result = RunJob::from(JobPayload::FlowNode { + id, + path: "f/system/hello_with_nodes_flow/forloop-0".into(), + }) + .arg("iter", json!({ "value": "tests", "index": 0 })) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + + assert_eq!(result, json!("Did you just say \"Hello tests!\"??!")); + }; + test_for_versions(VERSION_FLAGS.iter().cloned(), test).await; + } + + #[sqlx::test(fixtures("base", "hello"))] + async fn test_dependencies_payload(db: Pool) { + initialize_tracing().await; + let server = ApiServer::start(db.clone()).await; + let port = server.addr.port(); + + let test = || async { + let result = RunJob::from(JobPayload::Dependencies { + path: "f/system/hello".to_string(), + hash: ScriptHash(123412), + language: ScriptLang::Deno, + dedicated_worker: None, + }) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + + assert_eq!( + result.get("status").unwrap(), + &json!("Successful lock file generation") + ); + }; + test_for_versions(VERSION_FLAGS.iter().cloned(), test).await; + } + + // Just test that deploying a flow work as expected. + #[sqlx::test(fixtures("base", "hello"))] + async fn test_flow_dependencies_payload(db: Pool) { + initialize_tracing().await; + let server = ApiServer::start(db.clone()).await; + let port = server.addr.port(); + + let test = || async { + let result = RunJob::from(JobPayload::FlowDependencies { + path: "f/system/hello_with_nodes_flow".to_string(), + dedicated_worker: None, + version: 1443253234253454, + }) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + + assert_eq!( + result.get("status").unwrap(), + &json!("Successful lock file generation") + ); + }; + test_for_versions(VERSION_FLAGS.iter().cloned(), test).await; + } + + #[sqlx::test(fixtures("base", "hello"))] + async fn test_raw_flow_dependencies_payload(db: Pool) { + initialize_tracing().await; + let server = ApiServer::start(db.clone()).await; + let port = server.addr.port(); + + let test = || async { + let result = RunJob::from(JobPayload::RawFlowDependencies { + path: "none".to_string(), + flow_value: serde_json::from_value(json!({ + "modules": [{ + "id": "a", + "value": { + "type": "rawscript", + "content": r#"export function main(world: string) { + const greet = `Hello ${world}!`; + console.log(greet) + return greet + }"#, + "language": "deno", + "input_transforms": { + "world": { "type": "javascript", "expr": "flow_input.world" } + } + } + }], + "schema": { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "properties": { "world": { "type": "string" } }, + "type": "object", + "order": [ "world" ] + } + })) + .unwrap(), + }) + .arg("skip_flow_update", json!(true)) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + + let result = RunJob::from(JobPayload::RawFlow { + value: serde_json::from_value::( + result.get("updated_flow_value").unwrap().clone(), + ) + .unwrap(), + path: None, + restarted_from: None, + }) + .arg("world", json!("Jean Neige")) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + + assert_eq!(result, json!("Hello Jean Neige!")); + }; + test_for_versions(VERSION_FLAGS.iter().cloned(), test).await; + } + + #[sqlx::test(fixtures("base", "hello"))] + async fn test_raw_script_dependencies_payload(db: Pool) { + initialize_tracing().await; + let server = ApiServer::start(db.clone()).await; + let port = server.addr.port(); + + let test = || 
async { + let result = RunJob::from(JobPayload::RawScriptDependencies { + script_path: "none".into(), + content: r#"export function main(world: string) { + const greet = `Hello ${world}!`; + console.log(greet) + return greet + }"# + .into(), + language: ScriptLang::Deno, + }) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + + assert_eq!( + result, + json!({ "lock": "", "status": "Successful lock file generation" }) + ); + }; + test_for_versions(VERSION_FLAGS.iter().cloned(), test).await; + } + + #[sqlx::test(fixtures("base", "hello"))] + async fn test_flow_payload(db: Pool) { + initialize_tracing().await; + let server = ApiServer::start(db.clone()).await; + let port = server.addr.port(); + + let test = || async { + let result = RunJob::from(JobPayload::Flow { + path: "f/system/hello_with_nodes_flow".to_string(), + dedicated_worker: None, + apply_preprocessor: true, + }) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + + assert_eq!( + result, + json!([ + "Did you just say \"Hello foo!\"??!", + "Did you just say \"Hello bar!\"??!", + "Did you just say \"Hello baz!\"??!", + ]) + ); + }; + // Test the not "lite" flow. + test_for_versions(VERSION_FLAGS.iter().cloned(), test).await; + // Deploy the flow to produce the "lite" version. + let _ = RunJob::from(JobPayload::FlowDependencies { + path: "f/system/hello_with_nodes_flow".to_string(), + dedicated_worker: None, + version: 1443253234253454, + }) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + // Test the "lite" flow. + test_for_versions(VERSION_FLAGS.iter().cloned(), test).await; + } + + #[sqlx::test(fixtures("base", "hello"))] + async fn test_restarted_flow_payload(db: Pool) { + initialize_tracing().await; + let server = ApiServer::start(db.clone()).await; + let port = server.addr.port(); + + let test = || async { + let completed_job_id = RunJob::from(JobPayload::Flow { + path: "f/system/hello_with_nodes_flow".to_string(), + dedicated_worker: None, + apply_preprocessor: true, + }) + .run_until_complete(&db, port) + .await + .id; + + let result = RunJob::from(JobPayload::RestartedFlow { + completed_job_id, + step_id: "a".into(), + branch_or_iteration_n: None, + }) + .arg("iter", json!({ "value": "tests", "index": 0 })) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + + assert_eq!( + result, + json!([ + "Did you just say \"Hello foo!\"??!", + "Did you just say \"Hello bar!\"??!", + "Did you just say \"Hello baz!\"??!", + ]) + ); + }; + // Test the not "lite" flow. + test_for_versions(VERSION_FLAGS.iter().cloned(), test).await; + // Deploy the flow to produce the "lite" version. + let _ = RunJob::from(JobPayload::FlowDependencies { + path: "f/system/hello_with_nodes_flow".to_string(), + dedicated_worker: None, + version: 1443253234253454, + }) + .run_until_complete(&db, port) + .await + .json_result() + .unwrap(); + // Test the "lite" flow. 
+        test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
+    }
+
+    #[sqlx::test(fixtures("base", "hello"))]
+    async fn test_raw_flow_payload(db: Pool<Postgres>) {
+        initialize_tracing().await;
+        let server = ApiServer::start(db.clone()).await;
+        let port = server.addr.port();
+
+        let test = || async {
+            let result = RunJob::from(JobPayload::RawFlow {
+                value: serde_json::from_value(json!({
+                    "modules": [{
+                        "id": "a",
+                        "value": {
+                            "type": "rawscript",
+                            "content": r#"export function main(world: string) {
+                                const greet = `Hello ${world}!`;
+                                console.log(greet)
+                                return greet
+                            }"#,
+                            "language": "deno",
+                            "input_transforms": {
+                                "world": { "type": "javascript", "expr": "flow_input.world" }
+                            }
+                        }
+                    }],
+                    "schema": {
+                        "$schema": "https://json-schema.org/draft/2020-12/schema",
+                        "properties": { "world": { "type": "string" } },
+                        "type": "object",
+                        "order": [ "world" ]
+                    }
+                }))
+                .unwrap(),
+                path: None,
+                restarted_from: None,
+            })
+            .arg("world", json!("Jean Neige"))
+            .run_until_complete(&db, port)
+            .await
+            .json_result()
+            .unwrap();
+
+            assert_eq!(result, json!("Hello Jean Neige!"));
+        };
+        test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
+    }
+}
diff --git a/backend/windmill-api/src/apps.rs b/backend/windmill-api/src/apps.rs
index 54391fe6cbe69..f9228a669312c 100644
--- a/backend/windmill-api/src/apps.rs
+++ b/backend/windmill-api/src/apps.rs
@@ -1323,8 +1323,8 @@ async fn execute_component(
     let policy = if let Some(id) = payload.version {
         let cache = cache::anon!({ u64 => Arc<Policy> } in "policy" <= 1000);
         arc_policy = policy_fut
-            .map_ok(Arc::new)
-            .cached(cache, &(id as u64))
+            .map_ok(sqlx::types::Json) // cache as json.
+            .cached(cache, &(id as u64), |sqlx::types::Json(x)| Arc::new(x))
             .await?;
         &*arc_policy
     } else {
@@ -1352,8 +1352,8 @@ async fn execute_component(
             )
             .fetch_one(&db)
             .map_err(Into::::into)
-            .map_ok(Arc::new)
-            .cached(cache, &(*id as u64))
+            .map_ok(sqlx::types::Json) // cache as json.
+            .cached(cache, &(*id as u64), |sqlx::types::Json(x)| Arc::new(x))
             .await?
         }
         _ => unreachable!(),
diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs
index ba63f71bcb8bb..4db486dcc5388 100644
--- a/backend/windmill-api/src/jobs.rs
+++ b/backend/windmill-api/src/jobs.rs
@@ -8,6 +8,7 @@
 use axum::body::Body;
 use axum::http::HeaderValue;
+use futures::TryFutureExt;
 use itertools::Itertools;
 use quick_cache::sync::Cache;
 use serde_json::value::RawValue;
@@ -62,6 +63,7 @@ use windmill_audit::audit_ee::{audit_log, AuditAuthor};
 use windmill_audit::ActionKind;
 use windmill_common::worker::{to_raw_value, CUSTOM_TAGS_PER_WORKSPACE};
 use windmill_common::{
+    cache,
     db::UserDB,
     error::{self, to_anyhow, Error},
     flow_status::{Approval, FlowStatus, FlowStatusModule},
@@ -776,7 +778,7 @@ impl<'a> GetQuery<'a> {
         Self { with_in_tags: in_tags, ..self }
     }
 
-    fn check_auth(self, email: Option<&str>) -> error::Result<()> {
+    fn check_auth(&self, email: Option<&str>) -> error::Result<()> {
         if let Some(email) = email {
             if self.with_auth.is_some_and(|x| x.is_none()) && email != "anonymous" {
                 return Err(Error::BadRequest(
@@ -788,6 +790,53 @@ impl<'a> GetQuery<'a> {
         Ok(())
     }
 
+    /// Resolve job raw values.
+    /// This fetches the raw values from the cache and updates the job accordingly.
+    ///
+    /// # Details
+    /// Most of the raw values (code, lock and flow) have been removed from the `job`, `queue` and
+    /// `completed_job` tables. Only those for "preview" jobs remain (i.e. [`JobKind::Preview`],
+    /// [`JobKind::FlowPreview`] and [`JobKind::Dependencies`]), as well as [`JobKind::Flow`], but
+    /// only when pushed from a not-yet-updated worker.
+    /// This function makes the above change transparent to the API: the returned jobs carry the
+    /// raw values as if they were still stored in the tables.
+    async fn resolve_raw_values(
+        &self,
+        db: &DB,
+        id: Uuid,
+        kind: JobKind,
+        hash: Option<ScriptHash>,
+        job: &mut JobExtended,
+    ) {
+        let (raw_code, raw_lock, raw_flow) = (
+            job.raw_code.take(),
+            job.raw_lock.take(),
+            job.raw_flow.take(),
+        );
+        if self.with_flow {
+            // Try to fetch the flow from the cache, falling back to the preview flow.
+            // NOTE: This could match on the job kind instead of using `or_else`, but that isn't
+            // necessary as `fetch_flow` bails out early for job kinds it does not handle.
+            cache::job::fetch_flow(db, kind, hash)
+                .or_else(|_| cache::job::fetch_preview_flow(db, &id, raw_flow))
+                .await
+                .ok()
+                .inspect(|data| job.raw_flow = Some(sqlx::types::Json(data.raw_flow.clone())));
+        }
+        if self.with_code {
+            // Try to fetch the code from the cache, falling back to the preview code.
+            // NOTE: This could match on the job kind instead of using `or_else`, but that isn't
+            // necessary as `fetch_script` bails out early for job kinds it does not handle.
+            cache::job::fetch_script(db, kind, hash)
+                .or_else(|_| cache::job::fetch_preview_script(db, &id, raw_lock, raw_code))
+                .await
+                .ok()
+                .inspect(|data| {
+                    (job.raw_lock, job.raw_code) = (data.lock.clone(), Some(data.code.clone()))
+                });
+        }
+    }
+
     async fn fetch_queued(
         self,
         db: &DB,
@@ -807,6 +856,10 @@ impl<'a> GetQuery<'a> {
 
         let mut job = query.fetch_optional(db).await?;
         self.check_auth(job.as_ref().map(|job| job.created_by.as_str()))?;
+        if let Some(job) = job.as_mut() {
+            self.resolve_raw_values(db, job.id, job.job_kind, job.script_hash, job)
+                .await;
+        }
         if self.with_flow {
             job = resolve_maybe_value(db, workspace_id, self.with_code, job, |job| {
                 job.raw_flow.as_mut()
@@ -835,6 +888,10 @@ impl<'a> GetQuery<'a> {
 
         let mut cjob = query.fetch_optional(db).await?;
         self.check_auth(cjob.as_ref().map(|job| job.created_by.as_str()))?;
+        if let Some(job) = cjob.as_mut() {
+            self.resolve_raw_values(db, job.id, job.job_kind, job.script_hash, job)
+                .await;
+        }
         if self.with_flow {
             cjob = resolve_maybe_value(db, workspace_id, self.with_code, cjob, |job| {
                 job.raw_flow.as_mut()
diff --git a/backend/windmill-common/Cargo.toml b/backend/windmill-common/Cargo.toml
index c000eef084678..9c35ef7ea1db8 100644
--- a/backend/windmill-common/Cargo.toml
+++ b/backend/windmill-common/Cargo.toml
@@ -64,6 +64,7 @@ semver.workspace = true
 croner = "2.0.6"
 quick_cache.workspace = true
 pin-project-lite.workspace = true
+futures.workspace = true
 
 opentelemetry-semantic-conventions = { workspace = true, optional = true }
 opentelemetry-otlp = { workspace = true, optional = true }
diff --git a/backend/windmill-common/src/apps.rs b/backend/windmill-common/src/apps.rs
index 2948fd7cb70f3..bc8429c2795db 100644
--- a/backend/windmill-common/src/apps.rs
+++ b/backend/windmill-common/src/apps.rs
@@ -13,12 +13,6 @@ use serde::{Deserialize, Serialize};
 #[serde(transparent)]
 pub struct AppScriptId(pub i64);
 
-impl Into<u64> for AppScriptId {
-    fn into(self) -> u64 {
-        self.0 as u64
-    }
-}
-
 #[derive(Deserialize)]
 pub struct ListAppQuery {
     pub starred_only: Option<bool>,
diff --git a/backend/windmill-common/src/cache.rs b/backend/windmill-common/src/cache.rs
index 45fdd0bd8225e..9f3ec024ae247 100644
--- a/backend/windmill-common/src/cache.rs
+++ b/backend/windmill-common/src/cache.rs
@@ -1,11 +1,19 @@
+use crate::apps::AppScriptId;
 use
crate::error; +use crate::flows::FlowNodeId; +use crate::flows::FlowValue; +use crate::scripts::ScriptHash; +use crate::scripts::ScriptLang; use std::future::Future; use std::hash::Hash; use std::path::{Path, PathBuf}; +use std::sync::Arc; +use futures::future::TryFutureExt; use quick_cache::Equivalent; use serde::{Deserialize, Serialize}; +use sqlx::types::{Json, JsonRawValue as RawValue}; use sqlx::PgExecutor; pub use const_format::concatcp; @@ -21,24 +29,42 @@ pub struct FsBackedCache { root: &'static str, } -impl FsBackedCache { +impl FsBackedCache { /// Create a new file-system backed cache with `items_capacity` capacity. /// The cache will be stored in the `root` directory. pub fn new(root: &'static str, items_capacity: usize) -> Self { Self { cache: Cache::new(items_capacity), root } } + /// Build a path for the given `key`. + pub fn path(&self, key: &Key) -> PathBuf { + key.path(self.root) + } + + /// Remove the item with the given `key` from the cache. + pub fn remove(&self, key: &Key) -> Option<(Key, Val)> { + let _ = std::fs::remove_dir_all(self.path(key)); + self.cache.remove(key) + } + /// Gets or inserts an item in the cache with key `key`. - pub async fn get_or_insert_async<'a, Q, F>(&'a self, key: &Q, with: F) -> error::Result + pub async fn get_or_insert_async<'a, T: fs::Bundle, Q, F>( + &'a self, + key: &Q, + map: impl Fn(T) -> Val, + with: F, + ) -> error::Result where - Q: Hash + Equivalent + ToOwned + Copy + Into, - F: Future>, + Q: Hash + Equivalent + ToOwned, + F: Future>, { self.cache - .get_or_insert_async( - key, - fs::import_or_insert_with(self.root, (*key).into(), with), - ) + .get_or_insert_async(key, async { + let key = key.to_owned(); + fs::import_or_insert_with(self.path(&key), with) + .await + .map(map) + }) .await } } @@ -101,57 +127,175 @@ pub mod future { use super::*; /// Extension trait for futures that can be cached. - pub trait FutureCachedExt: - Future> + Sized - { + pub trait FutureCachedExt: Future> + Sized { /// Get or insert the future result in the cache. /// /// # Example /// ```rust /// use windmill_common::cache::{self, future::FutureCachedExt}; + /// use sqlx::types::Json; /// + /// #[allow(unused)] /// async { - /// let result = std::future::ready(Ok(42)) - /// .cached(cache::anon!({ u64 => u64 } in "test" <= 1), &42) + /// let result = std::future::ready(Ok(Json(42))) + /// .cached(cache::anon!({ u64 => Json } in "test" <= 1), &42, |x| x) /// .await; /// - /// assert_eq!(result.unwrap(), 42); + /// assert_eq!(result.unwrap(), Json(42)); /// }; /// ``` - fn cached( + fn cached( self, cache: &FsBackedCache, key: &Q, + map: impl Fn(T) -> Val, ) -> impl Future> where - Q: Hash + Equivalent + ToOwned + Copy + Into, + Q: Hash + Equivalent + ToOwned, { - cache.get_or_insert_async(key, self) + cache.get_or_insert_async(key, map, self) } } - impl> + Sized> - FutureCachedExt for F - { + impl> + Sized> FutureCachedExt for F {} +} + +/// Flow data: i.e. a cached `raw_flow`. +/// Contains the original json raw value and a pre-parsed [`FlowValue`]. 
+#[derive(Debug, Clone)] +pub struct FlowData { + pub raw_flow: Box, + pub flow: Result, +} + +impl FlowData { + pub fn from_utf8(vec: Vec) -> error::Result { + Ok(Self::from_raw(RawValue::from_string(String::from_utf8( + vec, + )?)?)) } + + pub fn from_raw(raw_flow: Box) -> Self { + let flow = serde_json::from_str(raw_flow.get()) + .map_err(|e| format!("Invalid flow value: {:?}", e)); + Self { raw_flow, flow } + } + + pub fn value(&self) -> error::Result<&FlowValue> { + self.flow + .as_ref() + .map_err(|err| error::Error::InternalErr(err.clone())) + } +} + +impl Default for FlowData { + fn default() -> Self { + Self { raw_flow: Default::default(), flow: Err(Default::default()) } + } +} + +#[derive(Debug, Clone, Default)] +pub struct ScriptData { + pub lock: Option, + pub code: String, +} + +impl ScriptData { + pub fn from_raw(lock: Option, code: Option) -> Self { + let lock = lock.and_then(|x| if x.is_empty() { None } else { Some(x) }); + let code = code.unwrap_or_default(); + Self { lock, code } + } +} + +#[derive(Debug, Clone)] +pub enum RawData { + Flow(Arc), + Script(Arc), +} + +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct ScriptMetadata { + pub language: Option, + pub envs: Option>, + pub codebase: Option, } +const _: () = { + impl fs::Bundle for FlowData { + type Item = &'static str; + + fn items() -> impl Iterator { + ["flow.json"].into_iter() + } + + fn import(&mut self, _: Self::Item, data: Vec) -> error::Result<()> { + *self = Self::from_utf8(data)?; + Ok(()) + } + + fn export(&self, _: Self::Item) -> error::Result>> { + match self.raw_flow.get().is_empty() { + false => Ok(Some(self.raw_flow.get().as_bytes().to_vec())), + true => Ok(None), + } + } + } + + impl fs::Bundle for ScriptData { + type Item = &'static str; + + fn items() -> impl Iterator { + ["lock.txt", "code.txt"].into_iter() + } + + fn import(&mut self, item: Self::Item, data: Vec) -> error::Result<()> { + match item { + "lock.txt" => self.lock = Some(String::from_utf8(data)?), + "code.txt" => self.code = String::from_utf8(data)?, + _ => {} + } + Ok(()) + } + + fn export(&self, item: Self::Item) -> error::Result>> { + match item { + "lock.txt" => Ok(self.lock.as_ref().map(|s| s.as_bytes().to_vec())), + "code.txt" if !self.code.is_empty() => Ok(Some(self.code.as_bytes().to_vec())), + _ => Ok(None), + } + } + } + + impl fs::Bundle for ScriptMetadata { + type Item = &'static str; + + fn items() -> impl Iterator { + ["info.json"].into_iter() + } + + fn import(&mut self, _: Self::Item, data: Vec) -> error::Result<()> { + *self = serde_json::from_slice(&data)?; + Ok(()) + } + + fn export(&self, _: Self::Item) -> error::Result>> { + Ok(Some(serde_json::to_vec(self)?)) + } + } +}; + pub mod flow { use super::*; - use crate::flows::{FlowNodeId, FlowValue}; make_static! { /// Flow node cache. - /// FIXME: Use `Arc` for cheap cloning. - static ref CACHE: { FlowNodeId => Val } in "flow" <= 1000; - } - - /// Flow node cache value. - #[derive(Debug, Clone, Default)] - struct Val { - lock: Option, - code: Option, - flow: Option, + /// FIXME: Use `Arc` for cheap cloning. + static ref NODES: { FlowNodeId => RawData } in "flow" <= 1000; + /// Flow version value cache (version id => value). + static ref FLOWS: { i64 => Arc } in "flows" <= 1000; + /// Flow version lite value cache (version id => value). + static ref FLOWS_LITE: { i64 => Arc } in "flowslite" <= 1000; } /// Fetch the flow node script referenced by `node` from the cache. 
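// Illustrative sketch, not part of the patch: how the `FlowData` wrapper introduced above
// is meant to be consumed. `from_utf8`/`from_raw` parse the raw JSON once, and `value()`
// hands back either the pre-parsed `FlowValue` or the stored parse error, so cache hits
// never re-parse. Only the `windmill_common::cache::FlowData` API shown in this diff is
// assumed; the function name is a placeholder.
use windmill_common::cache::FlowData;
use windmill_common::error;

fn flow_data_example() -> error::Result<()> {
    // A minimal raw flow; parsing happens once, inside `from_utf8`.
    let data = FlowData::from_utf8(br#"{"modules": []}"#.to_vec())?;
    // `value()` is a cheap accessor: it returns the already-parsed `FlowValue`.
    let flow = data.value()?;
    assert!(flow.modules.is_empty());
    Ok(())
}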
@@ -161,17 +305,13 @@ pub mod flow { pub async fn fetch_script( e: impl PgExecutor<'_>, node: FlowNodeId, - ) -> error::Result<(Option, String)> { - fetch(e, node).await.and_then(|Val { lock, code, .. }| { - Ok(( - lock, - code.ok_or_else(|| { - error::Error::InternalErr(format!( - "Flow node ({:x}) isn't a script node.", - node.0 - )) - })?, - )) + ) -> error::Result> { + fetch_node(e, node).await.and_then(|data| match data { + RawData::Script(data) => Ok(data), + RawData::Flow(_) => Err(error::Error::InternalErr(format!( + "Flow node ({:x}) isn't a script node.", + node.0 + ))), }) } @@ -179,14 +319,16 @@ pub mod flow { /// If not present, import from the file-system cache or fetch it from the database and write /// it to the file system and cache. /// This should be preferred over fetching the database directly. - pub async fn fetch_flow(e: impl PgExecutor<'_>, node: FlowNodeId) -> error::Result { - fetch(e, node).await.and_then(|Val { flow, .. }| { - flow.ok_or_else(|| { - error::Error::InternalErr(format!( - "Flow node ({:x}) isn't a flow value node.", - node.0 - )) - }) + pub async fn fetch_flow( + e: impl PgExecutor<'_>, + node: FlowNodeId, + ) -> error::Result> { + fetch_node(e, node).await.and_then(|data| match data { + RawData::Flow(data) => Ok(data), + RawData::Script(_) => Err(error::Error::InternalErr(format!( + "Flow node ({:x}) isn't a flow node.", + node.0 + ))), }) } @@ -194,111 +336,86 @@ pub mod flow { /// If not present, import from the file-system cache or fetch it from the database and write /// it to the file system and cache. /// This should be preferred over fetching the database directly. - async fn fetch(e: impl PgExecutor<'_>, node: FlowNodeId) -> error::Result { + pub(super) async fn fetch_node( + e: impl PgExecutor<'_>, + node: FlowNodeId, + ) -> error::Result { // If not present, `get_or_insert_async` will lock the key until the future completes, // so only one thread will be able to fetch the data from the database and write it to // the file system and cache, hence no race on the file system. 
- CACHE - .get_or_insert_async(&node, async { - sqlx::query!( - "SELECT \ - lock AS \"lock: String\", \ - code AS \"code: String\", \ - flow::text AS \"flow: Box\" \ - FROM flow_node WHERE id = $1 LIMIT 1", - node.0, + NODES + .get_or_insert_async( + &node, + |(script, flow)| match flow { + Some(flow) => RawData::Flow(Arc::new(flow)), + _ => RawData::Script(Arc::new(script)), + }, + async { + sqlx::query!( + "SELECT \ + lock AS \"lock: String\", \ + code AS \"code: String\", \ + flow AS \"flow: Json>\" \ + FROM flow_node WHERE id = $1 LIMIT 1", + node.0, + ) + .fetch_one(e) + .await + .map_err(Into::into) + .map(|r| { + ( + ScriptData::from_raw(r.lock, r.code), + r.flow.map(|Json(raw_flow)| FlowData::from_raw(raw_flow)), + ) + }) + }, + ) + .await + } + + pub async fn fetch_version(e: impl PgExecutor<'_>, id: i64) -> error::Result> { + FLOWS + .get_or_insert_async(&id, Arc::new, async { + sqlx::query_scalar!( + "SELECT value AS \"value!: Json>\" + FROM flow_version WHERE id = $1 LIMIT 1", + id, ) .fetch_one(e) .await .map_err(Into::into) - .and_then(|r| { - Ok(Val { - lock: r - .lock - .and_then(|x| if x.is_empty() { None } else { Some(x) }), - code: r.code, - flow: match r.flow { - None => None, - Some(flow) => serde_json::from_str(&flow).map_err(|err| { - error::Error::InternalErr(format!( - "Unable to parse flow value: {err:?}" - )) - })?, - }, - }) - }) + .map(|Json(raw_flow)| FlowData::from_raw(raw_flow)) }) .await } - // ---------------------------------------------------------------------------------------------- - // impl `fs::Bundle` for `Val`. - - #[derive(Copy, Clone)] - enum Item { - Lock, - Code, - Flow, - } - - impl fs::Item for Item { - fn path(&self, root: &Path) -> PathBuf { - match self { - Self::Lock => root.join("lock.txt"), - Self::Code => root.join("code.txt"), - Self::Flow => root.join("flow.json"), - } - } - } - - impl fs::Bundle for Val { - type Item = Item; - - fn items() -> &'static [Self::Item] { - &[Item::Lock, Item::Code, Item::Flow] - } - - fn import(&mut self, item: Self::Item, data: Vec) -> error::Result<()> { - match item { - Item::Lock => self.lock = Some(String::from_utf8(data)?), - Item::Code => self.code = Some(String::from_utf8(data)?), - Item::Flow => self.flow = Some(serde_json::from_slice(&data)?), - } - Ok(()) - } - - fn export(&self, item: Self::Item) -> error::Result>> { - match item { - Item::Lock => Ok(self.lock.as_ref().map(|s| s.as_bytes().to_vec())), - Item::Code => Ok(self.code.as_ref().map(|s| s.as_bytes().to_vec())), - Item::Flow => Ok(self - .flow - .as_ref() - .map(|f| serde_json::to_vec(f)) - .transpose()?), - } - } + pub async fn fetch_version_lite( + e: impl PgExecutor<'_>, + id: i64, + ) -> error::Result> { + FLOWS_LITE + .get_or_insert_async(&id, Arc::new, async { + sqlx::query_scalar!( + "SELECT value AS \"value!: Json>\" + FROM flow_version_lite WHERE id = $1 LIMIT 1", + id, + ) + .fetch_one(e) + .await + .map_err(Into::into) + .map(|Json(raw_flow)| FlowData::from_raw(raw_flow)) + }) + .await } } pub mod script { use super::*; - use crate::scripts::{ScriptHash, ScriptLang}; make_static! { /// Scripts cache. /// FIXME: Use `Arc` for cheap cloning. - static ref CACHE: { ScriptHash => Val } in "script" <= 1000; - } - - /// Script cache value. - #[derive(Debug, Clone, Default)] - pub struct Val { - pub lock: Option, - pub code: String, - pub language: Option, - pub envs: Option>, - pub codebase: Option, + static ref CACHE: { ScriptHash => (Arc, Arc) } in "script" <= 1000; } /// Fetch the script referenced by `hash` from the cache. 
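// Illustrative sketch, not part of the patch: the split between `fetch_version` (full flow
// value) and `fetch_version_lite` (the pre-compiled value written when the flow dependency
// job runs) shown above. A caller that prefers the "lite" value but must still work for
// flows deployed before it existed can fall back to the full version, which is what
// `cache::job::fetch_flow` does further down in this diff. `version_id`, the pool argument
// and the function name are placeholders.
use std::sync::Arc;

use sqlx::{Pool, Postgres};
use windmill_common::cache::{self, FlowData};
use windmill_common::error;

async fn load_flow_value(db: &Pool<Postgres>, version_id: i64) -> error::Result<Arc<FlowData>> {
    match cache::flow::fetch_version_lite(db, version_id).await {
        Ok(lite) => Ok(lite),
        // No `flow_version_lite` row yet (flow not re-deployed): use the full value.
        Err(_) => cache::flow::fetch_version(db, version_id).await,
    }
}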
@@ -308,107 +425,55 @@ pub mod script { pub async fn fetch( e: impl PgExecutor<'_>, hash: ScriptHash, - workspace_id: &str, - ) -> error::Result { + ) -> error::Result<(Arc, Arc)> { // If not present, `get_or_insert_async` will lock the key until the future completes, // so only one thread will be able to fetch the data from the database and write it to // the file system and cache, hence no race on the file system. CACHE - .get_or_insert_async(&hash, async { - sqlx::query!( - "SELECT \ - lock AS \"lock: String\", \ - content AS \"code!: String\", - language AS \"language: Option\", \ - envs AS \"envs: Vec\", \ - codebase AS \"codebase: String\" \ - FROM script WHERE hash = $1 AND workspace_id = $2 LIMIT 1", - hash.0, - workspace_id, - ) - .fetch_one(e) - .await - .map_err(Into::into) - .map(|r| Val { - lock: r - .lock - .and_then(|x| if x.is_empty() { None } else { Some(x) }), - code: r.code, - language: r.language, - envs: r.envs, - codebase: r.codebase, - }) - }) + .get_or_insert_async( + &hash, + |(data, metadata)| (Arc::new(data), Arc::new(metadata)), + async { + sqlx::query!( + "SELECT \ + lock AS \"lock: String\", \ + content AS \"code!: String\", + language AS \"language: Option\", \ + envs AS \"envs: Vec\", \ + codebase AS \"codebase: String\" \ + FROM script WHERE hash = $1 LIMIT 1", + hash.0 + ) + .fetch_one(e) + .await + .map_err(Into::into) + .map(|r| { + ( + ScriptData::from_raw(r.lock, Some(r.code)), + ScriptMetadata { + language: r.language, + envs: r.envs, + codebase: r.codebase, + }, + ) + }) + }, + ) .await } - // ---------------------------------------------------------------------------------------------- - // impl `fs::Bundle` for `Val`. - - #[derive(Copy, Clone)] - pub enum Item { - Lock, - Code, - Info, - } - - impl fs::Item for Item { - fn path(&self, root: &Path) -> PathBuf { - match self { - Item::Lock => root.join("lock.txt"), - Item::Code => root.join("code.txt"), - Item::Info => root.join("info.json"), - } - } - } - - impl fs::Bundle for Val { - type Item = Item; - - fn items() -> &'static [Self::Item] { - &[Item::Lock, Item::Code, Item::Info] - } - - fn import(&mut self, item: Self::Item, data: Vec) -> error::Result<()> { - match item { - Item::Lock => self.lock = Some(String::from_utf8(data)?), - Item::Code => self.code = String::from_utf8(data)?, - Item::Info => { - (self.language, self.envs, self.codebase) = serde_json::from_slice(&data)? - } - } - Ok(()) - } - - fn export(&self, item: Self::Item) -> error::Result>> { - match item { - Item::Lock => Ok(self.lock.as_ref().map(|s| s.as_bytes().to_vec())), - Item::Code => Ok(Some(self.code.as_bytes().to_vec())), - Item::Info => Ok(Some(serde_json::to_vec(&( - &self.language, - &self.envs, - &self.codebase, - ))?)), - } - } + /// Invalidate the script cache for the given `hash`. + pub fn invalidate(hash: ScriptHash) { + let _ = CACHE.remove(&hash); } } pub mod app { use super::*; - use crate::apps::AppScriptId; make_static! { /// App scripts cache. - /// FIXME: Use `Arc` for cheap cloning. - static ref CACHE: { AppScriptId => Val } in "app" <= 1000; - } - - /// App app script cache value. - #[derive(Debug, Clone, Default)] - pub struct Val { - pub lock: Option, - pub code: String, + static ref CACHE: { AppScriptId => Arc } in "app" <= 1000; } /// Fetch the app script referenced by `id` from the cache. 
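// Illustrative sketch, not part of the patch: consuming the reworked script cache.
// `cache::script::fetch` now keys on the hash alone (the `workspace_id` filter was dropped
// from the query) and returns shared `(Arc<ScriptData>, Arc<ScriptMetadata>)` handles,
// while `invalidate` evicts both the in-memory entry and its on-disk bundle. Function
// names below are placeholders.
use sqlx::{Pool, Postgres};
use windmill_common::cache;
use windmill_common::error;
use windmill_common::scripts::ScriptHash;

async fn load_script_code(db: &Pool<Postgres>, hash: ScriptHash) -> error::Result<String> {
    let (data, metadata) = cache::script::fetch(db, hash).await?;
    // `ScriptData` carries the code and optional lockfile; `ScriptMetadata` the language,
    // env vars and codebase that used to live in the single cached `Val` struct.
    tracing::debug!(language = ?metadata.language, "script fetched from cache");
    Ok(data.code.clone())
}

// E.g. called when a script row is updated in place and the cached copy becomes stale.
fn drop_cached_script(hash: ScriptHash) {
    cache::script::invalidate(hash);
}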
@@ -418,12 +483,12 @@ pub mod app { pub async fn fetch_script( e: impl PgExecutor<'_>, id: AppScriptId, - ) -> error::Result<(Option, String)> { + ) -> error::Result> { // If not present, `get_or_insert_async` will lock the key until the future completes, // so only one thread will be able to fetch the data from the database and write it to // the file system and cache, hence no race on the file system. CACHE - .get_or_insert_async(&id, async { + .get_or_insert_async(&id, Arc::new, async { sqlx::query!( "SELECT lock, code FROM app_script WHERE id = $1 LIMIT 1", id.0, @@ -431,55 +496,125 @@ pub mod app { .fetch_one(e) .await .map_err(Into::into) - .map(|r| Val { - lock: r - .lock - .and_then(|x| if x.is_empty() { None } else { Some(x) }), - code: r.code, - }) + .map(|r| ScriptData::from_raw(r.lock, Some(r.code))) }) .await - .map(|Val { lock, code }| (lock, code)) } +} - // ---------------------------------------------------------------------------------------------- - // impl `fs::Bundle` for `Val`. +pub mod job { + use super::*; + use crate::jobs::JobKind; - #[derive(Copy, Clone)] - pub enum Item { - Lock, - Code, + use uuid::Uuid; + + lazy_static! { + /// Very small in-memory cache for "preview" jobs raw data. + static ref PREVIEWS: Cache = Cache::new(50); } - impl fs::Item for Item { - fn path(&self, root: &Path) -> PathBuf { - match self { - Item::Lock => root.join("lock.txt"), - Item::Code => root.join("code.txt"), - } - } + pub async fn fetch_preview_flow( + e: impl PgExecutor<'_>, + job: &Uuid, + // original raw values from `queue` or `completed_job` tables: + // kept for backward compatibility. + raw_flow: Option>>, + ) -> error::Result> { + fetch_preview(e, job, None, None, raw_flow) + .await + .and_then(|data| match data { + RawData::Flow(data) => Ok(data), + RawData::Script(_) => Err(error::Error::InternalErr(format!( + "Job ({job}) isn't a flow job." + ))), + }) } - impl fs::Bundle for Val { - type Item = Item; + pub async fn fetch_preview_script( + e: impl PgExecutor<'_>, + job: &Uuid, + // original raw values from `queue` or `completed_job` tables: + // kept for backward compatibility. + raw_lock: Option, + raw_code: Option, + ) -> error::Result> { + fetch_preview(e, job, raw_lock, raw_code, None) + .await + .and_then(|data| match data { + RawData::Script(data) => Ok(data), + RawData::Flow(_) => Err(error::Error::InternalErr(format!( + "Job ({job}) isn't a script job." + ))), + }) + } - fn items() -> &'static [Self::Item] { - &[Item::Lock, Item::Code] - } + pub async fn fetch_preview( + e: impl PgExecutor<'_>, + job: &Uuid, + // original raw values from `queue` or `completed_job` tables: + // kept for backward compatibility. 
+ raw_lock: Option, + raw_code: Option, + raw_flow: Option>>, + ) -> error::Result { + PREVIEWS + .get_or_insert_async(job, async { + match (raw_lock, raw_code, raw_flow) { + (None, None, None) => sqlx::query!( + "SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json>\" \ + FROM job WHERE id = $1 LIMIT 1", + job + ) + .fetch_one(e) + .map_err(Into::into) + .await + .map(|r| (r.raw_lock, r.raw_code, r.raw_flow)), + (lock, code, flow) => Ok((lock, code, flow)), + } + .map(|(lock, code, flow)| match flow { + Some(Json(flow)) => RawData::Flow(Arc::new(FlowData::from_raw(flow))), + _ => RawData::Script(Arc::new(ScriptData::from_raw(lock, code))), + }) + }) + .await + } - fn import(&mut self, item: Self::Item, data: Vec) -> error::Result<()> { - match item { - Item::Lock => self.lock = Some(String::from_utf8(data)?), - Item::Code => self.code = String::from_utf8(data)?, - } - Ok(()) + pub async fn fetch_script( + e: impl PgExecutor<'_>, + kind: JobKind, + hash: Option, + ) -> error::Result> { + use JobKind::*; + match (kind, hash.map(|ScriptHash(id)| id)) { + (FlowScript, Some(id)) => flow::fetch_script(e, FlowNodeId(id)).await, + (Script | Dependencies, Some(hash)) => script::fetch(e, ScriptHash(hash)) + .await + .map(|(raw_script, _metadata)| raw_script), + (AppScript, Some(id)) => app::fetch_script(e, AppScriptId(id)).await, + _ => Err(error::Error::InternalErr(format!( + "Isn't a script job: {:?}", + kind + ))), } + } - fn export(&self, item: Self::Item) -> error::Result>> { - match item { - Item::Lock => Ok(self.lock.as_ref().map(|s| s.as_bytes().to_vec())), - Item::Code => Ok(Some(self.code.as_bytes().to_vec())), - } + pub async fn fetch_flow( + e: impl PgExecutor<'_> + Copy, + kind: JobKind, + hash: Option, + ) -> error::Result> { + use JobKind::*; + match (kind, hash.map(|ScriptHash(id)| id)) { + (FlowDependencies, Some(id)) => flow::fetch_version(e, id).await, + (FlowNode, Some(id)) => flow::fetch_flow(e, FlowNodeId(id)).await, + (Flow, Some(id)) => match flow::fetch_version_lite(e, id).await { + Ok(raw_flow) => Ok(raw_flow), + Err(_) => flow::fetch_version(e, id).await, + }, + _ => Err(error::Error::InternalErr(format!( + "Isn't a flow job {:?}", + kind + ))), } } } @@ -490,12 +625,14 @@ mod fs { use std::fs::{self, OpenOptions}; use std::io::{Read, Write}; + use uuid::Uuid; + /// A bundle of items that can be imported/exported from/into the file-system. pub trait Bundle: Default { /// Item type of the bundle. - type Item: Item; + type Item: Item + Copy; /// Returns a slice of all items than **can** exists within the bundle. - fn items() -> &'static [Self::Item]; + fn items() -> impl Iterator; /// Import the given `data` into the `item`. fn import(&mut self, item: Self::Item, data: Vec) -> error::Result<()>; /// Export the `item` into a `Vec`. @@ -503,31 +640,30 @@ mod fs { } /// An item that can be imported/exported from/into the file-system. - pub trait Item: Copy + 'static { + pub trait Item: Sized { /// Returns the path of the item within the given `root` path. - fn path(&self, root: &Path) -> PathBuf; + fn path(&self, root: impl AsRef) -> PathBuf; } /// Import or insert a bundle within the given combination of `{root}/{key}/`. - pub async fn import_or_insert_with(root: &str, key: u64, f: F) -> error::Result + pub async fn import_or_insert_with(path: impl AsRef, f: F) -> error::Result where T: Bundle, F: Future>, { - // Generate the file path from `root` path and `key`. 
- let path = Path::new(root).join(format!("{:016x}", key)); + let path = path.as_ref(); // Retrieve the data from the cache directory or the database. - if fs::metadata(&path).is_ok() { + if fs::metadata(path).is_ok() { // Cache path exists, read its contents. let import = || -> error::Result { let mut data = T::default(); for item in T::items() { let mut buf = vec![]; - let Ok(mut file) = OpenOptions::new().read(true).open(item.path(&path)) else { + let Ok(mut file) = OpenOptions::new().read(true).open(item.path(path)) else { continue; }; file.read_to_end(&mut buf)?; - data.import(*item, buf)?; + data.import(item, buf)?; } tracing::debug!("Imported from file-system: {:?}", path); Ok(data) @@ -542,16 +678,16 @@ mod fs { // Cache path doesn't exist or import failed, generate the content. let data = f.await?; let export = |data: &T| -> error::Result<()> { - fs::create_dir_all(&path)?; + fs::create_dir_all(path)?; // Write the generated data to the file. for item in T::items() { - let Some(buf) = data.export(*item)? else { + let Some(buf) = data.export(item)? else { continue; }; let mut file = OpenOptions::new() .write(true) .create(true) - .open(item.path(&path))?; + .open(item.path(path))?; file.write_all(&buf)?; } tracing::debug!("Exported to file-system: {:?}", path); @@ -561,33 +697,135 @@ mod fs { // If failed, remove the directory but still return the data. if let Err(err) = export(&data) { tracing::warn!("Failed to export to file-system: {path:?}: {err:?}"); - let _ = fs::remove_dir_all(&path); + let _ = fs::remove_dir_all(path); } Ok(data) } - // Auto-implement `Bundle` for all `serde` serializable types. + // Implement `Bundle`. + + // Empty bundle. + impl Bundle for () { + type Item = &'static str; - impl Item for () { - fn path(&self, root: &Path) -> PathBuf { - root.join("self.json") + fn items() -> impl Iterator { + [].into_iter() + } + + fn import(&mut self, _: Self::Item, _: Vec) -> error::Result<()> { + Ok(()) + } + + fn export(&self, _: Self::Item) -> error::Result>> { + Ok(None) } } - impl Deserialize<'de> + Serialize + Default> Bundle for T { - type Item = (); + // JSON bundle. + impl Deserialize<'de> + Serialize + Default> Bundle for Json { + type Item = &'static str; - fn items() -> &'static [Self::Item] { - &[()] + fn items() -> impl Iterator { + ["self.json"].into_iter() } fn import(&mut self, _: Self::Item, data: Vec) -> error::Result<()> { - *self = serde_json::from_slice(&data)?; + self.0 = serde_json::from_slice(&data)?; Ok(()) } fn export(&self, _: Self::Item) -> error::Result>> { - Ok(Some(serde_json::to_vec(self)?)) + Ok(Some(serde_json::to_vec(&self.0)?)) } } + + // Optional bundle. + impl Bundle for Option { + type Item = T::Item; + + fn items() -> impl Iterator { + T::items() + } + + fn import(&mut self, item: Self::Item, data: Vec) -> error::Result<()> { + let mut x = T::default(); + x.import(item, data)?; + *self = Some(x); + Ok(()) + } + + fn export(&self, item: Self::Item) -> error::Result>> { + match self { + Some(x) => x.export(item), + _ => Ok(None), + } + } + } + + // Bundle pair. 
+ impl, B: Bundle> Bundle for (A, B) { + type Item = I; + + fn items() -> impl Iterator { + A::items().chain(B::items()) + } + + fn import(&mut self, item: Self::Item, data: Vec) -> error::Result<()> { + match A::items().any(|i| i == item) { + true => self.0.import(item, data), + _ => self.1.import(item, data), + } + } + + fn export(&self, item: Self::Item) -> error::Result>> { + match A::items().any(|i| i == item) { + true => self.0.export(item), + _ => self.1.export(item), + } + } + } + + // Implement `Item`. + + macro_rules! impl_item { + ($( ($t:ty, |$x:ident| $join:expr) ),*) => { + $( + impl Item for $t { + fn path(&self, root: impl AsRef) -> PathBuf { + let $x = self; + root.as_ref().join($join) + } + } + )* + }; + } + + impl_item! { + (&'static str, |x| x), + (i64, |x| format!("{:016x}", *x as u64)), + (u64, |x| format!("{:016x}", x)), + (Uuid, |x| format!("{:032x}", x.as_u128())), + (ScriptHash, |x| format!("{:016x}", x.0)), + (FlowNodeId, |x| format!("{:016x}", x.0)), + (AppScriptId, |x| format!("{:016x}", x.0)) + } + + #[cfg(test)] + #[test] + fn test_items() { + let p = "test".path("/tmp"); + assert_eq!(p, PathBuf::from("/tmp/test")); + let p = i64::MAX.path("/tmp"); + assert_eq!(p, PathBuf::from("/tmp/7fffffffffffffff")); + let p = u64::MAX.path("/tmp"); + assert_eq!(p, PathBuf::from("/tmp/ffffffffffffffff")); + let p = Uuid::from_u128(u128::MAX).path("/tmp"); + assert_eq!(p, PathBuf::from("/tmp/ffffffffffffffffffffffffffffffff")); + let p = ScriptHash(i64::MAX).path("/tmp"); + assert_eq!(p, PathBuf::from("/tmp/7fffffffffffffff")); + let p = FlowNodeId(i64::MAX).path("/tmp"); + assert_eq!(p, PathBuf::from("/tmp/7fffffffffffffff")); + let p = AppScriptId(i64::MAX).path("/tmp"); + assert_eq!(p, PathBuf::from("/tmp/7fffffffffffffff")); + } } diff --git a/backend/windmill-common/src/flows.rs b/backend/windmill-common/src/flows.rs index 29edd4db3034b..4beca757f48b0 100644 --- a/backend/windmill-common/src/flows.rs +++ b/backend/windmill-common/src/flows.rs @@ -407,12 +407,6 @@ pub enum InputTransform { #[serde(transparent)] pub struct FlowNodeId(pub i64); -impl Into for FlowNodeId { - fn into(self) -> u64 { - self.0 as u64 - } -} - #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Branch { #[serde(skip_serializing_if = "Option::is_none")] @@ -784,7 +778,9 @@ pub async fn resolve_module( let (lock, content) = if !with_code { (Some("...".to_string()), "...".to_string()) } else { - cache::flow::fetch_script(e, id).await? + cache::flow::fetch_script(e, id) + .await + .map(|data| (data.lock.clone(), data.code.clone()))? 
}; val = RawScript { input_transforms, @@ -844,7 +840,7 @@ pub async fn resolve_modules( if let Some(id) = modules_node { *modules = cache::flow::fetch_flow(e, id) .await - .map(|flow| flow.modules)?; + .and_then(|data| Ok(data.value()?.modules.clone()))?; } for module in modules.iter_mut() { Box::pin(resolve_module( diff --git a/backend/windmill-common/src/scripts.rs b/backend/windmill-common/src/scripts.rs index 75e4e1e0c0bcc..0fb05e6f45c73 100644 --- a/backend/windmill-common/src/scripts.rs +++ b/backend/windmill-common/src/scripts.rs @@ -24,7 +24,7 @@ use serde::{ser::SerializeSeq, Deserialize, Deserializer, Serialize}; use crate::utils::StripPath; -#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Hash, Eq, sqlx::Type)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Copy, Clone, Hash, Eq, sqlx::Type)] #[sqlx(type_name = "SCRIPT_LANG", rename_all = "lowercase")] #[serde(rename_all(serialize = "lowercase", deserialize = "lowercase"))] pub enum ScriptLang { @@ -77,12 +77,6 @@ impl ScriptLang { #[sqlx(transparent)] pub struct ScriptHash(pub i64); -impl Into for ScriptHash { - fn into(self) -> u64 { - self.0 as u64 - } -} - #[derive(PartialEq, sqlx::Type)] #[sqlx(transparent, no_pg_array)] pub struct ScriptHashes(pub Vec); diff --git a/backend/windmill-common/src/worker.rs b/backend/windmill-common/src/worker.rs index ba8ed8aa52c0b..c64184b4849f5 100644 --- a/backend/windmill-common/src/worker.rs +++ b/backend/windmill-common/src/worker.rs @@ -97,6 +97,7 @@ lazy_static::lazy_static! { pub static ref MIN_VERSION: Arc> = Arc::new(RwLock::new(Version::new(0, 0, 0))); pub static ref MIN_VERSION_IS_AT_LEAST_1_427: Arc> = Arc::new(RwLock::new(false)); pub static ref MIN_VERSION_IS_AT_LEAST_1_432: Arc> = Arc::new(RwLock::new(false)); + pub static ref MIN_VERSION_IS_AT_LEAST_1_440: Arc> = Arc::new(RwLock::new(false)); // Features flags: pub static ref DISABLE_FLOW_SCRIPT: bool = std::env::var("DISABLE_FLOW_SCRIPT").ok().is_some_and(|x| x == "1" || x == "true"); @@ -610,6 +611,7 @@ pub async fn update_min_version<'c, E: sqlx::Executor<'c, Database = sqlx::Postg *MIN_VERSION_IS_AT_LEAST_1_427.write().await = min_version >= Version::new(1, 427, 0); *MIN_VERSION_IS_AT_LEAST_1_432.write().await = min_version >= Version::new(1, 432, 0); + *MIN_VERSION_IS_AT_LEAST_1_440.write().await = min_version >= Version::new(1, 440, 0); *MIN_VERSION.write().await = min_version.clone(); min_version >= cur_version diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index 37ce52b76d272..9df0b70d897d8 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -30,7 +30,7 @@ use windmill_audit::ActionKind; use windmill_common::{ auth::{fetch_authed_from_permissioned_as, permissioned_as_to_username}, - cache, + cache::{self, FlowData}, db::{Authed, UserDB}, error::{self, to_anyhow, Error}, flow_status::{ @@ -50,8 +50,8 @@ use windmill_common::{ utils::{not_found_if_none, report_critical_error, StripPath, WarnAfterExt}, worker::{ to_raw_value, CLOUD_HOSTED, DEFAULT_TAGS_PER_WORKSPACE, DEFAULT_TAGS_WORKSPACES, - DISABLE_FLOW_SCRIPT, MIN_VERSION_IS_AT_LEAST_1_427, MIN_VERSION_IS_AT_LEAST_1_432, NO_LOGS, - WORKER_PULL_QUERIES, WORKER_SUSPENDED_PULL_QUERY, + DISABLE_FLOW_SCRIPT, MIN_VERSION_IS_AT_LEAST_1_427, MIN_VERSION_IS_AT_LEAST_1_432, + MIN_VERSION_IS_AT_LEAST_1_440, NO_LOGS, WORKER_PULL_QUERIES, WORKER_SUSPENDED_PULL_QUERY, }, DB, METRICS_ENABLED, }; @@ -2489,6 +2489,16 @@ pub enum PushIsolationLevel<'c> { Transaction(Transaction<'c, 
Postgres>), } +impl<'c> PushIsolationLevel<'c> { + async fn into_tx(self) -> error::Result> { + match self { + PushIsolationLevel::Isolated(db, authed) => Ok((db.begin(&authed).await?).into()), + PushIsolationLevel::IsolatedRoot(db) => Ok(db.begin().await?), + PushIsolationLevel::Transaction(tx) => Ok(tx), + } + } +} + #[macro_export] macro_rules! fetch_scalar_isolated { ( $query:expr, $tx:expr) => { @@ -2858,14 +2868,23 @@ pub async fn push<'c, 'd>( None, ), JobPayload::FlowNode { id, path } => { - let value = cache::flow::fetch_flow(_db, id).await?; - let status = Some(FlowStatus::new(&value)); + let data = cache::flow::fetch_flow(_db, id).await?; + let value = data.value()?; + let status = Some(FlowStatus::new(value)); + // Keep inserting `value` if not all workers are updated. + // Starting at `v1.440`, the value is fetched on pull from the flow node id. + let value_o = if !*MIN_VERSION_IS_AT_LEAST_1_440.read().await { + Some(value.clone()) + } else { + // `raw_flow` is fetched on pull. + None + }; ( Some(id.0), Some(path), None, JobKind::FlowNode, - Some(value), + value_o, status, None, None, @@ -2984,7 +3003,7 @@ pub async fn push<'c, 'd>( Some(path), None, JobKind::FlowDependencies, - Some(flow_value.clone()), + Some(flow_value), None, None, None, @@ -2995,22 +3014,24 @@ pub async fn push<'c, 'd>( None, ), JobPayload::FlowDependencies { path, dedicated_worker, version } => { - let value_json = fetch_scalar_isolated!( - sqlx::query_scalar!("SELECT value as \"value: sqlx::types::Json>\" FROM flow_version WHERE id = $1 LIMIT 1", &version), - tx - )? - .ok_or_else(|| Error::InternalErr(format!("not found flow at path {:?}", path)))?; - let value = serde_json::from_str::(value_json.get()).map_err(|err| { - Error::InternalErr(format!( - "could not convert json to flow for {path}: {err:?}" - )) - })?; + // Keep inserting `value` if not all workers are updated. + // Starting at `v1.440`, the value is fetched on pull from the version id. + let value_o = if !*MIN_VERSION_IS_AT_LEAST_1_440.read().await { + let mut ntx = tx.into_tx().await?; + // The version has been inserted only within the transaction. + let data = cache::flow::fetch_version(&mut *ntx, version).await?; + tx = PushIsolationLevel::Transaction(ntx); + Some(data.value()?.clone()) + } else { + // `raw_flow` is fetched on pull. 
+ None + }; ( Some(version), Some(path), None, JobKind::FlowDependencies, - Some(value.clone()), + value_o, None, None, None, @@ -3041,11 +3062,10 @@ pub async fn push<'c, 'd>( let flow_status: FlowStatus = match restarted_from { Some(restarted_from_val) => { - let (_, _, step_n, truncated_modules, _, user_states, cleanup_module) = + let (_, _, _, step_n, truncated_modules, user_states, cleanup_module) = restarted_flows_resolution( _db, workspace_id, - Some(value.clone()), restarted_from_val.flow_job_id, restarted_from_val.step_id.as_str(), restarted_from_val.branch_or_iteration_n, @@ -3080,20 +3100,25 @@ pub async fn push<'c, 'd>( FlowStatus::new(&value) } // this is a new flow being pushed, flow_status is set to flow_value }; + let concurrency_key = value.concurrency_key.clone(); + let concurrent_limit = value.concurrent_limit; + let concurrency_time_window_s = value.concurrency_time_window_s; + let cache_ttl = value.cache_ttl.map(|x| x as i32); + let priority = value.priority; ( None, path, None, JobKind::FlowPreview, - Some(value.clone()), + Some(value), Some(flow_status), None, - value.concurrency_key.clone(), - value.concurrent_limit.clone(), - value.concurrency_time_window_s, - value.cache_ttl.map(|x| x as i32), + concurrency_key, + concurrent_limit, + concurrency_time_window_s, + cache_ttl, None, - value.priority, + priority, ) } JobPayload::SingleScriptFlow { @@ -3115,15 +3140,13 @@ pub async fn push<'c, 'd>( let flow_value = FlowValue { modules: vec![FlowModule { id: "a".to_string(), - value: windmill_common::worker::to_raw_value( - &windmill_common::flows::FlowModuleValue::Script { - input_transforms: input_transforms, - path: path.clone(), - hash: Some(hash), - tag_override: tag_override, - is_trigger: None, - }, - ), + value: to_raw_value(&FlowModuleValue::Script { + input_transforms, + path: path.clone(), + hash: Some(hash), + tag_override, + is_trigger: None, + }), stop_after_if: None, stop_after_all_iters_if: None, summary: None, @@ -3140,22 +3163,24 @@ pub async fn push<'c, 'd>( }], same_worker: false, failure_module: None, - concurrency_time_window_s: concurrency_time_window_s, - concurrent_limit: concurrent_limit, + concurrency_time_window_s, + concurrent_limit, skip_expr: None, cache_ttl: cache_ttl.map(|val| val as u32), early_return: None, concurrency_key: custom_concurrency_key.clone(), - priority: priority, + priority, preprocessor_module: None, }; + // this is a new flow being pushed, flow_status is set to flow_value: + let flow_status: FlowStatus = FlowStatus::new(&flow_value); ( None, Some(path), None, JobKind::Flow, - Some(flow_value.clone()), - Some(FlowStatus::new(&flow_value)), // this is a new flow being pushed, flow_status is set to flow_value + Some(flow_value), + Some(flow_status), None, custom_concurrency_key, concurrent_limit, @@ -3166,51 +3191,47 @@ pub async fn push<'c, 'd>( ) } JobPayload::Flow { path, dedicated_worker, apply_preprocessor } => { + // Fetch the latest version of the flow. + // Note that this query is performed within an isolated transaction to secure the + // API surface. + let version = fetch_scalar_isolated!( + sqlx::query_scalar!( + "SELECT flow.versions[array_upper(flow.versions, 1)] AS \"version!: i64\" + FROM flow WHERE path = $1 AND workspace_id = $2", + &path, + &workspace_id + ), + tx + )? + .ok_or_else(|| Error::InternalErr(format!("not found flow at path {:?}", path)))?; + // Do not use the lite version unless all workers are updated. 
- let value_json = if *DISABLE_FLOW_SCRIPT || (!*MIN_VERSION_IS_AT_LEAST_1_432.read().await && !*CLOUD_HOSTED) { - fetch_scalar_isolated!( - sqlx::query_scalar!( - "SELECT flow_version.value as \"value!: sqlx::types::Json>\" FROM flow - LEFT JOIN flow_version - ON flow_version.id = flow.versions[array_upper(flow.versions, 1)] - WHERE flow.path = $1 AND flow.workspace_id = $2", - &path, &workspace_id - ), - tx - ) + // This does not need to be performed within the isolated Tx as checks have already + // been performed when the version was fetched. + let data = if *DISABLE_FLOW_SCRIPT + || (!*MIN_VERSION_IS_AT_LEAST_1_432.read().await && !*CLOUD_HOSTED) + { + cache::flow::fetch_version(_db, version).await } else { - fetch_scalar_isolated!( - sqlx::query_scalar!( - "SELECT coalesce(flow_version_lite.value, flow_version.value) as \"value!: sqlx::types::Json>\" FROM flow - LEFT JOIN flow_version - ON flow_version.id = flow.versions[array_upper(flow.versions, 1)] - LEFT JOIN flow_version_lite - ON flow_version_lite.id = flow_version.id - WHERE flow.path = $1 AND flow.workspace_id = $2 LIMIT 1", - &path, &workspace_id - ), - tx - ) - }? - .ok_or_else(|| Error::InternalErr(format!("not found flow at path {:?}", path)))?; - let mut value = serde_json::from_str::(value_json.get()).map_err(|err| { - Error::InternalErr(format!( - "could not convert json to flow for {path}: {err:?}" - )) - })?; + // Fall back to the original version if the lite version is not found. + // This also prevents a race condition where the flow is run just after deploy and + // the lite version is still being created. + cache::flow::fetch_version_lite(_db, version) + .or_else(|_| cache::flow::fetch_version(_db, version)) + .await + }?; + let value = data.value()?.clone(); let priority = value.priority; - add_virtual_items_if_necessary(&mut value.modules); - if same_worker { - value.same_worker = true; - } - let cache_ttl = value.cache_ttl.map(|x| x as i32).clone(); + let cache_ttl = value.cache_ttl.map(|x| x as i32); let custom_concurrency_key = value.concurrency_key.clone(); - let concurrency_time_window_s = value.concurrency_time_window_s.clone(); - let concurrent_limit = value.concurrent_limit.clone(); + let concurrency_time_window_s = value.concurrency_time_window_s; + let concurrent_limit = value.concurrent_limit; + // this is a new flow being pushed, status is set to `value`. + let mut status = FlowStatus::new(&value); let extra = args.extra.get_or_insert_with(HashMap::new); if !apply_preprocessor { - value.preprocessor_module = None; + status.preprocessor_module = None; extra.remove("wm_trigger"); } else { extra.entry("wm_trigger".to_string()).or_insert_with(|| { @@ -3219,14 +3240,30 @@ pub async fn push<'c, 'd>( })) }); } - let status = Some(FlowStatus::new(&value)); + // Keep inserting `value` if not all workers are updated. + // Starting at `v1.440`, the value is fetched on pull from the version id. + let value_o = if !*MIN_VERSION_IS_AT_LEAST_1_440.read().await { + let mut value = value; + add_virtual_items_if_necessary(&mut value.modules); + if same_worker { + value.same_worker = true; + } + if !apply_preprocessor { + value.preprocessor_module = None; + } + Some(value) + } else { + // `raw_flow` is fetched on pull; the mutations from the other branch are replaced + // by additional checks when handling the flow. + None + }; ( - None, + Some(version), // Starting from `v1.436`, the version id is used to fetch the value on pull.
Some(path), None, JobKind::Flow, - Some(value), - status, // this is a new flow being pushed, flow_status is set to flow_value + value_o, + Some(status), None, custom_concurrency_key, concurrent_limit, @@ -3238,17 +3275,16 @@ pub async fn push<'c, 'd>( } JobPayload::RestartedFlow { completed_job_id, step_id, branch_or_iteration_n } => { let ( + version, flow_path, - raw_flow, + flow_data, step_n, truncated_modules, - priority, user_states, cleanup_module, ) = restarted_flows_resolution( _db, workspace_id, - None, completed_job_id, step_id.as_str(), branch_or_iteration_n, @@ -3277,18 +3313,32 @@ pub async fn push<'c, 'd>( user_states, preprocessor_module: None, }; + let value = flow_data.value()?; + let priority = value.priority; + let concurrency_key = value.concurrency_key.clone(); + let concurrent_limit = value.concurrent_limit; + let concurrency_time_window_s = value.concurrency_time_window_s; + let cache_ttl = value.cache_ttl.map(|x| x as i32); + // Keep inserting `value` if not all workers are updated. + // Starting at `v1.440`, the value is fetched on pull from the version id. + let value_o = if version.is_none() || !*MIN_VERSION_IS_AT_LEAST_1_440.read().await { + Some(value.clone()) + } else { + // `raw_flow` is fetched on pull. + None + }; ( - None, + version, flow_path, None, JobKind::Flow, - Some(raw_flow.clone()), + value_o, Some(restarted_flow_status), None, - raw_flow.concurrency_key, - raw_flow.concurrent_limit, - raw_flow.concurrency_time_window_s, - raw_flow.cache_ttl.map(|x| x as i32), + concurrency_key, + concurrent_limit, + concurrency_time_window_s, + cache_ttl, None, priority, ) @@ -3449,11 +3499,7 @@ pub async fn push<'c, 'd>( }) }; - let mut tx = match tx { - PushIsolationLevel::Isolated(user_db, authed) => (user_db.begin(&authed).await?).into(), - PushIsolationLevel::IsolatedRoot(db) => db.begin().await?, - PushIsolationLevel::Transaction(tx) => tx, - }; + let mut tx = tx.into_tx().await?; let job_id: Uuid = if let Some(job_id) = job_id { let conflicting_id = sqlx::query_scalar!( @@ -3700,66 +3746,48 @@ pub fn canceled_job_to_result(job: &QueuedJob) -> serde_json::Value { async fn restarted_flows_resolution( db: &Pool, workspace_id: &str, - flow_value_if_any: Option, completed_flow_id: Uuid, restart_step_id: &str, branch_or_iteration_n: Option, ) -> Result< ( + Option, Option, - FlowValue, + Arc, i32, Vec, - Option, HashMap, FlowCleanupModule, ), Error, > { - #[derive(sqlx::FromRow)] - struct CompletedJobWithRawFlow { - #[sqlx(flatten)] - completed_job: CompletedJob, - raw_flow: Option>>, - } - - let CompletedJobWithRawFlow { completed_job, raw_flow } = - sqlx::query_as::<_, CompletedJobWithRawFlow>( - "SELECT *, null as labels FROM completed_job WHERE id = $1 and workspace_id = $2", - ) - .bind(completed_flow_id) - .bind(workspace_id) - .fetch_one(db) // TODO: should we try to use the passed-in `tx` here? 
- .await - .map_err(|err| { - Error::InternalErr(format!( - "completed job not found for UUID {} in workspace {}: {}", - completed_flow_id, workspace_id, err - )) - })?; - - let flow_value = if let Some(flow_value) = flow_value_if_any { - Some(flow_value) - } else if let Some(raw_flow) = raw_flow.as_ref() { - serde_json::from_str::(raw_flow.get()).ok() - } else { - sqlx::query_scalar!( - "SELECT raw_flow AS \"raw_flow!: Json>\" - FROM job WHERE id = $1 AND workspace_id = $2 LIMIT 1", - &completed_flow_id, - workspace_id - ) - .fetch_one(db) - .await - .ok() - .and_then(|raw_flow| serde_json::from_str::(raw_flow.get()).ok()) - } - .ok_or(Error::InternalErr(format!( - "Unable to parse raw definition for job {} in workspace {}", - completed_flow_id, workspace_id, - )))?; - let flow_status = completed_job - .parse_flow_status() + let row = sqlx::query!( + "SELECT + script_path, script_hash AS \"script_hash: ScriptHash\", + job_kind AS \"job_kind: JobKind\", + flow_status AS \"flow_status: Json>\", + raw_flow AS \"raw_flow: Json>\" + FROM completed_job WHERE id = $1 and workspace_id = $2", + completed_flow_id, + workspace_id, + ) + .fetch_one(db) // TODO: should we try to use the passed-in `tx` here? + .await + .map_err(|err| { + Error::InternalErr(format!( + "completed job not found for UUID {} in workspace {}: {}", + completed_flow_id, workspace_id, err + )) + })?; + + let flow_data = cache::job::fetch_flow(db, row.job_kind, row.script_hash) + .or_else(|_| cache::job::fetch_preview_flow(db, &completed_flow_id, row.raw_flow)) + .await?; + let flow_value = flow_data.value()?; + let flow_status = row + .flow_status + .as_ref() + .and_then(|v| serde_json::from_str::(v.get()).ok()) .ok_or(Error::InternalErr(format!( "Unable to parse flow status for job {} in workspace {}", completed_flow_id, workspace_id, @@ -3821,7 +3849,7 @@ async fn restarted_flows_resolution( branch: branch_or_iteration_n - 1, // Doing minus one here as this variable reflects the latest finished job in the iteration len: branches.len(), }), - parallel: parallel, + parallel, while_loop: false, progress: None, }); @@ -3859,7 +3887,7 @@ async fn restarted_flows_resolution( flow_jobs_success: new_flow_jobs_success, branch_chosen: None, branchall: None, - parallel: parallel, + parallel, while_loop: false, progress: None, }); @@ -3896,13 +3924,13 @@ async fn restarted_flows_resolution( ))); } - return Ok(( - completed_job.script_path, - flow_value, + Ok(( + row.script_hash.map(|x| x.0), + row.script_path, + flow_data, step_n, truncated_modules, - completed_job.priority, flow_status.user_states, flow_status.cleanup_module, - )); + )) } diff --git a/backend/windmill-worker/src/ansible_executor.rs b/backend/windmill-worker/src/ansible_executor.rs index 068e5baa315e0..4be27de5b0931 100644 --- a/backend/windmill-worker/src/ansible_executor.rs +++ b/backend/windmill-worker/src/ansible_executor.rs @@ -49,7 +49,7 @@ const NSJAIL_CONFIG_RUN_ANSIBLE_CONTENT: &str = include_str!("../nsjail/run.ansi async fn handle_ansible_python_deps( job_dir: &str, - requirements_o: Option, + requirements_o: Option<&String>, ansible_reqs: Option<&AnsibleRequirements>, w_id: &str, job_id: &Uuid, @@ -70,16 +70,15 @@ async fn handle_ansible_python_deps( .unwrap_or_else(|| vec![]) .clone(); + let mut requirements; let requirements = match requirements_o { Some(r) => r, None => { - let requirements = ansible_reqs + requirements = ansible_reqs .map(|x| x.python_reqs.join("\n")) .unwrap_or("".to_string()); - if requirements.is_empty() { - "".to_string() - } else { - 
uv_pip_compile( + if !requirements.is_empty() { + requirements = uv_pip_compile( job_id, &requirements, mem_peak, @@ -95,8 +94,9 @@ async fn handle_ansible_python_deps( .await .map_err(|e| { error::Error::ExecutionErr(format!("pip compile failed: {}", e.to_string())) - })? + })?; } + &requirements } }; @@ -185,7 +185,7 @@ async fn install_galaxy_collections( } pub async fn handle_ansible_job( - requirements_o: Option, + requirements_o: Option<&String>, job_dir: &str, worker_dir: &str, worker_name: &str, diff --git a/backend/windmill-worker/src/bun_executor.rs b/backend/windmill-worker/src/bun_executor.rs index 707a668b844cc..d0fd654d732f1 100644 --- a/backend/windmill-worker/src/bun_executor.rs +++ b/backend/windmill-worker/src/bun_executor.rs @@ -665,7 +665,7 @@ pub fn copy_recursively( pub async fn prebundle_bun_script( inner_content: &str, - lockfile: Option, + lockfile: Option<&String>, script_path: &str, job_id: &Uuid, w_id: &str, @@ -678,7 +678,7 @@ pub async fn prebundle_bun_script( ) -> Result<()> { let (local_path, remote_path) = compute_bundle_local_and_remote_path( inner_content, - &lockfile, + lockfile, script_path, db.clone(), w_id, @@ -747,7 +747,7 @@ async fn get_script_import_updated_at(db: &DB, w_id: &str, script_path: &str) -> async fn compute_bundle_local_and_remote_path( inner_content: &str, - requirements_o: &Option, + requirements_o: Option<&String>, script_path: &str, db: Option, w_id: &str, @@ -755,10 +755,7 @@ async fn compute_bundle_local_and_remote_path( let mut input_src = format!( "{}{}", inner_content, - requirements_o - .as_ref() - .map(|x| x.to_string()) - .unwrap_or_default() + requirements_o.as_ref().map(|x| x.as_str()).unwrap_or("") ); if let Some(db) = db { @@ -811,8 +808,8 @@ async fn write_lockb(splitted_lockb_2: &str, job_dir: &str) -> Result<()> { #[tracing::instrument(level = "trace", skip_all)] pub async fn handle_bun_job( - requirements_o: Option, - codebase: Option, + requirements_o: Option<&String>, + codebase: Option<&String>, mem_peak: &mut i32, canceled_by: &mut Option, job: &QueuedJob, @@ -833,7 +830,7 @@ pub async fn handle_bun_job( if requirements_o.is_some() && !annotation.nobundling && codebase.is_none() { let (local_path, remote_path) = compute_bundle_local_and_remote_path( inner_content, - &requirements_o, + requirements_o, job.script_path(), Some(db.clone()), &job.workspace_id, diff --git a/backend/windmill-worker/src/common.rs b/backend/windmill-worker/src/common.rs index 51385ff3672f2..54d7307fd7db8 100644 --- a/backend/windmill-worker/src/common.rs +++ b/backend/windmill-worker/src/common.rs @@ -22,7 +22,7 @@ use windmill_common::worker::{ to_raw_value, write_file, CLOUD_HOSTED, ROOT_CACHE_DIR, WORKER_CONFIG, }; use windmill_common::{ - cache::Cache, + cache::{Cache, RawData}, error::{self, Error}, jobs::QueuedJob, scripts::ScriptHash, @@ -663,24 +663,24 @@ pub async fn cached_result_path( db: &DB, client: &AuthedClient, job: &QueuedJob, - raw_code: Option<&String>, - raw_lock: Option<&String>, - raw_flow: Option<&Json>>, + raw_data: Option<&RawData>, ) -> String { let mut hasher = sha2::Sha256::new(); hasher.update(&[job.job_kind as u8]); if let Some(ScriptHash(hash)) = job.script_hash { hasher.update(&hash.to_le_bytes()) - } else if let None = job.script_hash { + } else { job.script_path .as_ref() .inspect(|x| hasher.update(x.as_bytes())); - raw_code.inspect(|x| hasher.update(x)); - raw_lock.inspect(|x| hasher.update(x)); - raw_flow.inspect(|x| hasher.update(x.get())); + match raw_data { + Some(RawData::Flow(data)) => 
hasher.update(data.raw_flow.get()), + Some(RawData::Script(data)) => hasher.update(&data.code), + _ => {} + } } hash_args(db, client, &job.workspace_id, &job.args, &mut hasher).await; - format!("g/results/{:032x}", hasher.finalize()) + format!("g/results/{:064x}", hasher.finalize()) } #[cfg(feature = "parquet")] diff --git a/backend/windmill-worker/src/csharp_executor.rs b/backend/windmill-worker/src/csharp_executor.rs index df1c3b3d9c866..2195117091958 100644 --- a/backend/windmill-worker/src/csharp_executor.rs +++ b/backend/windmill-worker/src/csharp_executor.rs @@ -369,7 +369,7 @@ pub async fn handle_csharp_job( _client: &AuthedClientBackgroundTask, _inner_content: &str, _job_dir: &str, - _requirements_o: Option, + _requirements_o: Option<&String>, _shared_mount: &str, _base_internal_url: &str, _worker_name: &str, @@ -388,7 +388,7 @@ pub async fn handle_csharp_job( client: &AuthedClientBackgroundTask, inner_content: &str, job_dir: &str, - requirements_o: Option, + requirements_o: Option<&String>, shared_mount: &str, base_internal_url: &str, worker_name: &str, @@ -400,10 +400,7 @@ pub async fn handle_csharp_job( let hash = calculate_hash(&format!( "{}{}", inner_content, - requirements_o - .as_ref() - .map(|x| x.to_string()) - .unwrap_or_default() + requirements_o.unwrap_or(&String::new()) )); let bin_path = format!("{}/{hash}", CSHARP_CACHE_DIR); let remote_path = format!("{CSHARP_OBJECT_STORE_PREFIX}{hash}"); diff --git a/backend/windmill-worker/src/dedicated_worker.rs b/backend/windmill-worker/src/dedicated_worker.rs index 1adac0edf284e..56ea01c03846b 100644 --- a/backend/windmill-worker/src/dedicated_worker.rs +++ b/backend/windmill-worker/src/dedicated_worker.rs @@ -393,14 +393,14 @@ async fn spawn_dedicated_workers_for_flow( } } FlowModuleValue::FlowScript { id, language, .. 
} => { - let spawn = cache::flow::fetch_script(db, *id) - .await - .map(|(lock, content)| SpawnWorker::RawScript { + let spawn = cache::flow::fetch_script(db, *id).await.map(|data| { + SpawnWorker::RawScript { path: "".to_string(), - content, - lock, - lang: language.clone(), - }); + content: data.code.clone(), + lock: data.lock.clone(), + lang: *language, + } + }); match spawn { Ok(spawn) => { if let Some(dedi_w) = spawn_dedicated_worker( @@ -675,7 +675,7 @@ async fn spawn_dedicated_worker( token }; - let worker_envs = build_envs(envs).expect("failed to build envs"); + let worker_envs = build_envs(envs.as_ref()).expect("failed to build envs"); if let Err(e) = match language { Some(ScriptLang::Python3) => { @@ -688,7 +688,7 @@ async fn spawn_dedicated_worker( #[cfg(feature = "python")] crate::python_executor::start_worker( - lock, + lock.as_ref(), &db, &content, &base_internal_url, diff --git a/backend/windmill-worker/src/deno_executor.rs b/backend/windmill-worker/src/deno_executor.rs index 54a2631ccd90b..2385771a159af 100644 --- a/backend/windmill-worker/src/deno_executor.rs +++ b/backend/windmill-worker/src/deno_executor.rs @@ -179,7 +179,7 @@ pub async fn generate_deno_lock( #[tracing::instrument(level = "trace", skip_all)] pub async fn handle_deno_job( - requirements_o: Option, + requirements_o: Option<&String>, mem_peak: &mut i32, canceled_by: &mut Option, job: &QueuedJob, diff --git a/backend/windmill-worker/src/go_executor.rs b/backend/windmill-worker/src/go_executor.rs index ea06f82c5aa36..9a52da78cd968 100644 --- a/backend/windmill-worker/src/go_executor.rs +++ b/backend/windmill-worker/src/go_executor.rs @@ -41,7 +41,7 @@ pub async fn handle_go_job( client: &AuthedClientBackgroundTask, inner_content: &str, job_dir: &str, - requirements_o: Option, + requirements_o: Option<&String>, shared_mount: &str, base_internal_url: &str, worker_name: &str, diff --git a/backend/windmill-worker/src/php_executor.rs b/backend/windmill-worker/src/php_executor.rs index 387e2a6566231..d105881078891 100644 --- a/backend/windmill-worker/src/php_executor.rs +++ b/backend/windmill-worker/src/php_executor.rs @@ -132,7 +132,7 @@ $args->{arg_name} = new {rt_name}($args->{arg_name});" #[tracing::instrument(level = "trace", skip_all)] pub async fn handle_php_job( - requirements_o: Option, + requirements_o: Option<&String>, mem_peak: &mut i32, canceled_by: &mut Option, job: &QueuedJob, diff --git a/backend/windmill-worker/src/python_executor.rs b/backend/windmill-worker/src/python_executor.rs index 233c44fc898c3..578040374a391 100644 --- a/backend/windmill-worker/src/python_executor.rs +++ b/backend/windmill-worker/src/python_executor.rs @@ -508,7 +508,7 @@ fn copy_dir_recursively(src: &Path, dst: &Path) -> windmill_common::error::Resul #[tracing::instrument(level = "trace", skip_all)] pub async fn handle_python_job( - requirements_o: Option, + requirements_o: Option<&String>, job_dir: &str, worker_dir: &str, worker_name: &str, @@ -1049,7 +1049,7 @@ async fn replace_pip_secret( async fn handle_python_deps( job_dir: &str, - requirements_o: Option, + requirements_o: Option<&String>, inner_content: &str, w_id: &str, script_path: &str, @@ -1072,12 +1072,13 @@ async fn handle_python_deps( .clone(); let annotations = windmill_common::worker::PythonAnnotations::parse(inner_content); + let mut requirements; let requirements = match requirements_o { Some(r) => r, None => { let mut already_visited = vec![]; - let requirements = windmill_parser_py_imports::parse_python_imports( + requirements = 
windmill_parser_py_imports::parse_python_imports( inner_content, w_id, script_path, @@ -1086,10 +1087,8 @@ async fn handle_python_deps( ) .await? .join("\n"); - if requirements.is_empty() { - "".to_string() - } else { - uv_pip_compile( + if !requirements.is_empty() { + requirements = uv_pip_compile( job_id, &requirements, mem_peak, @@ -1105,8 +1104,9 @@ async fn handle_python_deps( .await .map_err(|e| { Error::ExecutionErr(format!("pip compile failed: {}", e.to_string())) - })? + })?; } + &requirements } }; @@ -1899,7 +1899,7 @@ use windmill_common::variables; #[cfg(feature = "enterprise")] pub async fn start_worker( - requirements_o: Option, + requirements_o: Option<&String>, db: &sqlx::Pool, inner_content: &str, base_internal_url: &str, diff --git a/backend/windmill-worker/src/rust_executor.rs b/backend/windmill-worker/src/rust_executor.rs index c26d4d5823a71..942ac8c16ada4 100644 --- a/backend/windmill-worker/src/rust_executor.rs +++ b/backend/windmill-worker/src/rust_executor.rs @@ -279,7 +279,7 @@ pub async fn handle_rust_job( client: &AuthedClientBackgroundTask, inner_content: &str, job_dir: &str, - requirements_o: Option, + requirements_o: Option<&String>, shared_mount: &str, base_internal_url: &str, worker_name: &str, @@ -288,7 +288,7 @@ pub async fn handle_rust_job( ) -> Result, Error> { check_executor_binary_exists("cargo", CARGO_PATH.as_str(), "rust")?; - let hash = compute_rust_hash(inner_content, requirements_o.as_ref()); + let hash = compute_rust_hash(inner_content, requirements_o); let bin_path = format!("{}/{hash}", RUST_CACHE_DIR); let remote_path = format!("{RUST_OBJECT_STORE_PREFIX}{hash}"); diff --git a/backend/windmill-worker/src/worker.rs b/backend/windmill-worker/src/worker.rs index f0bc52d627386..5fb572bb6d946 100644 --- a/backend/windmill-worker/src/worker.rs +++ b/backend/windmill-worker/src/worker.rs @@ -12,6 +12,7 @@ use windmill_common::{ apps::AppScriptId, auth::{fetch_authed_from_permissioned_as, JWTAuthClaims, JobPerms, JWT_SECRET}, + cache::{ScriptData, ScriptMetadata}, scripts::PREVIEW_IS_TAR_CODEBASE_HASH, utils::WarnAfterExt, worker::{ @@ -50,10 +51,9 @@ use std::{ use uuid::Uuid; use windmill_common::{ - cache, + cache::{self, RawData}, error::{self, to_anyhow, Error}, flows::FlowNodeId, - get_latest_deployed_hash_for_path, jobs::{JobKind, QueuedJob}, scripts::{get_full_hub_script_by_path, ScriptHash, ScriptLang, PREVIEW_IS_CODEBASE_HASH}, users::SUPERADMIN_SECRET_EMAIL, @@ -1965,33 +1965,22 @@ async fn handle_queued_job( } let started = Instant::now(); - let (raw_code, raw_lock, raw_flow) = match (raw_code, raw_lock, raw_flow) { - (None, None, None) => sqlx::query!( - "SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json>\" - FROM job WHERE id = $1 AND workspace_id = $2 LIMIT 1", - &job.id, - job.workspace_id - ) - .fetch_one(db) - .warn_after_seconds(5) - .await - .map(|record| (record.raw_code, record.raw_lock, record.raw_flow)) - .unwrap_or_default(), - (raw_code, raw_lock, raw_flow) => (raw_code, raw_lock, raw_flow), + // Pre-fetch preview jobs raw values if necessary. + // The `raw_*` values passed to this function are the original raw values from `queue` tables, + // they are kept for backward compatibility as they have been moved to the `job` table. 
+ let preview_data = match (job.job_kind, job.script_hash) { + ( + JobKind::Preview + | JobKind::Dependencies + | JobKind::FlowPreview + | JobKind::Flow + | JobKind::FlowDependencies, + None, + ) => Some(cache::job::fetch_preview(db, &job.id, raw_lock, raw_code, raw_flow).await?), + _ => None, }; - let cached_res_path = if job.cache_ttl.is_some() { - Some( - cached_result_path( - db, - &client.get_authed().await, - &job, - raw_code.as_ref(), - raw_lock.as_ref(), - raw_flow.as_ref(), - ) - .await, - ) + Some(cached_result_path(db, &client.get_authed().await, &job, preview_data.as_ref()).await) } else { None }; @@ -2032,12 +2021,14 @@ async fn handle_queued_job( } }; if job.is_flow() { - let flow = raw_flow - .as_ref() - .and_then(|raw_flow| serde_json::from_str(raw_flow.get()).ok()); + let flow_data = match preview_data { + Some(RawData::Flow(data)) => data, + // Not a preview: fetch from the cache or the database. + _ => cache::job::fetch_flow(db, job.job_kind, job.script_hash).await?, + }; handle_flow( job, - flow, + &flow_data, db, &client.get_authed().await, None, @@ -2091,7 +2082,7 @@ async fn handle_queued_job( JobKind::Dependencies => { handle_dependency_job( &job, - raw_code, + preview_data.as_ref(), &mut mem_peak, &mut canceled_by, job_dir, @@ -2107,7 +2098,7 @@ async fn handle_queued_job( JobKind::FlowDependencies => { handle_flow_dependency_job( &job, - raw_flow, + preview_data.as_ref(), &mut mem_peak, &mut canceled_by, job_dir, @@ -2143,10 +2134,13 @@ async fn handle_queued_job( .unwrap_or_else(|| serde_json::from_str("{}").unwrap())), _ => { let metric_timer = Instant::now(); + let preview_data = preview_data.and_then(|data| match data { + RawData::Script(data) => Some(data), + _ => None, + }); let r = handle_code_execution_job( job.as_ref(), - raw_code, - raw_lock, + preview_data, db, client, job_dir, @@ -2198,7 +2192,7 @@ async fn handle_queued_job( } pub fn build_envs( - envs: Option>, + envs: Option<&Vec>, ) -> windmill_common::error::Result> { let mut envs = if *CLOUD_HOSTED || envs.is_none() { HashMap::new() @@ -2232,7 +2226,7 @@ pub struct ContentReqLangEnvs { } pub async fn get_hub_script_content_and_requirements( - script_path: Option, + script_path: Option<&String>, db: Option<&DB>, ) -> error::Result { let script_path = script_path @@ -2250,35 +2244,18 @@ pub async fn get_hub_script_content_and_requirements( }) } -async fn get_script_content_by_path( - script_path: Option, - w_id: &str, - db: &DB, -) -> error::Result { - let script_path = script_path - .clone() - .ok_or_else(|| Error::InternalErr(format!("expected script path")))?; - return if script_path.starts_with("hub/") { - get_hub_script_content_and_requirements(Some(script_path), Some(db)).await - } else { - let (script_hash, ..) 
= - get_latest_deployed_hash_for_path(db, w_id, script_path.as_str()).await?; - get_script_content_by_hash(&script_hash, w_id, db).await - }; -} - pub async fn get_script_content_by_hash( script_hash: &ScriptHash, - w_id: &str, + _w_id: &str, db: &DB, ) -> error::Result { - let script = cache::script::fetch(db, *script_hash, w_id).await?; + let (data, metadata) = cache::script::fetch(db, *script_hash).await?; Ok(ContentReqLangEnvs { - content: script.code, - lockfile: script.lock, - language: script.language, - envs: script.envs, - codebase: match script.codebase { + content: data.code.clone(), + lockfile: data.lock.clone(), + language: metadata.language, + envs: metadata.envs.clone(), + codebase: match metadata.codebase.as_ref() { None => None, Some(x) if x.ends_with(".tar") => Some(format!("{}.tar", script_hash)), Some(_) => Some(script_hash.to_string()), @@ -2289,8 +2266,7 @@ pub async fn get_script_content_by_hash( #[tracing::instrument(level = "trace", skip_all)] async fn handle_code_execution_job( job: &QueuedJob, - raw_code: Option, - raw_lock: Option, + preview: Option>, db: &sqlx::Pool, client: &AuthedClientBackgroundTask, job_dir: &str, @@ -2304,13 +2280,19 @@ async fn handle_code_execution_job( occupancy_metrics: &mut OccupancyMetrics, killpill_rx: &mut tokio::sync::broadcast::Receiver<()>, ) -> error::Result> { - let ContentReqLangEnvs { - content: inner_content, - lockfile: requirements_o, - language, - envs, - codebase, - } = match job.job_kind { + let script_hash = || { + job.script_hash + .ok_or_else(|| Error::InternalErr("expected script hash".into())) + }; + let (arc_data, arc_metadata, data, metadata): ( + Arc, + Arc, + ScriptData, + ScriptMetadata, + ); + let (ScriptData { code, lock }, ScriptMetadata { language, envs, codebase }) = match job + .job_kind + { JobKind::Preview => { let codebase = match job.script_hash.map(|x| x.0) { Some(PREVIEW_IS_CODEBASE_HASH) => Some(job.id.to_string()), @@ -2318,66 +2300,68 @@ async fn handle_code_execution_job( _ => None, }; - ContentReqLangEnvs { - content: raw_code.unwrap_or_else(|| "no raw code".to_owned()), - lockfile: raw_lock, - language: job.language.to_owned(), - envs: None, - codebase, - } + arc_data = preview.ok_or_else(|| Error::InternalErr("expected preview".to_string()))?; + metadata = ScriptMetadata { language: job.language, codebase, envs: None }; + (arc_data.as_ref(), &metadata) } JobKind::Script_Hub => { - get_hub_script_content_and_requirements(job.script_path.clone(), Some(db)).await? + let ContentReqLangEnvs { content, lockfile, language, envs, codebase } = + get_hub_script_content_and_requirements(job.script_path.as_ref(), Some(db)).await?; + data = ScriptData { code: content, lock: lockfile }; + metadata = ScriptMetadata { language, envs, codebase }; + (&data, &metadata) } JobKind::Script => { - get_script_content_by_hash( - &job.script_hash.unwrap_or(ScriptHash(0)), - &job.workspace_id, - db, - ) - .await? 
+ (arc_data, arc_metadata) = cache::script::fetch(db, script_hash()?).await?; + (arc_data.as_ref(), arc_metadata.as_ref()) } JobKind::FlowScript => { - let (lockfile, content) = cache::flow::fetch_script( - db, - FlowNodeId(job.script_hash.unwrap_or(ScriptHash(0)).0), - ) - .await?; - ContentReqLangEnvs { - content, - lockfile, - language: job.language.to_owned(), - envs: None, - codebase: None, - } + arc_data = cache::flow::fetch_script(db, FlowNodeId(script_hash()?.0)).await?; + metadata = ScriptMetadata { language: job.language, envs: None, codebase: None }; + (arc_data.as_ref(), &metadata) } JobKind::AppScript => { - let (lockfile, content) = cache::app::fetch_script( - db, - AppScriptId(job.script_hash.unwrap_or(ScriptHash(0)).0), - ) - .await?; - ContentReqLangEnvs { - content, - lockfile, - language: job.language.to_owned(), - envs: None, - codebase: None, - } + arc_data = cache::app::fetch_script(db, AppScriptId(script_hash()?.0)).await?; + metadata = ScriptMetadata { language: job.language, envs: None, codebase: None }; + (arc_data.as_ref(), &metadata) } JobKind::DeploymentCallback => { - get_script_content_by_path(job.script_path.clone(), &job.workspace_id, db).await? + let script_path = job + .script_path + .as_ref() + .ok_or_else(|| Error::InternalErr("expected script path".to_string()))?; + if script_path.starts_with("hub/") { + let ContentReqLangEnvs { content, lockfile, language, envs, codebase } = + get_hub_script_content_and_requirements(Some(script_path), Some(db)).await?; + data = ScriptData { code: content, lock: lockfile }; + metadata = ScriptMetadata { language, envs, codebase }; + (&data, &metadata) + } else { + let hash = sqlx::query_scalar!( + "SELECT hash FROM script WHERE path = $1 AND workspace_id = $2 AND + deleted = false AND lock IS not NULL AND lock_error_logs IS NULL", + script_path, + &job.workspace_id + ) + .fetch_optional(db) + .await? 
+ .ok_or_else(|| Error::InternalErr("expected script hash".to_string()))?; + + (arc_data, arc_metadata) = cache::script::fetch(db, ScriptHash(hash)).await?; + (arc_data.as_ref(), arc_metadata.as_ref()) + } } _ => unreachable!( "handle_code_execution_job should never be reachable with a non-code execution job" ), }; + let language = *language; if language == Some(ScriptLang::Postgresql) { return do_postgresql( job, &client, - &inner_content, + &code, db, mem_peak, canceled_by, @@ -2396,7 +2380,7 @@ async fn handle_code_execution_job( return do_mysql( job, &client, - &inner_content, + &code, db, mem_peak, canceled_by, @@ -2426,7 +2410,7 @@ async fn handle_code_execution_job( return do_bigquery( job, &client, - &inner_content, + &code, db, mem_peak, canceled_by, @@ -2449,7 +2433,7 @@ async fn handle_code_execution_job( return do_snowflake( job, &client, - &inner_content, + &code, db, mem_peak, canceled_by, @@ -2480,7 +2464,7 @@ async fn handle_code_execution_job( return do_mssql( job, &client, - &inner_content, + &code, db, mem_peak, canceled_by, @@ -2493,7 +2477,7 @@ async fn handle_code_execution_job( return do_graphql( job, &client, - &inner_content, + &code, db, mem_peak, canceled_by, @@ -2524,7 +2508,7 @@ async fn handle_code_execution_job( job, &client, env_code, - inner_content, + code.clone(), db, mem_peak, canceled_by, @@ -2570,7 +2554,7 @@ mount {{ // println!("handle lang job {:?}", SystemTime::now()); - let envs = build_envs(envs)?; + let envs = build_envs(envs.as_ref())?; let result: error::Result> = match language { None => { @@ -2586,7 +2570,7 @@ mount {{ #[cfg(feature = "python")] handle_python_job( - requirements_o, + lock.as_ref(), job_dir, worker_dir, worker_name, @@ -2595,7 +2579,7 @@ mount {{ canceled_by, db, client, - &inner_content, + &code, &shared_mount, base_internal_url, envs, @@ -2606,14 +2590,14 @@ mount {{ } Some(ScriptLang::Deno) => { handle_deno_job( - requirements_o, + lock.as_ref(), mem_peak, canceled_by, job, db, client, job_dir, - &inner_content, + &code, base_internal_url, worker_name, envs, @@ -2624,15 +2608,15 @@ mount {{ } Some(ScriptLang::Bun) | Some(ScriptLang::Bunnative) => { handle_bun_job( - requirements_o, - codebase, + lock.as_ref(), + codebase.as_ref(), mem_peak, canceled_by, job, db, client, job_dir, - &inner_content, + &code, base_internal_url, worker_name, envs, @@ -2649,9 +2633,9 @@ mount {{ job, db, client, - &inner_content, + &code, job_dir, - requirements_o, + lock.as_ref(), &shared_mount, base_internal_url, worker_name, @@ -2667,7 +2651,7 @@ mount {{ job, db, client, - &inner_content, + &code, job_dir, &shared_mount, base_internal_url, @@ -2685,7 +2669,7 @@ mount {{ job, db, client, - &inner_content, + &code, job_dir, &shared_mount, base_internal_url, @@ -2703,14 +2687,14 @@ mount {{ #[cfg(feature = "php")] handle_php_job( - requirements_o, + lock.as_ref(), mem_peak, canceled_by, job, db, client, job_dir, - &inner_content, + &code, base_internal_url, worker_name, envs, @@ -2732,9 +2716,9 @@ mount {{ job, db, client, - &inner_content, + &code, job_dir, - requirements_o, + lock.as_ref(), &shared_mount, base_internal_url, worker_name, @@ -2751,7 +2735,7 @@ mount {{ #[cfg(feature = "python")] handle_ansible_job( - requirements_o, + lock.as_ref(), job_dir, worker_dir, worker_name, @@ -2760,7 +2744,7 @@ mount {{ canceled_by, db, client, - &inner_content, + &code, &shared_mount, base_internal_url, envs, @@ -2775,9 +2759,9 @@ mount {{ job, db, client, - &inner_content, + &code, job_dir, - requirements_o, + lock.as_ref(), &shared_mount, 
base_internal_url, worker_name, diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs index 17c1e58859dc1..e8cd67108bba8 100644 --- a/backend/windmill-worker/src/worker_flow.rs +++ b/backend/windmill-worker/src/worker_flow.rs @@ -18,6 +18,7 @@ use crate::{ KEEP_JOB_DIR, }; use anyhow::Context; +use futures::TryFutureExt; use mappable_rc::Marc; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; @@ -31,15 +32,17 @@ use windmill_common::add_time; use windmill_common::auth::JobPerms; #[cfg(feature = "benchmark")] use windmill_common::bench::BenchmarkIter; +use windmill_common::cache::{self, RawData}; use windmill_common::db::Authed; use windmill_common::flow_status::{ ApprovalConditions, FlowStatusModuleWParent, Iterator, JobResult, }; use windmill_common::flows::{add_virtual_items_if_necessary, Branch, FlowNodeId}; use windmill_common::jobs::{ - script_hash_to_tag_and_limits, script_path_to_payload, BranchResults, JobPayload, QueuedJob, - RawCode, ENTRYPOINT_OVERRIDE, + script_hash_to_tag_and_limits, script_path_to_payload, BranchResults, JobKind, JobPayload, + QueuedJob, RawCode, ENTRYPOINT_OVERRIDE, }; +use windmill_common::scripts::ScriptHash; use windmill_common::utils::WarnAfterExt; use windmill_common::worker::to_raw_value; use windmill_common::{ @@ -53,7 +56,7 @@ use windmill_common::{ use windmill_queue::schedule::get_schedule_opt; use windmill_queue::{ add_completed_job, add_completed_job_error, append_logs, handle_maybe_scheduled_job, - CanceledBy, PulledJob, PushArgs, PushIsolationLevel, WrappedError, + CanceledBy, PushArgs, PushIsolationLevel, WrappedError, }; type DB = sqlx::Pool; @@ -205,8 +208,7 @@ pub async fn update_flow_status_after_job_completion_internal( let ( should_continue_flow, flow_job, - flow_value, - raw_flow, + flow_data, stop_early, skip_if_stop_early, nresult, @@ -215,31 +217,46 @@ pub async fn update_flow_status_after_job_completion_internal( ) = { // tracing::debug!("UPDATE FLOW STATUS: {flow:?} {success} {result:?} {w_id} {depth}"); - let (old_status, current_module) = sqlx::query!( + let (job_kind, script_hash, old_status, raw_flow) = sqlx::query!( "SELECT + job_kind AS \"job_kind: JobKind\", + script_hash AS \"script_hash: ScriptHash\", flow_status AS \"flow_status!: Json>\", - coalesce(job.raw_flow, queue.raw_flow)->'modules'->(flow_status->'step')::int AS \"module: Json>\" - FROM queue LEFT JOIN job USING(id, workspace_id) WHERE id = $1 AND workspace_id = $2 LIMIT 1", - flow, w_id + raw_flow AS \"raw_flow: Json>\" + FROM queue WHERE id = $1 AND workspace_id = $2 LIMIT 1", + flow, + w_id ) .fetch_one(db) .await - .map_err(|e| Error::InternalErr( - format!("fetching flow status {flow} while reporting {success} {result:?}: {e:#}") - )) - .and_then(|record| Ok(( - serde_json::from_str::(record.flow_status.0.get()).map_err(|e| Error::InternalErr( - format!("requiring current module to be parsable as FlowStatus: {e:?}") - ))?, - record.module.map(|json| { - serde_json::from_str::(json.0.get()).map_err(|e| Error::InternalErr(format!( - "requiring current module to be parsable as FlowModule: {e:?}" - ))) - }).transpose()?, - )))?; + .map_err(|e| { + Error::InternalErr(format!( + "fetching flow status {flow} while reporting {success} {result:?}: {e:#}" + )) + }) + .and_then(|record| { + Ok(( + record.job_kind, + record.script_hash, + serde_json::from_str::(record.flow_status.0.get()).map_err(|e| { + Error::InternalErr(format!( + "requiring current module to be parsable as FlowStatus: {e:?}" + )) + 
+                })?,
+                record.raw_flow,
+            ))
+        })?;

-        let module_step = Step::from_i32_and_len(old_status.step, old_status.modules.len());
+        let flow_data = cache::job::fetch_flow(db, job_kind, script_hash)
+            .or_else(|_| cache::job::fetch_preview_flow(db, &flow, raw_flow))
+            .await?;
+        let flow_value = flow_data.value()?;
+        let module_step = Step::from_i32_and_len(old_status.step, old_status.modules.len());
+        let current_module = match module_step {
+            Step::Step(i) => flow_value.modules.get(i),
+            _ => None,
+        };

         let module_status = match module_step {
             Step::PreprocessorStep => old_status
                 .preprocessor_module
@@ -385,7 +402,7 @@ pub async fn update_flow_status_after_job_completion_internal(
                             *branch,
                             *parallel,
                             db,
-                            current_module.as_ref(),
+                            current_module,
                         )
                         .await?
                         .unwrap_or(false),
@@ -919,7 +936,7 @@ pub async fn update_flow_status_after_job_completion_internal(
            .context("remove flow status retry")?;
        }

-        let flow_job = sqlx::query_as::<_, PulledJob>(
+        let flow_job = sqlx::query_as::<_, QueuedJob>(
            "SELECT * FROM queue WHERE id = $1 AND workspace_id = $2",
        )
        .bind(flow)
@@ -936,24 +953,6 @@
            .unwrap_or_else(|| "none".to_string());
        tracing::info!(id = %flow_job.id, root_id = %job_root, "update flow status");

-        let PulledJob { job: flow_job, raw_flow, .. } = flow_job;
-        let raw_flow = if let Some(raw_flow) = raw_flow {
-            Some(raw_flow)
-        } else {
-            sqlx::query_scalar!(
-                "SELECT raw_flow AS \"raw_flow!: Json>\"
-                FROM job WHERE id = $1 AND workspace_id = $2 LIMIT 1",
-                &flow_job.id,
-                w_id
-            )
-            .fetch_one(db)
-            .await
-            .ok()
-        };
-        let flow_value = raw_flow
-            .as_ref()
-            .and_then(|raw_flow| serde_json::from_str::(raw_flow.get()).ok());
-
        let should_continue_flow = match success {
            _ if stop_early => false,
            _ if flow_job.canceled => false,
@@ -964,21 +963,22 @@
            }
            false if next_retry(
-                flow_value
-                    .as_ref()
-                    .and_then(|value| match module_step {
-                        Step::PreprocessorStep => value
-                            .preprocessor_module
-                            .as_ref()
-                            .and_then(|m| m.retry.as_ref()),
-                        Step::Step(i) => {
-                            value.modules.get(i).as_ref().and_then(|m| m.retry.as_ref())
-                        }
-                        Step::FailureStep => {
-                            value.failure_module.as_ref().and_then(|m| m.retry.as_ref())
-                        }
-                    })
-                    .unwrap_or(&Retry::default()),
+                match module_step {
+                    Step::PreprocessorStep => flow_value
+                        .preprocessor_module
+                        .as_ref()
+                        .and_then(|m| m.retry.as_ref()),
+                    Step::Step(i) => flow_value
+                        .modules
+                        .get(i)
+                        .as_ref()
+                        .and_then(|m| m.retry.as_ref()),
+                    Step::FailureStep => flow_value
+                        .failure_module
+                        .as_ref()
+                        .and_then(|m| m.retry.as_ref()),
+                }
+                .unwrap_or(&Retry::default()),
                &old_status.retry,
            )
            .is_some() =>
@@ -988,10 +988,7 @@
            false
                if !is_failure_step
                    && !skip_error_handler
-                    && flow_value
-                        .as_ref()
-                        .map(|v| v.failure_module.is_some())
-                        .unwrap_or(false) =>
+                    && flow_value.failure_module.is_some() =>
            {
                true
            }
@@ -1003,8 +1000,7 @@
        (
            should_continue_flow,
            flow_job,
-            flow_value,
-            raw_flow,
+            flow_data,
            stop_early,
            skip_if_stop_early,
            nresult,
@@ -1066,8 +1062,8 @@
        .await?;
    } else {
        if flow_job.cache_ttl.is_some() && success {
-            let cached_res_path =
-                cached_result_path(db, client, &flow_job, None, None, raw_flow.as_ref()).await;
+            let flow = RawData::Flow(flow_data.clone());
+            let cached_res_path = cached_result_path(db, client, &flow_job, Some(&flow)).await;

            save_in_cache(db, client, &flow_job, cached_res_path, nresult.clone()).await;
        }
@@ -1117,7 +1113,7 @@ pub async fn update_flow_status_after_job_completion_internal(
            tracing::debug!(id = %flow_job.id, "start handle flow");
            match handle_flow(
                flow_job.clone(),
-                flow_value,
+                &flow_data,
                db,
                client,
                Some(nresult.clone()),
@@ -1495,7 +1491,7 @@ async fn transform_input(
 #[instrument(level = "trace", skip_all)]
 pub async fn handle_flow(
     flow_job: Arc,
-    flow_value: Option,
+    flow_data: &cache::FlowData,
     db: &sqlx::Pool,
     client: &AuthedClient,
     last_result: Option>>,
@@ -1503,7 +1499,7 @@
     worker_dir: &str,
     job_completed_tx: Sender,
 ) -> anyhow::Result<()> {
-    let flow = flow_value.with_context(|| "Unable to parse flow definition")?;
+    let flow = flow_data.value()?;
     let status = flow_job
         .parse_flow_status()
         .with_context(|| "Unable to parse flow status")?;
@@ -1614,7 +1610,7 @@ lazy_static::lazy_static! {
 async fn push_next_flow_job(
     flow_job: Arc,
     mut status: FlowStatus,
-    flow: FlowValue,
+    flow: &FlowValue,
     db: &sqlx::Pool,
     client: &AuthedClient,
     last_job_result: Option>>,
@@ -1950,8 +1946,8 @@ async fn push_next_flow_job(
        ) && is_disapproved.is_none()
        {
            sqlx::query(
-                "UPDATE queue SET
-                flow_status = JSONB_SET(flow_status, ARRAY['modules', flow_status->>'step'::text], $1),
+                "UPDATE queue SET
+                flow_status = JSONB_SET(flow_status, ARRAY['modules', flow_status->>'step'::text], $1),
                suspend = $2, suspend_until = now() + $3 WHERE id = $4",
@@ -2349,8 +2345,13 @@ async fn push_next_flow_job(
        }
    };

+    // Also check `flow_job.same_worker` for [`JobKind::Flow`] jobs as it is no
+    // longer reflected in the flow value on push.
+    let job_same_worker = flow_job.same_worker
+        && matches!(flow_job.job_kind, JobKind::Flow)
+        && flow_job.script_hash.is_some();
    let continue_on_same_worker =
-        flow.same_worker && module.suspend.is_none() && module.sleep.is_none();
+        (flow.same_worker || job_same_worker) && module.suspend.is_none() && module.sleep.is_none();

    /* Finally, push the job into the queue */
    let mut uuids = vec![];
diff --git a/backend/windmill-worker/src/worker_lockfiles.rs b/backend/windmill-worker/src/worker_lockfiles.rs
index dfb855b905711..38611e2b0b4d2 100644
--- a/backend/windmill-worker/src/worker_lockfiles.rs
+++ b/backend/windmill-worker/src/worker_lockfiles.rs
@@ -19,6 +19,7 @@ use windmill_common::worker::{to_raw_value, to_raw_value_owned, write_file};
 use windmill_common::{
     apps::AppScriptId,
+    cache::{self, RawData},
     error::{self, to_anyhow},
     flows::{add_virtual_items_if_necessary, FlowValue},
     jobs::QueuedJob,
@@ -217,7 +218,7 @@ pub fn extract_relative_imports(
 #[tracing::instrument(level = "trace", skip_all)]
 pub async fn handle_dependency_job(
     job: &QueuedJob,
-    raw_code: Option,
+    preview_data: Option<&RawData>,
     mem_peak: &mut i32,
     canceled_by: &mut Option,
     job_dir: &str,
@@ -228,18 +229,6 @@
     token: &str,
     occupancy_metrics: &mut OccupancyMetrics,
 ) -> error::Result> {
-    let raw_code = match raw_code {
-        Some(code) => code,
-        None => sqlx::query_scalar!(
-            "SELECT content FROM script WHERE hash = $1 AND workspace_id = $2",
-            &job.script_hash.unwrap_or(ScriptHash(0)).0,
-            &job.workspace_id
-        )
-        .fetch_optional(db)
-        .await?
-        .unwrap_or_else(|| "No script found at this hash".to_string()),
-    };
-
     let script_path = job.script_path();
     let raw_deps = job
         .args
@@ -268,6 +257,16 @@ pub async fn handle_dependency_job(
         None
     };

+    // `JobKind::Dependencies` jobs store either:
+    // - A saved script `hash` in the `script_hash` column.
+    // - Preview raw lock and code in the `queue` or `job` table.
+    let script_data = match job.script_hash {
+        Some(hash) => &cache::script::fetch(db, hash).await?.0,
+        _ => match preview_data {
+            Some(RawData::Script(data)) => data,
+            _ => return Err(Error::InternalErr("expected script hash".into())),
+        },
+    };
     let content = capture_dependency_job(
         &job.id,
         job.language.as_ref().map(|v| Ok(v)).unwrap_or_else(|| {
@@ -275,7 +274,7 @@
                 "Job Language required for dependency jobs".to_owned(),
             ))
         })?,
-        &raw_code,
+        &script_data.code,
         mem_peak,
         canceled_by,
         job_dir,
@@ -312,6 +311,9 @@
     .execute(db)
     .await?;

+    // `lock` has been updated; invalidate the cache.
+    cache::script::invalidate(hash);
+
     let (deployment_message, parent_path) =
         get_deployment_msg_and_parent_path_from_args(job.args.clone());

@@ -333,7 +335,8 @@
         tracing::error!(%e, "error handling deployment metadata");
     }

-    let relative_imports = extract_relative_imports(&raw_code, script_path, &job.language);
+    let relative_imports =
+        extract_relative_imports(&script_data.code, script_path, &job.language);
     if let Some(relative_imports) = relative_imports {
         update_script_dependency_map(
             &job.id,
@@ -534,7 +537,7 @@ async fn trigger_dependents_to_recompute_dependencies(

 pub async fn handle_flow_dependency_job(
     job: &QueuedJob,
-    raw_flow: Option>>,
+    preview_data: Option<&RawData>,
     mem_peak: &mut i32,
     canceled_by: &mut Option,
     job_dir: &str,
@@ -577,11 +580,6 @@
         )
     };

-    let raw_flow = raw_flow.map(|v| Ok(v)).unwrap_or_else(|| {
-        Err(Error::InternalErr(
-            "Flow Dependency requires raw flow".to_owned(),
-        ))
-    })?;
     let (deployment_message, parent_path) =
         get_deployment_msg_and_parent_path_from_args(job.args.clone());

@@ -595,7 +593,18 @@
         })
         .flatten();

-    let mut flow = serde_json::from_str::((*raw_flow.0).get()).map_err(to_anyhow)?;
+    // `JobKind::FlowDependencies` jobs store either:
+    // - A saved flow version `id` in the `script_hash` column.
+    // - Preview raw flow in the `queue` or `job` table.
+    let mut flow = match job.script_hash {
+        Some(ScriptHash(id)) => cache::flow::fetch_version(db, id).await?,
+        _ => match preview_data {
+            Some(RawData::Flow(data)) => data.clone(),
+            _ => return Err(Error::InternalErr("expected script hash".into())),
+        },
+    }
+    .value()?
+    .clone();

     let mut tx = db.begin().await?;

@@ -619,16 +628,15 @@
         occupancy_metrics,
     )
     .await?;
-    let new_flow_value =
-        sqlx::types::Json(serde_json::value::to_raw_value(&flow).map_err(to_anyhow)?);
+    let new_flow_value = Json(serde_json::value::to_raw_value(&flow).map_err(to_anyhow)?);

-    // Re-check cancelation to ensure we don't accidentially override a flow.
+    // Re-check cancellation to ensure we don't accidentally override a flow.
     if sqlx::query_scalar!("SELECT canceled FROM queue WHERE id = $1", job.id)
         .fetch_optional(db)
         .await
         .map(|v| Some(true) == v)
         .unwrap_or_else(|err| {
-            tracing::error!(%job.id, %err, "error checking cancelation for job {0}: {err}", job.id);
+            tracing::error!(%job.id, %err, "error checking cancellation for job {0}: {err}", job.id);
             false
         })
     {
@@ -644,18 +652,18 @@
     sqlx::query!(
         "UPDATE flow SET value = $1 WHERE path = $2 AND workspace_id = $3",
-        &new_flow_value as &sqlx::types::Json>,
+        &new_flow_value as &Json>,
         job_path,
         job.workspace_id
     )
-    .execute(db)
+    .execute(&mut *tx)
     .await?;

     sqlx::query!(
         "UPDATE flow_version SET value = $1 WHERE id = $2",
-        &new_flow_value as &sqlx::types::Json>,
+        &new_flow_value as &Json>,
         version
     )
-    .execute(db)
+    .execute(&mut *tx)
     .await?;

     // Compute a lite version of the flow value (`RawScript` => `FlowScript`).
@@ -673,9 +681,9 @@
         "INSERT INTO flow_version_lite (id, value) VALUES ($1, $2)
         ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value",
         version,
-        sqlx::types::Json(to_raw_value(&value_lite)) as sqlx::types::Json>,
+        Json(value_lite) as Json,
     )
-    .execute(db)
+    .execute(&mut *tx)
     .await?;

     tx.commit().await?;
@@ -1831,7 +1839,7 @@ async fn capture_dependency_job(
     if req.is_some() && !raw_deps {
         crate::bun_executor::prebundle_bun_script(
             job_raw_code,
-            req.clone(),
+            req.as_ref(),
             script_path,
             job_id,
             w_id,
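A minimal sketch of the lookup convention shared by the two dependency handlers above: a deployed flow is addressed by the flow_version id carried in `script_hash`, while previews fall back to the raw definition captured with the job. This is an illustration only, built from the calls visible in this diff (`cache::flow::fetch_version`, `RawData::Flow`, `.value()`); the `resolve_flow_value` name and the `Pool<Postgres>` parameter type are assumptions, not part of the patch.

use windmill_common::cache::{self, RawData};
use windmill_common::error::{self, Error};
use windmill_common::flows::FlowValue;
use windmill_common::jobs::QueuedJob;
use windmill_common::scripts::ScriptHash;

// Hypothetical helper mirroring the two match blocks in the hunks above.
async fn resolve_flow_value(
    db: &sqlx::Pool<sqlx::Postgres>,
    job: &QueuedJob,
    preview_data: Option<&RawData>,
) -> error::Result<FlowValue> {
    let data = match job.script_hash {
        // Saved flow: `script_hash` carries the `flow_version` id, served from the cache.
        Some(ScriptHash(id)) => cache::flow::fetch_version(db, id).await?,
        // Preview flow: use the raw definition pushed with the job.
        _ => match preview_data {
            Some(RawData::Flow(data)) => data.clone(),
            _ => return Err(Error::InternalErr("expected a flow version id or a raw flow".into())),
        },
    };
    Ok(data.value()?.clone())
}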