Skip to content

Commit

Permalink
fix: fix relative imports in cached flow scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Dec 18, 2024
1 parent ee28955 commit 13be0cd
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 13 deletions.
2 changes: 2 additions & 0 deletions backend/tests/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3848,6 +3848,7 @@ mod job_payload {
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
path: "f/system/hello/test-0".into(),
})
.arg("world", json!("foo"))
.run_until_complete(&db, port)
Expand All @@ -3867,6 +3868,7 @@ mod job_payload {
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
path: "f/system/hello/test-0".into(),
})
.arg("hello", json!("You know nothing Jean Neige"))
.run_until_complete(&db, port)
Expand Down
1 change: 1 addition & 0 deletions backend/windmill-common/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ pub enum JobPayload {
concurrency_time_window_s: Option<i32>,
cache_ttl: Option<i32>,
dedicated_worker: Option<bool>,
path: String,
},
FlowNode {
id: FlowNodeId, // flow_node(id).
Expand Down
3 changes: 2 additions & 1 deletion backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2852,9 +2852,10 @@ pub async fn push<'c, 'd>(
concurrency_time_window_s,
cache_ttl,
dedicated_worker,
path,
} => (
Some(id.0),
None,
Some(path),
None,
JobKind::FlowScript,
None,
Expand Down
33 changes: 21 additions & 12 deletions backend/windmill-worker/src/worker_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2978,6 +2978,18 @@ fn payload_from_modules<'a>(
})
}

fn get_path(flow_job: &QueuedJob, status: &FlowStatus, module: &FlowModule) -> String {
if status
.preprocessor_module
.as_ref()
.is_some_and(|x| x.id() == module.id)
{
format!("{}/preprocessor", flow_job.script_path())
} else {
format!("{}/step-{}", flow_job.script_path(), status.step)
}
}

async fn compute_next_flow_transform(
arc_flow_job_args: Marc<HashMap<String, Box<RawValue>>>,
arc_last_job_result: Arc<Box<RawValue>>,
Expand Down Expand Up @@ -3023,6 +3035,7 @@ async fn compute_next_flow_transform(
if is_skipped {
return trivial_next_job(JobPayload::Identity);
}

match module.get_value()? {
FlowModuleValue::Identity => trivial_next_job(JobPayload::Identity),
FlowModuleValue::Flow { path, .. } => {
Expand Down Expand Up @@ -3052,19 +3065,10 @@ async fn compute_next_flow_transform(
concurrency_time_window_s,
..
} => {
let path = path.clone().or_else(|| {
if status
.preprocessor_module
.as_ref()
.is_some_and(|x| x.id() == module.id)
{
Some(format!("{}/preprocessor", flow_job.script_path()))
} else {
Some(format!("{}/step-{}", flow_job.script_path(), status.step))
}
});
let path = path.unwrap_or_else(|| get_path(flow_job, status, module));

let payload = raw_script_to_payload(
path,
Some(path),
content,
language,
lock,
Expand All @@ -3089,6 +3093,8 @@ async fn compute_next_flow_transform(
concurrency_time_window_s,
..
} => {
let path = get_path(flow_job, status, module);

let payload = JobPayloadWithTag {
payload: JobPayload::FlowScript {
id,
Expand All @@ -3098,6 +3104,7 @@ async fn compute_next_flow_transform(
concurrency_time_window_s,
cache_ttl: module.cache_ttl.map(|x| x as i32),
dedicated_worker: None,
path,
},
tag: tag.clone(),
delete_after_use,
Expand Down Expand Up @@ -3699,6 +3706,8 @@ async fn payload_from_simple_module(
concurrency_time_window_s,
cache_ttl: module.cache_ttl.map(|x| x as i32),
dedicated_worker: None,
path: inner_path
.unwrap_or_else(|| format!("{}/simple-flow", flow_job.script_path())),
},
tag,
delete_after_use,
Expand Down

0 comments on commit 13be0cd

Please sign in to comment.