Skip to content

Commit

Permalink
validate Worker hash
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyandrews committed Aug 17, 2022
1 parent c7c909d commit 539c491
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 43 deletions.
86 changes: 64 additions & 22 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use tokio_tungstenite::tungstenite::Message;
/// 3. Add any necessary parent process logic for the command to
/// `GooseAttack::handle_controller_requests` (also in this file).
/// 4. Add a test for the new command in tests/controller.rs.
#[derive(Clone, Debug, EnumIter, PartialEq)]
#[derive(Clone, Debug, EnumIter, PartialEq, Eq)]
pub enum ControllerCommand {
/// Displays a list of all commands supported by the Controller.
///
Expand Down Expand Up @@ -520,7 +520,7 @@ impl ControllerCommand {
},
ControllerCommand::WorkerConnect => ControllerCommandDetails {
help: None,
regex: r"^WORKER-CONNECT$",
regex: r"^(WORKER-CONNECT) (\d+)$",
process_response: Box::new(|response| {
if let ControllerResponseMessage::Bool(true) = response {
Ok("worker connected".to_string())
Expand Down Expand Up @@ -1193,32 +1193,58 @@ impl GooseAttack {
// Verify running in Manager mode.
if self.configuration.manager {
// Verify expecting more Workers to connect.
// @TODO: Validate connection before sending to the Manager.
if goose_attack_run_state.gaggle_workers
< self.configuration.expect_workers.unwrap_or(0)
{
if let Some(manager) = goose_attack_run_state.manager.as_ref() {
goose_attack_run_state.gaggle_workers += 1;
info!(
"Worker {} of {} connected.",
goose_attack_run_state.gaggle_workers,
self.configuration.expect_workers.unwrap_or(0)
);
if let Some(ControllerValue::Socket(socket)) =
if let Some(ControllerValue::Socket(worker_connection)) =
message.request.value
{
// Pass the Telnet socket to the Manager thread.
let _ = manager.tx.send(ManagerMessage {
command: ManagerCommand::WorkerJoinRequest,
value: Some(socket),
});
// Use expect() as Controller uses regex to validate this is an integer.
let worker_hash =
u64::from_str(&worker_connection.hash)
.expect("failed to convert string to usize");
if worker_hash != self.metrics.hash
&& !self.configuration.no_hash_check
{
/*
self.reply_to_controller(
message,
ControllerResponseMessage::Bool(false),
);
*/
warn!("WorkerConnect request ignored, Worker hash {} does not match Manager hash {}, enable --no-hash-check to ignore.", worker_hash, self.metrics.hash)
} else {
if worker_hash != self.metrics.hash {
warn!("Ignoring that Worker hash {} does not match Manager hash {} because --no-hash-check is enabled.", worker_hash, self.metrics.hash)
} else {
warn!("Valid hash: {}", worker_hash);
}
goose_attack_run_state.gaggle_workers += 1;
info!(
"Worker {} of {} connected.",
goose_attack_run_state.gaggle_workers,
self.configuration.expect_workers.unwrap_or(0)
);
// Pass the Telnet socket to the Manager thread.
let _ = manager.tx.send(ManagerMessage {
command: ManagerCommand::WorkerJoinRequest,
value: Some(worker_connection.socket),
});
}
} else {
panic!("WorkerConnect falure, failed to move telnet socket.");
warn!("Whoops !?");
//panic!("Whoops!?");
}
} else {
panic!("WorkerConnect failure, failed to reference manager_tx.")
}
} else {
// @TODO: Can we return a helpful error?
self.reply_to_controller(
message,
ControllerResponseMessage::Bool(false),
);
warn!("WorkerConnect request ignored, all expected Workers already connected.")
}
} else {
Expand Down Expand Up @@ -1341,21 +1367,21 @@ impl FromStr for ControllerCommand {
let matches: Vec<_> = commands.matches(s).into_iter().collect();
// This happens any time the controller receives an invalid command.
if matches.is_empty() {
return Err(GooseError::InvalidControllerCommand {
Err(GooseError::InvalidControllerCommand {
detail: format!("unrecognized controller command: '{}'.", s),
});
})
// This shouldn't ever happen, but if it does report all available information.
} else if matches.len() > 1 {
let mut matched_commands = Vec::new();
for index in matches {
matched_commands.push(keys[index].clone())
}
return Err(GooseError::InvalidControllerCommand {
Err(GooseError::InvalidControllerCommand {
detail: format!(
"matched multiple controller commands: '{}' ({:?}).",
s, matched_commands
),
});
})
// Only one command matched.
} else {
Ok(keys[*matches.first().unwrap()].clone())
Expand Down Expand Up @@ -1416,11 +1442,17 @@ pub(crate) struct ControllerRequestMessage {
pub value: Option<ControllerValue>,
}

#[derive(Debug)]
pub(crate) struct WorkerConnection {
hash: String,
socket: tokio::net::TcpStream,
}

/// Allows multiple types to be sent to the parent process.
#[derive(Debug)]
pub(crate) enum ControllerValue {
Text(String),
Socket(tokio::net::TcpStream),
Socket(WorkerConnection),
}

/// An enumeration of all messages the parent can reply back to the controller thread.
Expand Down Expand Up @@ -1585,6 +1617,14 @@ impl ControllerState {
if let Ok(command_string) = self.get_command_string(buf).await {
// Extract the command and value in a generic way.
if let Ok(request_message) = self.get_match(command_string.trim()).await {
let hash = if let Some(ControllerValue::Text(hash)) =
request_message.value.as_ref()
{
// Clone the value.
hash.to_string()
} else {
unreachable!("Hash must exist, enforced by regex");
};
// Workers using Telnet socket to connect to the Manager.
if request_message.command == ControllerCommand::WorkerConnect {
info!("Worker instance connecting ...");
Expand All @@ -1597,7 +1637,9 @@ impl ControllerState {
client_id: self.thread_id,
request: ControllerRequestMessage {
command: ControllerCommand::WorkerConnect,
value: Some(ControllerValue::Socket(socket)),
value: Some(ControllerValue::Socket(
WorkerConnection { hash, socket },
)),
},
})
.is_err()
Expand Down
22 changes: 18 additions & 4 deletions src/gaggle/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ enum ManagerPhase {
/// Workers are connecting to the Manager, Gaggle can not be reconfigured.
WaitForWorkers,
/// All Workers are connected and the load test is ready.
_Active,
Active,
}

impl GooseConfiguration {
Expand Down Expand Up @@ -347,8 +347,12 @@ impl GooseConfiguration {
manager_run_state.idle_status_displayed = true;
}
}
ManagerPhase::WaitForWorkers => {}
ManagerPhase::_Active => {}
ManagerPhase::WaitForWorkers => {
// @TODO: Keepalive? Timeout?
}
ManagerPhase::Active => {
// @TODO: Actually start the load test.
}
}

// Process messages received from parent or Controller thread.
Expand All @@ -358,7 +362,7 @@ impl GooseConfiguration {
ManagerCommand::WaitForWorkers => {
let expect_workers = self.expect_workers.unwrap_or(0);
if expect_workers == 1 {
info!("Manager is waiting for {} Worker.", expect_workers);
info!("Manager is waiting for 1 Worker.");
} else {
info!("Manager is waiting for {} Workers.", expect_workers);
}
Expand All @@ -371,6 +375,16 @@ impl GooseConfiguration {
}
// Store Worker socket for ongoing communications.
manager_run_state.workers.push(socket);

if let Some(expect_workers) = self.expect_workers {
if manager_run_state.workers.len() == self.expect_workers.unwrap() {
info!(
"All {} Workers have connected, starting the load test.",
expect_workers
);
manager_run_state.phase = ManagerPhase::Active;
}
}
}
ManagerCommand::_Exit => {
info!("Manager is exiting.");
Expand Down
Loading

0 comments on commit 539c491

Please sign in to comment.