Skip to content

Commit

Permalink
remove rsmq
Browse files · Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Nov 24, 2024
1 parent 278f593 commit 89456f5
Show file tree
Hide file tree
Showing 40 changed files with 338 additions and 1,303 deletions.
305 changes: 13 additions & 292 deletions backend/Cargo.lock

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ chrono.workspace = true
git-version.workspace = true
base64.workspace = true
sha2.workspace = true
rsmq_async.workspace = true
url.workspace = true
lazy_static.workspace = true
once_cell.workspace = true
Expand Down Expand Up @@ -231,7 +230,6 @@ async-stripe = { version = "0.39.1", features = [
] }
async_zip = { version = "0.0.11", features = ["full"] }
once_cell = "1.17.1"
rsmq_async = { version = "5.1.5" }
gosyn = "0.2.6"
bytes = "1.4.0"
gethostname = "0.4.3"
Expand Down
2 changes: 1 addition & 1 deletion backend/ee-repo-ref.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4a0e64ab860eb88ff0d21b8705e647c9b76c46ee
446e4fbc59048bb11c18648ac094e57c0bfa4a28
54 changes: 13 additions & 41 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

use anyhow::Context;
use monitor::{
reload_indexer_config, reload_timeout_wait_result_setting, send_current_log_file_to_object_store, send_logs_to_object_store
reload_indexer_config, reload_timeout_wait_result_setting,
send_current_log_file_to_object_store, send_logs_to_object_store,
};
use rand::Rng;
use sqlx::{postgres::PgListener, Pool, Postgres};
Expand All @@ -29,7 +30,15 @@ use windmill_common::ee::{maybe_renew_license_key_on_start, LICENSE_KEY_ID, LICE

use windmill_common::{
global_settings::{
BASE_URL_SETTING, BUNFIG_INSTALL_SCOPES_SETTING,CRITICAL_ALERT_MUTE_UI_SETTING, CRITICAL_ERROR_CHANNELS_SETTING, CUSTOM_TAGS_SETTING, DEFAULT_TAGS_PER_WORKSPACE_SETTING, DEFAULT_TAGS_WORKSPACES_SETTING, ENV_SETTINGS, EXPOSE_DEBUG_METRICS_SETTING, EXPOSE_METRICS_SETTING, EXTRA_PIP_INDEX_URL_SETTING, HUB_BASE_URL_SETTING, INDEXER_SETTING, JOB_DEFAULT_TIMEOUT_SECS_SETTING, JWT_SECRET_SETTING, KEEP_JOB_DIR_SETTING, LICENSE_KEY_SETTING, NPM_CONFIG_REGISTRY_SETTING, OAUTH_SETTING, PIP_INDEX_URL_SETTING, REQUEST_SIZE_LIMIT_SETTING, REQUIRE_PREEXISTING_USER_FOR_OAUTH_SETTING, RETENTION_PERIOD_SECS_SETTING, SAML_METADATA_SETTING, SCIM_TOKEN_SETTING, SMTP_SETTING, TIMEOUT_WAIT_RESULT_SETTING
BASE_URL_SETTING, BUNFIG_INSTALL_SCOPES_SETTING, CRITICAL_ALERT_MUTE_UI_SETTING,
CRITICAL_ERROR_CHANNELS_SETTING, CUSTOM_TAGS_SETTING, DEFAULT_TAGS_PER_WORKSPACE_SETTING,
DEFAULT_TAGS_WORKSPACES_SETTING, ENV_SETTINGS, EXPOSE_DEBUG_METRICS_SETTING,
EXPOSE_METRICS_SETTING, EXTRA_PIP_INDEX_URL_SETTING, HUB_BASE_URL_SETTING, INDEXER_SETTING,
JOB_DEFAULT_TIMEOUT_SECS_SETTING, JWT_SECRET_SETTING, KEEP_JOB_DIR_SETTING,
LICENSE_KEY_SETTING, NPM_CONFIG_REGISTRY_SETTING, OAUTH_SETTING, PIP_INDEX_URL_SETTING,
REQUEST_SIZE_LIMIT_SETTING, REQUIRE_PREEXISTING_USER_FOR_OAUTH_SETTING,
RETENTION_PERIOD_SECS_SETTING, SAML_METADATA_SETTING, SCIM_TOKEN_SETTING, SMTP_SETTING,
TIMEOUT_WAIT_RESULT_SETTING,
},
scripts::ScriptLang,
stats_ee::schedule_stats,
Expand Down Expand Up @@ -332,27 +341,6 @@ async fn windmill_main() -> anyhow::Result<()> {
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
};

let rsmq_config = std::env::var("REDIS_URL").ok().map(|x| {
let url = x.parse::<url::Url>().unwrap();
let mut config = rsmq_async::RsmqOptions { ..Default::default() };

config.host = url.host_str().expect("redis host required").to_owned();
config.password = url.password().map(|s| s.to_owned());
config.db = url
.path_segments()
.and_then(|mut segments| segments.next())
.and_then(|segment| segment.parse().ok())
.unwrap_or(0);
config.ns = url
.query_pairs()
.find(|s| s.0 == "rsmq_namespace")
.map(|s| s.1)
.unwrap_or(std::borrow::Cow::Borrowed("rsmq"))
.into_owned();
config.port = url.port().unwrap_or(6379).to_string();
config
});

tracing::info!("Connecting to database...");
let db = windmill_common::connect_db(server_mode, indexer_mode).await?;
tracing::info!("Database connected");
Expand All @@ -367,13 +355,6 @@ async fn windmill_main() -> anyhow::Result<()> {
.unwrap_or_else(|| "UNKNOWN".to_string())
);

let rsmq = if let Some(config) = rsmq_config {
tracing::info!("Redis config set: {:?}", config);
Some(rsmq_async::MultiplexedRsmq::new(config).await.unwrap())
} else {
None
};

let is_agent = mode == Mode::Agent;

if !is_agent {
Expand Down Expand Up @@ -488,7 +469,6 @@ Windmill Community Edition {GIT_VERSION}
monitor_db(
&db,
&base_internal_url,
rsmq.clone(),
server_mode,
worker_mode,
true,
Expand All @@ -507,7 +487,6 @@ Windmill Community Edition {GIT_VERSION}

let addr = SocketAddr::from((server_bind_address, port));

let rsmq2 = rsmq.clone();
let (base_internal_tx, base_internal_rx) = tokio::sync::oneshot::channel::<String>();

DirBuilder::new()
Expand Down Expand Up @@ -588,7 +567,6 @@ Windmill Community Edition {GIT_VERSION}
if !is_agent {
windmill_api::run_server(
db.clone(),
rsmq2,
index_reader,
log_index_reader,
addr,
Expand Down Expand Up @@ -620,7 +598,6 @@ Windmill Community Edition {GIT_VERSION}
killpill_tx.clone(),
num_workers,
base_internal_url.clone(),
rsmq.clone(),
mode.clone() == Mode::Agent,
hostname.clone(),
)
Expand All @@ -636,13 +613,12 @@ Windmill Community Edition {GIT_VERSION}
killpill_phase2_tx.send(())?;
tracing::info!("Phase 2 of shutdown completed");
}
Ok(()) as anyhow::Result<()>
Ok(())
};

let monitor_f = async {
let db = db.clone();
let tx = killpill_tx.clone();
let rsmq = rsmq.clone();

let base_internal_url = base_internal_url.to_string();
let h = tokio::spawn(async move {
Expand All @@ -659,7 +635,6 @@ Windmill Community Edition {GIT_VERSION}
monitor_db(
&db,
&base_internal_url,
rsmq.clone(),
server_mode,
worker_mode,
false,
Expand Down Expand Up @@ -950,13 +925,12 @@ fn display_config(envs: &[&str]) {
)
}

pub async fn run_workers<R: rsmq_async::RsmqConnection + Send + Sync + Clone + 'static>(
pub async fn run_workers(
db: Pool<Postgres>,
mut rx: tokio::sync::broadcast::Receiver<()>,
tx: tokio::sync::broadcast::Sender<()>,
num_workers: i32,
base_internal_url: String,
rsmq: Option<R>,
agent_mode: bool,
hostname: String,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -1028,7 +1002,6 @@ pub async fn run_workers<R: rsmq_async::RsmqConnection + Send + Sync + Clone + '
let rx = killpill_rxs.pop().unwrap();
let tx = tx.clone();
let base_internal_url = base_internal_url.clone();
let rsmq2 = rsmq.clone();
let hostname = hostname.clone();

handles.push(tokio::spawn(async move {
Expand All @@ -1046,7 +1019,6 @@ pub async fn run_workers<R: rsmq_async::RsmqConnection + Send + Sync + Clone + '
rx,
tx,
&base_internal_url,
rsmq2,
agent_mode,
);

Expand Down
35 changes: 10 additions & 25 deletions backend/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::{
};

use chrono::{NaiveDateTime, Utc};
use rsmq_async::MultiplexedRsmq;
use serde::de::DeserializeOwned;
use sqlx::{Pool, Postgres};
use tokio::{
Expand Down Expand Up @@ -686,12 +685,10 @@ pub async fn delete_expired_items(db: &DB) -> () {
{
tracing::error!("Error deleting log file: {:?}", e);
}
if let Err(e) = sqlx::query!(
"DELETE FROM job WHERE id = ANY($1)",
&deleted_jobs
)
.execute(&mut *tx)
.await
if let Err(e) =
sqlx::query!("DELETE FROM job WHERE id = ANY($1)", &deleted_jobs)
.execute(&mut *tx)
.await
{
tracing::error!("Error deleting job: {:?}", e);
}
Expand Down Expand Up @@ -1031,16 +1028,15 @@ pub async fn monitor_pool(db: &DB) {
pub async fn monitor_db(
db: &Pool<Postgres>,
base_internal_url: &str,
rsmq: Option<MultiplexedRsmq>,
server_mode: bool,
_worker_mode: bool,
initial_load: bool,
_killpill_tx: tokio::sync::broadcast::Sender<()>,
) {
let zombie_jobs_f = async {
if server_mode && !initial_load {
handle_zombie_jobs(db, base_internal_url, rsmq.clone(), "server").await;
match handle_zombie_flows(db, rsmq.clone()).await {
handle_zombie_jobs(db, base_internal_url, "server").await;
match handle_zombie_flows(db).await {
Err(err) => {
tracing::error!("Error handling zombie flows: {:?}", err);
}
Expand Down Expand Up @@ -1312,12 +1308,7 @@ pub async fn reload_base_url_setting(db: &DB) -> error::Result<()> {
Ok(())
}

async fn handle_zombie_jobs<R: rsmq_async::RsmqConnection + Send + Sync + Clone>(
db: &Pool<Postgres>,
base_internal_url: &str,
rsmq: Option<R>,
worker_name: &str,
) {
async fn handle_zombie_jobs(db: &Pool<Postgres>, base_internal_url: &str, worker_name: &str) {
if *RESTART_ZOMBIE_JOBS {
let restarted = sqlx::query!(
"UPDATE queue SET running = false, started_at = null
Expand Down Expand Up @@ -1426,7 +1417,6 @@ async fn handle_zombie_jobs<R: rsmq_async::RsmqConnection + Send + Sync + Clone>
true,
same_worker_tx_never_used,
"",
rsmq.clone(),
worker_name,
send_result_never_used,
#[cfg(feature = "benchmark")]
Expand All @@ -1436,10 +1426,7 @@ async fn handle_zombie_jobs<R: rsmq_async::RsmqConnection + Send + Sync + Clone>
}
}

async fn handle_zombie_flows(
db: &DB,
rsmq: Option<rsmq_async::MultiplexedRsmq>,
) -> error::Result<()> {
async fn handle_zombie_flows(db: &DB) -> error::Result<()> {
let flows = sqlx::query_as::<_, QueuedJob>(
r#"
SELECT *
Expand Down Expand Up @@ -1486,7 +1473,7 @@ async fn handle_zombie_flows(
}
);
report_critical_error(reason.clone(), db.clone(), Some(&flow.workspace_id), None).await;
cancel_zombie_flow_job(db, flow, &rsmq, reason).await?;
cancel_zombie_flow_job(db, flow, reason).await?;
}
}

Expand Down Expand Up @@ -1516,7 +1503,7 @@ async fn handle_zombie_flows(
job.workspace_id,
flow.last_ping
);
cancel_zombie_flow_job(db, job, &rsmq,
cancel_zombie_flow_job(db, job,
format!("Flow {} cancelled as one of the parallel branch {} was unable to make the last transition ", flow.parent_flow_id, flow.job_id))
.await?;
} else {
Expand All @@ -1529,7 +1516,6 @@ async fn handle_zombie_flows(
async fn cancel_zombie_flow_job(
db: &Pool<Postgres>,
flow: QueuedJob,
rsmq: &Option<MultiplexedRsmq>,
message: String,
) -> Result<(), error::Error> {
let tx = db.begin().await.unwrap();
Expand All @@ -1545,7 +1531,6 @@ async fn cancel_zombie_flow_job(
flow.workspace_id.as_str(),
tx,
db,
rsmq.clone(),
true,
false,
)
Expand Down
8 changes: 3 additions & 5 deletions backend/tests/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ impl ApiServer {
db.clone(),
None,
None,
None,
addr,
rx,
port_tx,
Expand Down Expand Up @@ -899,8 +898,8 @@ impl RunJob {
hm_args.insert(k, windmill_common::worker::to_raw_value(&v));
}

let tx = PushIsolationLevel::IsolatedRoot(db.clone(), None);
let (uuid, tx) = windmill_queue::push::<rsmq_async::MultiplexedRsmq>(
let tx = PushIsolationLevel::IsolatedRoot(db.clone());
let (uuid, tx) = windmill_queue::push(
&db,
tx,
"test-workspace",
Expand Down Expand Up @@ -1018,7 +1017,7 @@ fn spawn_test_worker(
windmill_common::worker::make_suspended_pull_query(&wc).await;
windmill_common::worker::make_pull_query(&wc).await;
}
windmill_worker::run_worker::<rsmq_async::MultiplexedRsmq>(
windmill_worker::run_worker(
&db,
worker_instance,
worker_name,
Expand All @@ -1028,7 +1027,6 @@ fn spawn_test_worker(
rx,
tx2,
&base_internal_url,
None,
false,
)
.await
Expand Down
1 change: 0 additions & 1 deletion backend/windmill-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ async-stripe = { workspace = true, optional = true }
lazy_static.workspace = true
prometheus = { workspace = true, optional = true }
async_zip.workspace = true
rsmq_async.workspace = true
regex.workspace = true
bytes.workspace = true
samael = { workspace = true, optional = true }
Expand Down
Loading

0 comments on commit 89456f5

Please sign in to comment.