Skip to content

Commit

Permalink
restore previous telnet gaggle work
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyandrews committed May 15, 2023
1 parent ddd45cc commit 9fc78de
Show file tree
Hide file tree
Showing 9 changed files with 1,571 additions and 525 deletions.
384 changes: 8 additions & 376 deletions src/config.rs

Large diffs are not rendered by default.

552 changes: 478 additions & 74 deletions src/controller.rs

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions src/gaggle/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
pub struct GaggleEcho {
_sequence: u32,
_acknowledge: Option<u32>,
}

/// Commands sent to/from Works and Managers to control a Gaggle.
pub enum GaggleCommand {
ManagerShuttingDown,
Shutdown,
WorkerShuttingDown,
/// Notification that a Worker is standing by and ready to start the load test.
WorkerIsReady,
}

pub enum GagglePhase {
WaitingForWorkers,
}

pub enum GaggleCommands {
Control(GaggleCommand),
Echo(GaggleEcho),
// Not Gaggle-specific
//Error(GooseErrorMetrics),
//Request(GooseRequestMetrics),
//Scenario(ScenarioMetrics),
//Transaction(TransactionMetrics),
}
373 changes: 355 additions & 18 deletions src/gaggle/manager.rs

Large diffs are not rendered by default.

515 changes: 515 additions & 0 deletions src/gaggle/worker.rs

Large diffs are not rendered by default.

223 changes: 168 additions & 55 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ mod throttle;
mod user;
pub mod util;
pub mod gaggle {
pub mod common;
pub mod manager;
pub mod worker;
}

use gumdrop::Options;
Expand All @@ -71,6 +73,9 @@ use tokio::fs::File;

