From 5a4cd103e09d112fc14283e5dc9942fa0461f218 Mon Sep 17 00:00:00 2001 From: Jeremy Andrews Date: Sun, 17 Jul 2022 07:45:08 +0200 Subject: [PATCH] move manager into thread; gaggle from controller --- src/config.rs | 53 +++++--- src/controller.rs | 302 ++++++++++++++++++++++++++++++++++++++---- src/gaggle/common.rs | 23 ++++ src/gaggle/manager.rs | 71 ++++++++++ src/lib.rs | 37 +++--- 5 files changed, 420 insertions(+), 66 deletions(-) create mode 100644 src/gaggle/common.rs create mode 100644 src/gaggle/manager.rs diff --git a/src/config.rs b/src/config.rs index 8b0d2e7d..d0fdef48 100644 --- a/src/config.rs +++ b/src/config.rs @@ -17,7 +17,10 @@ use crate::util; use crate::{GooseAttack, GooseError}; /// Constant defining Goose's default port when running a Gaggle. -const DEFAULT_PORT: &str = "5115"; +const DEFAULT_GAGGLE_PORT: &str = "5115"; + +/// Constant defining Goose's default manager_host when running a Gaggle. +const DEFAULT_GAGGLE_HOST: &str = "127.0.0.1"; /// Runtime options available when launching a Goose load test. /// @@ -1846,7 +1849,30 @@ impl GooseConfiguration { }, ]) .unwrap_or(false); + } + pub(crate) fn configure_gaggle(&mut self, defaults: &GooseDefaults) { + // Re-configure `users`, in case the AttackMode was changed. + self.users = self.get_value(vec![ + // Use --users if set and not on Worker. + GooseValue { + value: self.users, + filter: self.worker, + message: "--users", + }, + // Otherwise use GooseDefault if set and not on Worker. + GooseValue { + value: defaults.users, + filter: defaults.users.is_none() || self.worker, + message: "default users", + }, + // Otherwise use detected number of CPUs if not on Worker. + GooseValue { + value: Some(num_cpus::get()), + filter: self.worker || self.test_plan.is_some(), + message: "users defaulted to number of CPUs", + }, + ]); // Configure `expect_workers`. self.expect_workers = self.get_value(vec![ // Use --expect-workers if configured. @@ -1920,9 +1946,9 @@ impl GooseConfiguration { filter: defaults.manager_bind_port.is_none() || !self.manager, message: "manager_bind_port", }, - // Otherwise default to DEFAULT_PORT if on Manager. + // Otherwise default to DEFAULT_GAGGLE_PORT if on Manager. GooseValue { - value: Some(DEFAULT_PORT.to_string().parse().unwrap()), + value: Some(DEFAULT_GAGGLE_PORT.to_string().parse().unwrap()), filter: !self.manager, message: "manager_bind_port", }, @@ -1946,7 +1972,7 @@ impl GooseConfiguration { }, // Otherwise default to 127.0.0.1 if on Worker. GooseValue { - value: Some("127.0.0.1".to_string()), + value: Some(DEFAULT_GAGGLE_HOST.to_string()), filter: !self.worker, message: "manager_host", }, @@ -1968,9 +1994,9 @@ impl GooseConfiguration { filter: defaults.manager_port.is_none() || !self.worker, message: "manager_port", }, - // Otherwise default to DEFAULT_PORT if on Worker. + // Otherwise default to DEFAULT_GAGGLE_PORT if on Worker. GooseValue { - value: Some(DEFAULT_PORT.to_string().parse().unwrap()), + value: Some(DEFAULT_GAGGLE_PORT.to_string().parse().unwrap()), filter: !self.worker, message: "manager_port", }, @@ -2022,13 +2048,6 @@ impl GooseConfiguration { detail: "`configuration.scenario_log` can not be set on the Manager." .to_string(), }); - } else if self.no_autostart { - return Err(GooseError::InvalidOption { - option: "`configuration.no_autostart`".to_string(), - value: true.to_string(), - detail: "`configuration.no_autostart` can not be set on the Manager." - .to_string(), - }); } else if !self.report_file.is_empty() { return Err(GooseError::InvalidOption { option: "`configuration.report_file`".to_string(), @@ -2206,14 +2225,6 @@ impl GooseConfiguration { detail: "`configuration.no_status_codes` can not be set in Worker mode." .to_string(), }); - // Can't set `no_autostart` on Worker. - } else if self.no_autostart { - return Err(GooseError::InvalidOption { - option: "`configuration.no_autostart`".to_string(), - value: true.to_string(), - detail: "`configuration.no_autostart` can not be set in Worker mode." - .to_string(), - }); // Can't set `no_gzip` on Worker. } else if self.no_gzip { return Err(GooseError::InvalidOption { diff --git a/src/controller.rs b/src/controller.rs index 1325de9c..88042867 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -7,7 +7,7 @@ use crate::config::GooseConfiguration; use crate::metrics::GooseMetrics; use crate::test_plan::{TestPlan, TestPlanHistory, TestPlanStepAction}; use crate::util; -use crate::{AttackPhase, GooseAttack, GooseAttackRunState, GooseError}; +use crate::{AttackMode, AttackPhase, GooseAttack, GooseAttackRunState, GooseError}; use async_trait::async_trait; use futures::{SinkExt, StreamExt}; @@ -154,6 +154,38 @@ pub enum ControllerCommand { /// /// Can be configured on an idle or running load test. TestPlan, + /// Taggles Gaggle Manager-mode. + /// + /// # Example + /// Enables Gaggle Manager-mode. + /// ```notest + /// manager + /// ``` + Manager, + /// Configures the number of Gaggle Workers to expect before starting the load test. + /// + /// # Example + /// Tells Gaggle Manager to wait for 4 Workers to connect. + /// ```notest + /// expect-workers 4 + /// ``` + ExpectWorkers, + /// Configures the number of Gaggle Workers to expect before starting the load test. + /// + /// # Example + /// Tells Gaggle Manager to wait for 4 Workers to connect. + /// ```notest + /// expect-workers 4 + /// ``` + NoHashCheck, + /// Taggles no-hash-check when in Manager-mode. + /// + /// # Example + /// Enables no-hash-check. + /// ```notest + /// no-hash-check + /// ``` + Worker, /// Display the current [`GooseConfiguration`](../struct.GooseConfiguration.html)s. /// /// # Example @@ -286,6 +318,68 @@ impl ControllerCommand { } }), }, + ControllerCommand::Manager => ControllerCommandDetails { + help: ControllerHelp { + name: "manager", + description: "toggle Manager mode\n", + }, + regex: r"(?i)^manager$", + process_response: Box::new(|response| { + if let ControllerResponseMessage::Bool(true) = response { + Ok("manager mode toggled".to_string()) + } else { + Err("failed to toggle manager mode, be sure load test is idle".to_string()) + } + }), + }, + ControllerCommand::ExpectWorkers => ControllerCommandDetails { + help: ControllerHelp { + name: "expect-workers INT", + description: "set number of Workers to expect\n", + }, + regex: r"(?i)^(expect|expectworkers|expect_workers|expect-workers) ([0-9]+)$", + process_response: Box::new(|response| { + if let ControllerResponseMessage::Bool(true) = response { + Ok("expect-workers configured".to_string()) + } else { + Err( + "failed to configure expect-workers, be sure load test is idle and in Manager mode" + .to_string(), + ) + } + }), + }, + ControllerCommand::NoHashCheck => ControllerCommandDetails { + help: ControllerHelp { + name: "no-hash-check", + description: "toggle no-hash-check\n", + }, + regex: r"(?i)^(no-hash-check|no_hash_check|nohashcheck)$", + process_response: Box::new(|response| { + if let ControllerResponseMessage::Bool(true) = response { + Ok("no-hash-check toggled".to_string()) + } else { + Err( + "failed to toggle no-hash-check, be sure load test is idle and in Manager mode" + .to_string(), + ) + } + }), + }, + ControllerCommand::Worker => ControllerCommandDetails { + help: ControllerHelp { + name: "worker", + description: "toggle Worker mode\n\n", + }, + regex: r"(?i)^worker$", + process_response: Box::new(|response| { + if let ControllerResponseMessage::Bool(true) = response { + Ok("worker mode toggled".to_string()) + } else { + Err("failed to toggle worker mode, be sure load test is idle".to_string()) + } + }), + }, ControllerCommand::Metrics => ControllerCommandDetails { help: ControllerHelp { name: "metrics", @@ -469,7 +563,7 @@ impl ControllerCommand { for command in ControllerCommand::iter() { write!( &mut help_text, - "{:<18} {}", + "{:<19} {}", command.details().help.name, command.details().help.description ) @@ -517,29 +611,53 @@ impl GooseAttack { ControllerCommand::Start => { // We can only start an idle load test. if self.attack_phase == AttackPhase::Idle { - self.test_plan = TestPlan::build(&self.configuration); - if self.prepare_load_test().is_ok() { - // Rebuild test plan in case any parameters have been changed. - self.set_attack_phase( - goose_attack_run_state, - AttackPhase::Increase, - ); - self.reply_to_controller( - message, - ControllerResponseMessage::Bool(true), - ); - // Reset the run state when starting a new load test. - self.reset_run_state(goose_attack_run_state).await?; - self.metrics.history.push(TestPlanHistory::step( - TestPlanStepAction::Increasing, - 0, - )); - } else { - // Do not move to Starting phase if unable to prepare load test. - self.reply_to_controller( - message, - ControllerResponseMessage::Bool(false), - ); + // Validate GooseConfiguration. + match self.configuration.validate() { + Err(e) => { + println!("Failed to start: {:?}", e); + self.reply_to_controller( + message, + ControllerResponseMessage::Bool(false), + ); + } + Ok(_) => { + // Update test plan in case configuration changed. + self.test_plan = TestPlan::build(&self.configuration); + + // Update attack_mode in case configuration changed. + self.attack_mode = if self.configuration.manager { + AttackMode::Manager + } else if self.configuration.worker { + AttackMode::Worker + } else { + AttackMode::StandAlone + }; + + // @TODO: Enforce waiting for Workers to connect. + if self.prepare_load_test().is_ok() { + // Rebuild test plan in case any parameters have been changed. + self.set_attack_phase( + goose_attack_run_state, + AttackPhase::Increase, + ); + self.reply_to_controller( + message, + ControllerResponseMessage::Bool(true), + ); + // Reset the run state when starting a new load test. + self.reset_run_state(goose_attack_run_state).await?; + self.metrics.history.push(TestPlanHistory::step( + TestPlanStepAction::Increasing, + 0, + )); + } else { + // Do not move to Starting phase if unable to prepare load test. + self.reply_to_controller( + message, + ControllerResponseMessage::Bool(false), + ); + } + } } } else { self.reply_to_controller( @@ -896,6 +1014,140 @@ impl GooseAttack { ); } } + ControllerCommand::Manager => { + if self.attack_phase == AttackPhase::Idle { + // Toggle Manager mode. + self.configuration.manager = if self.configuration.manager { + info!("manager mode disabled"); + false + } else { + // Alset disable Worker mode if enabled. + if self.configuration.worker { + info!("worker mode disabled"); + self.configuration.worker = false; + } + info!("manager mode enabled"); + true + }; + // Update Gaggle configuration options. + self.configuration.configure_gaggle(&self.defaults); + self.reply_to_controller( + message, + ControllerResponseMessage::Bool(true), + ); + } else { + self.reply_to_controller( + message, + ControllerResponseMessage::Bool(false), + ); + } + } + ControllerCommand::ExpectWorkers => { + // The controller uses a regular expression to validate that + // this is a valid run time, so simply use it with further + // validation. + if let Some(expect_workers) = &message.request.value { + // Can only change expect_workers when the load test is idle and in Manager mode. + if self.attack_phase == AttackPhase::Idle + && self.configuration.manager + { + // Use expect() as Controller uses regex to validate this is an integer. + let expect_workers = usize::from_str(expect_workers) + .expect("failed to convert string to usize"); + if expect_workers == 0 { + info!( + "changing expect_workers from {:?} to None", + self.configuration.expect_workers + ); + self.configuration.expect_workers = None; + } else { + info!( + "changing expect_workers from {:?} to {}", + self.configuration.expect_workers, expect_workers + ); + self.configuration.expect_workers = Some(expect_workers); + } + self.reply_to_controller( + message, + ControllerResponseMessage::Bool(true), + ); + } else { + self.reply_to_controller( + message, + ControllerResponseMessage::Bool(false), + ); + if self.configuration.manager { + info!("unable to configure expect_workers when load test is not idle"); + } else { + info!("unable to configure expect_workers when not in manager mode"); + } + } + } else { + warn!( + "Controller didn't provide expect_workers: {:#?}", + &message.request + ); + self.reply_to_controller( + message, + ControllerResponseMessage::Bool(false), + ); + } + } + ControllerCommand::NoHashCheck => { + if self.attack_phase == AttackPhase::Idle && self.configuration.manager + { + self.configuration.no_hash_check = + if self.configuration.no_hash_check { + info!("disabled no_hash_check"); + false + } else { + info!("enabled no_hash_check"); + true + }; + self.reply_to_controller( + message, + ControllerResponseMessage::Bool(true), + ); + } else { + self.reply_to_controller( + message, + ControllerResponseMessage::Bool(false), + ); + if self.configuration.manager { + info!("unable to configure expect_workers when load test is not idle"); + } else { + info!("unable to configure expect_workers when not in manager mode"); + } + } + } + ControllerCommand::Worker => { + if self.attack_phase == AttackPhase::Idle { + // Toggle Worker mode. + self.configuration.worker = if self.configuration.worker { + info!("worker mode disabled"); + false + } else { + // Disable Manager mode if enabled. + if self.configuration.manager { + info!("manager mode disabled"); + self.configuration.manager = false; + } + info!("worker mode enabled"); + true + }; + // Update Gaggle configuration options. + self.configuration.configure_gaggle(&self.defaults); + self.reply_to_controller( + message, + ControllerResponseMessage::Bool(true), + ); + } else { + self.reply_to_controller( + message, + ControllerResponseMessage::Bool(false), + ); + } + } // These messages shouldn't be received here. ControllerCommand::Help | ControllerCommand::Exit => { warn!("Unexpected command: {:?}", &message.request); diff --git a/src/gaggle/common.rs b/src/gaggle/common.rs new file mode 100644 index 00000000..b707b061 --- /dev/null +++ b/src/gaggle/common.rs @@ -0,0 +1,23 @@ +pub struct GaggleEcho { + _sequence: u32, + _acknowledge: Option, +} + +/// Commands sent to/from Works and Managers to control a Gaggle. +pub enum GaggleCommand { + ManagerShuttingDown, + Shutdown, + WorkerShuttingDown, + /// Notification that a Worker is standing by and ready to start the load test. + WorkerIsReady, +} + +pub enum GaggleCommands { + Control(GaggleCommand), + Echo(GaggleEcho), + // Not Gaggle-specific + //Error(GooseErrorMetrics), + //Request(GooseRequestMetrics), + //Scenario(ScenarioMetrics), + //Transaction(TransactionMetrics), +} diff --git a/src/gaggle/manager.rs b/src/gaggle/manager.rs new file mode 100644 index 00000000..40563d59 --- /dev/null +++ b/src/gaggle/manager.rs @@ -0,0 +1,71 @@ +/// Manager-specific code. +use std::time::Duration; +use tokio::time::sleep; + +use crate::{GooseConfiguration, GooseDefaults, GooseError}; + +/// Optional join handle for manager thread, if enabled. +pub(crate) type ManagerJoinHandle = + Option>>; +/// Optional unbounded sender to manager thread, if enabled. +pub(crate) type ManagerTx = Option>>; + +#[derive(Debug)] +pub(crate) enum ManagerCommand {} + +/// This structure is used to control the Manager process. +#[derive(Debug)] +pub(crate) struct ManagerMessage { + /// The command that is being sent to the Manager. + pub command: ManagerCommand, + /// An optional value that is being sent to the Manager. + pub value: Option, +} + +impl GooseConfiguration { + // @TODO: move Manager configuration here. + pub(crate) fn configure_manager(&mut self, defaults: &GooseDefaults) { + // + } + + // @TODO: it should be possible for the controller to reconfigure / make changes before load test starts. + // @TODO: This needs to be its own thread, allowing the controller to end it. + pub(crate) async fn setup_manager( + &mut self, + defaults: &GooseDefaults, + ) -> Result<(ManagerJoinHandle, ManagerTx), GooseError> { + //) -> Result<(GooseLoggerJoinHandle, GooseLoggerTx), GooseError> { + // There's no setup necessary if Manager mode is not enabled. + if !self.manager { + return Ok((None, None)); + } + + // Update the manager configuration, loading defaults if necessasry. + self.configure_manager(defaults); + + // Create an unbounded channel to allow the controller to manage the manager thread. + let (manager_tx, manager_rx): ( + flume::Sender>, + flume::Receiver>, + ) = flume::unbounded(); + + let configuration = self.clone(); + let manager_handle = + tokio::spawn(async move { configuration.manager_main(manager_rx).await }); + // @TODO: return manager_tx thread to the controller (if there is a controller) + Ok((Some(manager_handle), Some(manager_tx))) + } + + /// Manager thread, coordinates Worker threads. + pub(crate) async fn manager_main( + self: GooseConfiguration, + receiver: flume::Receiver>, + ) -> Result<(), GooseError> { + loop { + debug!("top of manager loop..."); + sleep(Duration::from_millis(250)).await; + } + + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 4346380e..14551bba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,6 +57,11 @@ pub mod util; #[cfg(feature = "gaggle")] mod worker; +pub mod gaggle { + pub mod common; + pub mod manager; +} + use gumdrop::Options; use lazy_static::lazy_static; #[cfg(feature = "gaggle")] @@ -77,6 +82,7 @@ use tokio::fs::File; use crate::config::{GooseConfiguration, GooseDefaults}; use crate::controller::{ControllerProtocol, ControllerRequest}; +use crate::gaggle::manager::ManagerTx; use crate::goose::{GaggleUser, GooseUser, GooseUserCommand, Scenario, Transaction}; use crate::graph::GraphData; use crate::logger::{GooseLoggerJoinHandle, GooseLoggerTx}; @@ -119,11 +125,6 @@ pub fn get_worker_id() -> usize { WORKER_ID.load(Ordering::Relaxed) } -#[cfg(not(feature = "gaggle"))] -#[derive(Debug, Clone)] -/// Socket used for coordinating a Gaggle distributed load test. -pub(crate) struct Socket {} - /// An enumeration of all errors a [`GooseAttack`](./struct.GooseAttack.html) can return. #[derive(Debug)] pub enum GooseError { @@ -335,6 +336,8 @@ struct GooseAttackRunState { /// Unbounded receiver used by [`GooseUser`](./goose.GooseUser.html) threads to notify /// the parent if they shut themselves down (for example if `--iterations` is reached). shutdown_rx: flume::Receiver, + /// Optional unbounded receiver for manager thread, if enabled. + manager_tx: ManagerTx, /// Optional unbounded receiver for logger thread, if enabled. logger_handle: GooseLoggerJoinHandle, /// Optional unbounded sender from all [`GooseUser`](./goose/struct.GooseUser.html)s @@ -371,8 +374,6 @@ struct GooseAttackRunState { shutdown_after_stop: bool, /// Whether or not the load test is currently canceling. canceling: bool, - /// Optional socket used to coordinate a distributed Gaggle. - socket: Option, } /// Global internal state for the load test. @@ -914,6 +915,7 @@ impl GooseAttack { // Configure GooseConfiguration. self.configuration.configure(&self.defaults); + self.configuration.configure_gaggle(&self.defaults); // Validate GooseConfiguration. self.configuration.validate()?; @@ -979,7 +981,7 @@ impl GooseAttack { } // Start goose in single-process mode. else { - self = self.start_attack(None).await?; + self = self.start_attack().await?; } if self.metrics.display_metrics { @@ -1276,10 +1278,7 @@ impl GooseAttack { // Create a GooseAttackRunState object and do all initialization required // to start a [`GooseAttack`](./struct.GooseAttack.html). - async fn initialize_attack( - &mut self, - socket: Option, - ) -> Result { + async fn initialize_attack(&mut self) -> Result { trace!("initialize_attack"); // Create a single channel used to send metrics from GooseUser threads @@ -1311,6 +1310,7 @@ impl GooseAttack { all_threads_shutdown_tx, metrics_rx, shutdown_rx, + manager_tx: None, logger_handle: None, all_threads_logger_tx: None, throttle_threads_tx: None, @@ -1326,12 +1326,8 @@ impl GooseAttack { all_users_spawned: false, shutdown_after_stop: !self.configuration.no_autostart, canceling: false, - socket, }; - // Access socket to avoid errors. - trace!("socket: {:?}", &goose_attack_run_state.socket); - // Catch ctrl-c to allow clean shutdown to display metrics. util::setup_ctrlc_handler(); @@ -1847,6 +1843,9 @@ impl GooseAttack { goose_attack_run_state.logger_handle = logger_handle; goose_attack_run_state.all_threads_logger_tx = all_threads_logger_tx; + // If enabled, spawn a manager thread. + let (manager_handle, manager_tx) = self.configuration.setup_manager(&self.defaults).await?; + // If enabled, spawn a throttle thread. let (throttle_threads_tx, parent_to_throttle_tx) = self.setup_throttle().await; goose_attack_run_state.throttle_threads_tx = throttle_threads_tx; @@ -1871,13 +1870,11 @@ impl GooseAttack { } // Called internally in local-mode and gaggle-mode. - async fn start_attack(mut self, socket: Option) -> Result { - trace!("start_attack: socket({:?})", socket); - + async fn start_attack(mut self) -> Result { // The GooseAttackRunState is used while spawning and running the // GooseUser threads that generate the load test. let mut goose_attack_run_state = self - .initialize_attack(socket) + .initialize_attack() .await .expect("failed to initialize GooseAttackRunState");