From 539c491226ce189809687af8ee508a7998b7857c Mon Sep 17 00:00:00 2001 From: Jeremy Andrews Date: Wed, 17 Aug 2022 13:09:03 +0200 Subject: [PATCH] validate Worker hash --- src/controller.rs | 86 ++++++++++++++++++++++++++++++++----------- src/gaggle/manager.rs | 22 +++++++++-- src/gaggle/worker.rs | 83 +++++++++++++++++++++++++++++++++++------ src/goose.rs | 2 +- src/lib.rs | 8 ++-- 5 files changed, 158 insertions(+), 43 deletions(-) diff --git a/src/controller.rs b/src/controller.rs index 16cde981..10f82e7c 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -43,7 +43,7 @@ use tokio_tungstenite::tungstenite::Message; /// 3. Add any necessary parent process logic for the command to /// `GooseAttack::handle_controller_requests` (also in this file). /// 4. Add a test for the new command in tests/controller.rs. -#[derive(Clone, Debug, EnumIter, PartialEq)] +#[derive(Clone, Debug, EnumIter, PartialEq, Eq)] pub enum ControllerCommand { /// Displays a list of all commands supported by the Controller. /// @@ -520,7 +520,7 @@ impl ControllerCommand { }, ControllerCommand::WorkerConnect => ControllerCommandDetails { help: None, - regex: r"^WORKER-CONNECT$", + regex: r"^(WORKER-CONNECT) (\d+)$", process_response: Box::new(|response| { if let ControllerResponseMessage::Bool(true) = response { Ok("worker connected".to_string()) @@ -1193,32 +1193,58 @@ impl GooseAttack { // Verify running in Manager mode. if self.configuration.manager { // Verify expecting more Workers to connect. - // @TODO: Validate connection before sending to the Manager. if goose_attack_run_state.gaggle_workers < self.configuration.expect_workers.unwrap_or(0) { if let Some(manager) = goose_attack_run_state.manager.as_ref() { - goose_attack_run_state.gaggle_workers += 1; - info!( - "Worker {} of {} connected.", - goose_attack_run_state.gaggle_workers, - self.configuration.expect_workers.unwrap_or(0) - ); - if let Some(ControllerValue::Socket(socket)) = + if let Some(ControllerValue::Socket(worker_connection)) = message.request.value { - // Pass the Telnet socket to the Manager thread. - let _ = manager.tx.send(ManagerMessage { - command: ManagerCommand::WorkerJoinRequest, - value: Some(socket), - }); + // Use expect() as Controller uses regex to validate this is an integer. + let worker_hash = + u64::from_str(&worker_connection.hash) + .expect("failed to convert string to usize"); + if worker_hash != self.metrics.hash + && !self.configuration.no_hash_check + { + /* + self.reply_to_controller( + message, + ControllerResponseMessage::Bool(false), + ); + */ + warn!("WorkerConnect request ignored, Worker hash {} does not match Manager hash {}, enable --no-hash-check to ignore.", worker_hash, self.metrics.hash) + } else { + if worker_hash != self.metrics.hash { + warn!("Ignoring that Worker hash {} does not match Manager hash {} because --no-hash-check is enabled.", worker_hash, self.metrics.hash) + } else { + warn!("Valid hash: {}", worker_hash); + } + goose_attack_run_state.gaggle_workers += 1; + info!( + "Worker {} of {} connected.", + goose_attack_run_state.gaggle_workers, + self.configuration.expect_workers.unwrap_or(0) + ); + // Pass the Telnet socket to the Manager thread. + let _ = manager.tx.send(ManagerMessage { + command: ManagerCommand::WorkerJoinRequest, + value: Some(worker_connection.socket), + }); + } } else { - panic!("WorkerConnect falure, failed to move telnet socket."); + warn!("Whoops !?"); + //panic!("Whoops!?"); } } else { panic!("WorkerConnect failure, failed to reference manager_tx.") } } else { + // @TODO: Can we return a helpful error? + self.reply_to_controller( + message, + ControllerResponseMessage::Bool(false), + ); warn!("WorkerConnect request ignored, all expected Workers already connected.") } } else { @@ -1341,21 +1367,21 @@ impl FromStr for ControllerCommand { let matches: Vec<_> = commands.matches(s).into_iter().collect(); // This happens any time the controller receives an invalid command. if matches.is_empty() { - return Err(GooseError::InvalidControllerCommand { + Err(GooseError::InvalidControllerCommand { detail: format!("unrecognized controller command: '{}'.", s), - }); + }) // This shouldn't ever happen, but if it does report all available information. } else if matches.len() > 1 { let mut matched_commands = Vec::new(); for index in matches { matched_commands.push(keys[index].clone()) } - return Err(GooseError::InvalidControllerCommand { + Err(GooseError::InvalidControllerCommand { detail: format!( "matched multiple controller commands: '{}' ({:?}).", s, matched_commands ), - }); + }) // Only one command matched. } else { Ok(keys[*matches.first().unwrap()].clone()) @@ -1416,11 +1442,17 @@ pub(crate) struct ControllerRequestMessage { pub value: Option, } +#[derive(Debug)] +pub(crate) struct WorkerConnection { + hash: String, + socket: tokio::net::TcpStream, +} + /// Allows multiple types to be sent to the parent process. #[derive(Debug)] pub(crate) enum ControllerValue { Text(String), - Socket(tokio::net::TcpStream), + Socket(WorkerConnection), } /// An enumeration of all messages the parent can reply back to the controller thread. @@ -1585,6 +1617,14 @@ impl ControllerState { if let Ok(command_string) = self.get_command_string(buf).await { // Extract the command and value in a generic way. if let Ok(request_message) = self.get_match(command_string.trim()).await { + let hash = if let Some(ControllerValue::Text(hash)) = + request_message.value.as_ref() + { + // Clone the value. + hash.to_string() + } else { + unreachable!("Hash must exist, enforced by regex"); + }; // Workers using Telnet socket to connect to the Manager. if request_message.command == ControllerCommand::WorkerConnect { info!("Worker instance connecting ..."); @@ -1597,7 +1637,9 @@ impl ControllerState { client_id: self.thread_id, request: ControllerRequestMessage { command: ControllerCommand::WorkerConnect, - value: Some(ControllerValue::Socket(socket)), + value: Some(ControllerValue::Socket( + WorkerConnection { hash, socket }, + )), }, }) .is_err() diff --git a/src/gaggle/manager.rs b/src/gaggle/manager.rs index 4cd42394..9c6603d2 100644 --- a/src/gaggle/manager.rs +++ b/src/gaggle/manager.rs @@ -69,7 +69,7 @@ enum ManagerPhase { /// Workers are connecting to the Manager, Gaggle can not be reconfigured. WaitForWorkers, /// All Workers are connected and the load test is ready. - _Active, + Active, } impl GooseConfiguration { @@ -347,8 +347,12 @@ impl GooseConfiguration { manager_run_state.idle_status_displayed = true; } } - ManagerPhase::WaitForWorkers => {} - ManagerPhase::_Active => {} + ManagerPhase::WaitForWorkers => { + // @TODO: Keepalive? Timeout? + } + ManagerPhase::Active => { + // @TODO: Actually start the load test. + } } // Process messages received from parent or Controller thread. @@ -358,7 +362,7 @@ impl GooseConfiguration { ManagerCommand::WaitForWorkers => { let expect_workers = self.expect_workers.unwrap_or(0); if expect_workers == 1 { - info!("Manager is waiting for {} Worker.", expect_workers); + info!("Manager is waiting for 1 Worker."); } else { info!("Manager is waiting for {} Workers.", expect_workers); } @@ -371,6 +375,16 @@ impl GooseConfiguration { } // Store Worker socket for ongoing communications. manager_run_state.workers.push(socket); + + if let Some(expect_workers) = self.expect_workers { + if manager_run_state.workers.len() == self.expect_workers.unwrap() { + info!( + "All {} Workers have connected, starting the load test.", + expect_workers + ); + manager_run_state.phase = ManagerPhase::Active; + } + } } ManagerCommand::_Exit => { info!("Manager is exiting."); diff --git a/src/gaggle/worker.rs b/src/gaggle/worker.rs index 6b70202d..7ba4a811 100644 --- a/src/gaggle/worker.rs +++ b/src/gaggle/worker.rs @@ -1,7 +1,7 @@ +use std::io; +use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; use tokio::time::{sleep, Duration}; -//use tokio::io::AsyncWriteExt; -use std::io; use crate::config::{GooseConfigure, GooseValue}; use crate::metrics::GooseCoordinatedOmissionMitigation; @@ -40,15 +40,25 @@ pub(crate) struct WorkerConnection { pub(crate) tx: WorkerTx, } +enum ConnectionState { + WaitForPrompt, + WaitForOk, + Connected, +} + struct WorkerRunState { /// Whether or not a message has been displayed indicating the Worker is currently idle. idle_status_displayed: bool, /// Whether or Worker has successfully connected to Manager instance. connected_to_manager: bool, - /// @TODO How many times + /// @TODO: Connection status + connection_state: Option, + /// A counter tracking how many times the Worker has attempted to connect to the Manager. connection_attempts: u8, /// Which phase the Worker is currently operating in. phase: WorkerPhase, + /// Whether or not a message has been displayed indicating the Worker is ready and waiting. + waiting_status_displayed: bool, /// This variable accounts for time spent doing things which is then subtracted from /// the time sleeping to avoid an unintentional drift in events that are supposed to /// happen regularly. @@ -63,8 +73,10 @@ impl WorkerRunState { WorkerRunState { idle_status_displayed: false, connected_to_manager: false, + connection_state: None, connection_attempts: 0, phase: WorkerPhase::Idle, + waiting_status_displayed: false, drift_timer: tokio::time::Instant::now(), controller_rx, stream: None, @@ -78,7 +90,7 @@ enum WorkerPhase { /// Trying to connect to the Manager instance. ConnectingToManager, /// Connected to Manager instance, waiting for the go-ahead to start load test. - _WaitingForManager, + WaitingForManager, /// Active load test. _Active, Exit, @@ -317,7 +329,7 @@ impl GooseConfiguration { } // Spawn a Worker thread, provide a channel so it can be controlled by parent and/or Control;er thread. - pub(crate) async fn setup_worker(&mut self) -> Option<(WorkerJoinHandle, WorkerTx)> { + pub(crate) async fn setup_worker(&mut self, hash: u64) -> Option<(WorkerJoinHandle, WorkerTx)> { // There's no setup necessary if Worker mode is not enabled. if !self.worker { return None; @@ -328,7 +340,8 @@ impl GooseConfiguration { flume::unbounded(); let configuration = self.clone(); - let worker_handle = tokio::spawn(async move { configuration.worker_main(worker_rx).await }); + let worker_handle = + tokio::spawn(async move { configuration.worker_main(worker_rx, hash).await }); // Return worker_tx thread for the (optional) controller thread. Some((worker_handle, worker_tx)) @@ -338,6 +351,7 @@ impl GooseConfiguration { pub(crate) async fn worker_main( self: GooseConfiguration, receiver: flume::Receiver, + hash: u64, ) -> Result<(), GooseError> { // Initialze the Worker run state, used for the lifetime of this Worker instance. let mut worker_run_state = WorkerRunState::new(receiver); @@ -348,6 +362,9 @@ impl GooseConfiguration { loop { debug!("top of worker loop..."); + // @TODO: How to detect that the socket is dropped? + // @TODO: Add a timeout. + match worker_run_state.phase { // Display message when entering WorkerPhase::Idle, otherwise sleep waiting for a // message from Parent or Controller thread. @@ -362,7 +379,10 @@ impl GooseConfiguration { if worker_run_state.connection_attempts == 0 || worker_run_state.connection_attempts % 5 == 0 { - info!("Worker connecting to {{}}."); + info!( + "Worker connecting to {}:{}.", + self.manager_host, self.manager_port + ); } if worker_run_state.connection_attempts >= MAX_CONNECTION_ATTEMPTS { @@ -383,6 +403,8 @@ impl GooseConfiguration { { Ok(s) => { worker_run_state.connected_to_manager = true; + worker_run_state.connection_state = + Some(ConnectionState::WaitForPrompt); Some(s) } Err(e) => { @@ -398,16 +420,53 @@ impl GooseConfiguration { } }; } - if let Some(stream) = worker_run_state.stream.as_ref() { + if let Some(stream) = worker_run_state.stream.as_mut() { if let Ok(Some(message)) = read_buffer(stream) { - if message.starts_with("goose>") { - info!("Got `goose>` prompt."); + if let Some(connection_state) = + worker_run_state.connection_state.as_ref() + { + match connection_state { + ConnectionState::WaitForPrompt => { + if message.starts_with("goose>") { + info!("Got `goose>` prompt."); + worker_run_state.connection_state = + Some(ConnectionState::WaitForOk); + stream + .write_all( + format!("WORKER-CONNECT {}\n", hash).as_bytes(), + ) + .await?; + } else { + panic!("Failed to get `goose>` prompt: @TODO: handle this more gracefully."); + } + } + ConnectionState::WaitForOk => { + if message.starts_with("OK") { + info!("Got OK."); + worker_run_state.connection_state = + Some(ConnectionState::Connected); + worker_run_state.phase = WorkerPhase::WaitingForManager; + } else { + panic!("Failed to get OK: @TODO: handle this more gracefully."); + } + } + _ => { + unreachable!("We should not be here."); + } + } } } }; } - WorkerPhase::_WaitingForManager => {} - WorkerPhase::_Active => {} + WorkerPhase::WaitingForManager => { + if !worker_run_state.waiting_status_displayed { + info!("Standing by, waiting for Manager to start the load test..."); + worker_run_state.waiting_status_displayed = true; + } + } + WorkerPhase::_Active => { + info!("Let's get this party started!"); + } WorkerPhase::Exit => { info!("Worker is exiting."); break; diff --git a/src/goose.rs b/src/goose.rs index 3eef2177..eddc63d8 100644 --- a/src/goose.rs +++ b/src/goose.rs @@ -652,7 +652,7 @@ impl Scenario { /// Commands sent from the parent thread to the user threads, and from the manager to the /// worker processes. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum GooseUserCommand { /// Tell worker process to pause load test. Wait, diff --git a/src/lib.rs b/src/lib.rs index 29d38755..a284498e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -265,7 +265,7 @@ impl From for GooseError { } } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq)] /// A [`GooseAttack`](./struct.GooseAttack.html) load test operates in one (and only one) /// of the following modes. pub enum AttackMode { @@ -279,7 +279,7 @@ pub enum AttackMode { Worker, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq)] /// A [`GooseAttack`](./struct.GooseAttack.html) load test moves through each of the following /// phases during a complete load test. pub enum AttackPhase { @@ -295,7 +295,7 @@ pub enum AttackPhase { Shutdown, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq)] /// Used to define the order [`Scenario`](./goose/struct.Scenario.html)s and /// [`Transaction`](./goose/struct.Transaction.html)s are allocated. /// @@ -1052,7 +1052,7 @@ impl GooseAttack { }; // Launch worker thread if enabled. - let worker = match self.configuration.setup_worker().await { + let worker = match self.configuration.setup_worker(self.metrics.hash).await { Some((h, t)) => { self.gaggle_phase = Some(GagglePhase::WaitingForWorkers); Some(WorkerConnection {