use crate::config::{GooseConfiguration, GooseDefaults};
use crate::controller::{ControllerProtocol, ControllerRequest};
use crate::gaggle::common::GagglePhase;
use crate::gaggle::manager::{ManagerCommand, ManagerConnection, ManagerMessage};
use crate::gaggle::worker::{WorkerCommand, WorkerConnection, WorkerMessage};
use crate::goose::{GooseUser, GooseUserCommand, Scenario, Transaction};
use crate::graph::GraphData;
use crate::logger::{GooseLoggerJoinHandle, GooseLoggerTx};
Expand Down Expand Up @@ -319,8 +324,12 @@ struct GooseAttackRunState {
/// Unbounded receiver used by [`GooseUser`](./goose.GooseUser.html) threads to notify
/// the parent if they shut themselves down (for example if `--iterations` is reached).
shutdown_rx: flume::Receiver<usize>,
/// Optional unbounded sender for Manager thread, if enabled.
manager_tx: Option<ManagerConnection>,
/// Optional unbounded sender for Worker thread, if enabled.
worker_tx: Option<WorkerConnection>,
/// Optional unbounded receiver for logger thread, if enabled.
logger_handle: GooseLoggerJoinHandle,
logger_rx: GooseLoggerJoinHandle,
/// Optional unbounded sender from all [`GooseUser`](./goose/struct.GooseUser.html)s
/// to logger thread, if enabled.
all_threads_logger_tx: GooseLoggerTx,
Expand Down Expand Up @@ -355,6 +364,8 @@ struct GooseAttackRunState {
shutdown_after_stop: bool,
/// Whether or not the load test is currently canceling.
canceling: bool,
/// How many Workers are connected, only non-zero in Manager mode.
gaggle_workers: usize,
}

/// Global internal state for the load test.
Expand All @@ -377,6 +388,8 @@ pub struct GooseAttack {
attack_mode: AttackMode,
/// Which phase the load test is currently operating in.
attack_phase: AttackPhase,
// If running in a Gaggle, which phase the load test is currently operating in.
gaggle_phase: Option<GagglePhase>,
/// Defines the order [`Scenario`](./goose/struct.Scenario.html)s and
/// [`Transaction`](./goose/struct.Transaction.html)s are allocated.
scheduler: GooseScheduler,
Expand Down Expand Up @@ -414,6 +427,7 @@ impl GooseAttack {
configuration,
attack_mode: AttackMode::Undefined,
attack_phase: AttackPhase::Idle,
gaggle_phase: None,
scheduler: GooseScheduler::RoundRobin,
started: None,
test_plan: TestPlan::new(),
Expand Down Expand Up @@ -450,6 +464,7 @@ impl GooseAttack {
configuration,
attack_mode: AttackMode::Undefined,
attack_phase: AttackPhase::Idle,
gaggle_phase: None,
scheduler: GooseScheduler::RoundRobin,
started: None,
test_plan: TestPlan::new(),
Expand Down Expand Up @@ -964,22 +979,60 @@ impl GooseAttack {
self.metrics.hash = s.finish();
debug!("hash: {}", self.metrics.hash);

self = match self.attack_mode {
AttackMode::Manager => {
self.manager_main().await?;
panic!("attempted to start in AttackMode::Manager");
},
AttackMode::Worker => {
panic!("attempted to start in AttackMode::Worker");
},
AttackMode::StandAlone => {
self.start_attack().await?
},
AttackMode::Undefined => {
panic!("attempted to start in AttackMode::Undefined");
},
// Launch Manager thread if enabled.
let manager = match self.configuration.setup_manager().await {
Some((h, t)) => {
self.gaggle_phase = Some(GagglePhase::WaitingForWorkers);
Some(ManagerConnection {
_join_handle: h,
tx: t,
})
}
None => None,
};

// Launch Worker thread if enabled.
let worker = match self.configuration.setup_worker(self.metrics.hash).await {
Some((h, t)) => {
self.gaggle_phase = Some(GagglePhase::WaitingForWorkers);
Some(WorkerConnection {
_join_handle: h,
tx: t,
})
}
None => None,
};

// When --no-autostart not enabled, automatically start ...
if !self.configuration.no_autostart {
// Autostart Manager.
if self.configuration.manager {
if let Some(manager) = manager.as_ref() {
let _ = manager.tx.send(ManagerMessage {
command: ManagerCommand::WaitForWorkers,
value: None,
});
} else {
// @TODO: Review how this is possible, provide better error handling.
panic!("Failed to start in Manager mode.")
}
}
// Autostart Worker.
if self.configuration.worker {
if let Some(connection) = worker.as_ref() {
let _ = connection.tx.send(WorkerMessage {
command: WorkerCommand::ConnectToManager,
_value: None,
});
} else {
// @TODO: Review how this is possible, provide better error handling.
panic!("Failed to start in Worker mode.")
}
}
}

self = self.start_attack(manager, worker).await?;

if self.metrics.display_metrics {
info!(
"printing final metrics after {} seconds...",
Expand Down Expand Up @@ -1257,7 +1310,11 @@ impl GooseAttack {

// Create a GooseAttackRunState object and do all initialization required
// to start a [`GooseAttack`](./struct.GooseAttack.html).
async fn initialize_attack(&mut self) -> Result<GooseAttackRunState, GooseError> {
async fn initialize_attack(
&mut self,
manager_tx: Option<ManagerConnection>,
worker_tx: Option<WorkerConnection>,
) -> Result<GooseAttackRunState, GooseError> {
trace!("initialize_attack");

// Create a single channel used to send metrics from GooseUser threads
Expand Down Expand Up @@ -1289,7 +1346,9 @@ impl GooseAttack {
all_threads_shutdown_tx,
metrics_rx,
shutdown_rx,
logger_handle: None,
manager_tx,
worker_tx,
logger_rx: None,
all_threads_logger_tx: None,
throttle_threads_tx: None,
parent_to_throttle_tx: None,
Expand All @@ -1304,6 +1363,7 @@ impl GooseAttack {
all_users_spawned: false,
shutdown_after_stop: !self.configuration.no_autostart,
canceling: false,
gaggle_workers: 0,
};

// Catch ctrl-c to allow clean shutdown to display metrics.
Expand Down Expand Up @@ -1537,7 +1597,7 @@ impl GooseAttack {
debug!("all users exited");

// If the logger thread is enabled, tell it to flush and exit.
if goose_attack_run_state.logger_handle.is_some() {
if goose_attack_run_state.logger_rx.is_some() {
if let Err(e) = goose_attack_run_state
.all_threads_logger_tx
.clone()
Expand All @@ -1548,7 +1608,7 @@ impl GooseAttack {
};
// Take logger out of the GooseAttackRunState object so it can be
// consumed by tokio::join!().
let logger = std::mem::take(&mut goose_attack_run_state.logger_handle);
let logger = std::mem::take(&mut goose_attack_run_state.logger_rx);
let _ = tokio::join!(logger.unwrap());
}

Expand Down Expand Up @@ -1679,12 +1739,18 @@ impl GooseAttack {
let elapsed = self.step_elapsed() as usize;

// Reset the test_plan to stop all users quickly.
self.test_plan.steps = vec![
// Record how many active users there are currently.
(goose_attack_run_state.active_users, elapsed),
// Record how long the attack ran in this step.
(0, 0),
];
self.test_plan.steps = if goose_attack_run_state.active_users > 0 {
// there is an active load test running.
vec![
// Record how many active users there are currently.
(goose_attack_run_state.active_users, elapsed),
// Record how long the attack ran in this step.
(0, 0),
]
} else {
// There is no active load test running.
vec![(0, 0)]
};
// Reset the current step to what was happening when canceled.
self.test_plan.current = 0;

Expand All @@ -1694,6 +1760,9 @@ impl GooseAttack {
// Advance to the final decrease phase.
self.advance_test_plan(goose_attack_run_state);

// @TODO: Special handling for a running Gaggle?
self.gaggle_phase = None;

// Load test isn't just decreasing, it's canceling.
self.metrics
.history
Expand Down Expand Up @@ -1759,7 +1828,7 @@ impl GooseAttack {
// If enabled, spawn a logger thread.
let (logger_handle, all_threads_logger_tx) =
self.configuration.setup_loggers(&self.defaults).await?;
goose_attack_run_state.logger_handle = logger_handle;
goose_attack_run_state.logger_rx = logger_handle;
goose_attack_run_state.all_threads_logger_tx = all_threads_logger_tx;

// If enabled, spawn a throttle thread.
Expand All @@ -1786,17 +1855,47 @@ impl GooseAttack {
}

// Called internally in local-mode and gaggle-mode.
async fn start_attack(mut self) -> Result<GooseAttack, GooseError> {
async fn start_attack(
mut self,
manager_tx: Option<ManagerConnection>,
worker_tx: Option<WorkerConnection>,
) -> Result<GooseAttack, GooseError> {
// The GooseAttackRunState is used while spawning and running the
// GooseUser threads that generate the load test.
let mut goose_attack_run_state = self
.initialize_attack()
.initialize_attack(manager_tx, worker_tx)
.await
.expect("failed to initialize GooseAttackRunState");

// The Goose parent process GooseAttack loop runs until Goose shuts down. Goose enters
// the loop in AttackPhase::Idle, and exits in AttackPhase::Shutdown.
loop {
// Check if running in Gaggle mode.
if let Some(gaggle_phase) = self.gaggle_phase.as_ref() {
match gaggle_phase {
GagglePhase::WaitingForWorkers => {
debug!("Gaggle mode, waiting for workers...");

// Gracefully exit loop if ctrl-c is caught.
self.exit_gracefully(&mut goose_attack_run_state).await?;

// Check if a Controller has made a request.
self.handle_controller_requests(&mut goose_attack_run_state)
.await?;

// @TODO: determine if enough Workers have connected.

// Wake up twice a second to check for events, and otherwise keep
// waiting for workers.
goose_attack_run_state.drift_timer = util::sleep_minus_drift(
time::Duration::from_millis(500),
goose_attack_run_state.drift_timer,
)
.await;
continue;
}
}
}
match self.attack_phase {
// In the Idle phase the Goose configuration can be changed by a Controller,
// and otherwise nothing happens but sleeping an checking for messages.
Expand Down Expand Up @@ -1864,42 +1963,56 @@ impl GooseAttack {
self.handle_controller_requests(&mut goose_attack_run_state)
.await?;

let mut message = goose_attack_run_state.shutdown_rx.try_recv();
while message.is_ok() {
goose_attack_run_state
.users_shutdown
.insert(message.expect("failed to wrap OK message"));
// Gracefully exit loop if ctrl-c is caught.
self.exit_gracefully(&mut goose_attack_run_state).await?;
}

// In Stand-alone mode, all users are started.
if goose_attack_run_state.users_shutdown.len() == self.test_plan.total_users() {
self.cancel_attack(&mut goose_attack_run_state).await?;
}
Ok(self)
}

message = goose_attack_run_state.shutdown_rx.try_recv();
// Check if ctrl-c was caught or shutdown message was received, and if so exit gracefully.
async fn exit_gracefully(
&mut self,
goose_attack_run_state: &mut GooseAttackRunState,
) -> Result<(), GooseError> {
let mut message = goose_attack_run_state.shutdown_rx.try_recv();
while message.is_ok() {
goose_attack_run_state
.users_shutdown
.insert(message.expect("failed to wrap OK message"));

// In Gaggle mode, the Worker starts a fraction of the users.
// @TODO

// In Stand-alone mode, all users are started.
if goose_attack_run_state.users_shutdown.len() == self.test_plan.total_users() {
self.cancel_attack(goose_attack_run_state).await?;
}

// Gracefully exit loop if ctrl-c is caught.
if self.attack_phase != AttackPhase::Shutdown
&& !goose_attack_run_state.canceling
&& *CANCELED.read().unwrap()
{
// Shutdown after stopping as the load test was canceled.
goose_attack_run_state.shutdown_after_stop = true;

// No metrics to display when sitting idle, so disable.
if self.attack_phase == AttackPhase::Idle {
self.metrics.display_metrics = false;
}
message = goose_attack_run_state.shutdown_rx.try_recv();
}

// Cleanly stop the load test.
self.cancel_attack(&mut goose_attack_run_state).await?;
// Gracefully exit loop if ctrl-c is caught.
if self.attack_phase != AttackPhase::Shutdown
&& !goose_attack_run_state.canceling
&& *CANCELED.read().unwrap()
{
// Shutdown after stopping as the load test was canceled.
goose_attack_run_state.shutdown_after_stop = true;

// Load test is actively canceling.
goose_attack_run_state.canceling = true;
// No metrics to display when sitting idle, so disable.
if self.attack_phase == AttackPhase::Idle {
self.metrics.display_metrics = false;
}

// Cleanly stop the load test.
self.cancel_attack(goose_attack_run_state).await?;

// Load test is actively canceling.
goose_attack_run_state.canceling = true;
}

Ok(self)
Ok(())
}
}

Expand Down
Loading

0 comments on commit 9fc78de

Please sign in to comment.