Dev kmr #469

Merged
merged 4 commits on Feb 26, 2024
31 changes: 26 additions & 5 deletions endpoints/remotehost/remotehost
@@ -557,7 +557,7 @@ function launch_osruntimes() {
}

function move_remotehost_logs() {
local engine_label remote_cs_log_file container_name delete_remote_dir log_file
local engine_label remote_cs_log_file container_name delete_remote_dir log_file cmd_rc cmd_retries cmd_attempt
local total_rc=0
local delete_remote_dir=0

@@ -574,11 +574,28 @@ function move_remotehost_logs() {
delete_remote_dir=1
remote_cs_log_file="$remote_logs_dir/$log_file"
do_scp "$user@$host" "$remote_cs_log_file" "" $engine_logs_dir
(( total_rc += $? ))
cmd_rc=$?
if [ $cmd_rc != 0 ]; then
echo "Capturing chroot log for ${engine_label} failed with RC=${cmd_rc}"
fi
(( total_rc += $cmd_rc ))
elif [ "$os_runtime" == "podman" ]; then
container_name="${run_id}_${engine_label}"
do_ssh $user@$host podman logs $container_name > "$engine_logs_dir/$log_file"
(( total_rc += $? ))
# implementing retry logic here because it is possible for
# 'podman logs' to fail due to lock contention when podman
# is under heavy load
cmd_retries=5
cmd_attempt=1
cmd_rc=1
while [ $cmd_rc != 0 -a $cmd_attempt -le $cmd_retries ]; do
do_ssh $user@$host podman logs --timestamps $container_name > "$engine_logs_dir/$log_file" 2>&1
cmd_rc=$?
if [ $cmd_rc != 0 ]; then
echo "Capturing podman log for ${engine_label} failed with RC=${cmd_rc} on attempt ${cmd_attempt} of ${cmd_retries}"
(( cmd_attempt += 1 ))
fi
done
(( total_rc += $cmd_rc ))
else
echo "WARNING: os_runtime $os_runtime not supported"
fi
@@ -587,7 +604,11 @@ function move_remotehost_logs() {
if [ "$delete_remote_dir" == "1" -a $total_rc == 0 ]; then
echo "Deleting remote directory:"
do_ssh $user@$host /bin/rm -rv $remote_dir
(( total_rc += $? ))
cmd_rc=$?
if [ $cmd_rc != 0 ]; then
echo "Deleting remote directory failed with RC=${cmd_rc}"
fi
(( total_rc += $cmd_rc ))
fi

if [ $total_rc -gt 0 ]; then
153 changes: 136 additions & 17 deletions rickshaw-run
@@ -19,6 +19,10 @@ use File::Path qw(make_path);
use JSON::XS;
use JSON::Validator;
use Data::Dumper;
use threads;
use threads::shared;
use Thread::Queue;
use Thread::Semaphore;

$Data::Dumper::Sortkeys = 1;
$Data::Dumper::Pair = ' : ';
@@ -128,6 +132,20 @@ my @active_followers;
(my $detect_arch_cmd, my $arch, my $detect_arch_cmd_rc) = run_cmd('uname -m');
chomp($arch);

my $available_cpus = 0;
open(PROCCPUINFO, "<", "/proc/cpuinfo") || die("[ERROR] Could not open /proc/cpuinfo for reading\n");
while(<PROCCPUINFO>) {
if ($_ =~ /^processor/) {
$available_cpus++;
}
}
close(PROCCPUINFO);
if ($available_cpus == 0) {
die("[ERROR] Did not find any available cpus for job processing!\n");
}
debug_log(sprintf "Found %d available cpus for job processing.\n", $available_cpus);


$SIG{'INT'} = sub {
print "Caught a CTRL-C/SIGINT, aborting via roadblock!\n";
$abort_via_roadblock = 1;
@@ -1597,6 +1615,47 @@ sub save_config_info() {
printf "Finished saving json settings to %s\n", $config_dir . "/rickshaw-run.json";
}

sub endpoint_validation_worker_thread() {
my ($thread_idx,
$job_queue,
$job_errors,
$endpoint_outputs,
$thread_start_lock,
$finished_threads,
$finished_threads_lock,
$job_errors_lock) = @_;

# sync with other threads
$thread_start_lock->down(1);
$thread_start_lock->up(1);

while ($job_queue->pending()) {
my $job = $job_queue->dequeue_nb;

if (defined $job) {
debug_log(sprintf "Endpoint Validation Thread-%d got a job for %s, cmd is '%s'\n", $thread_idx, $job->{'endpoint'}, $job->{'command'});
(my $cmd, my $output, my $cmd_rc) = run_cmd($job->{'command'});
if ($cmd_rc > 0) {
printf "[ERROR] Endpoint " . $job->{'endpoint'} .
" validation returned non-zero exit code " . $cmd_rc . "\n" .
$output . "\n";
$job_errors_lock->down();
$$job_errors += 1;
$job_errors_lock->up();
} else {
debug_log(sprintf "Endpoint %s validated\n", $job->{'endpoint'});
$endpoint_outputs->{$job->{'endpoint'}} = $output;
}
}
}

$finished_threads_lock->down();
$$finished_threads += 1;
$finished_threads_lock->up();

return 0;
}

sub validate_endpoints() {
# Call each endpoint script with "--validate" as the first option, and each endpoint script should
# return a list of clients and servers which are used from this endpoint. Collect this output
@@ -1610,25 +1669,83 @@ sub validate_endpoints() {
# and not rickshaw.
my $min_id;
my $max_id;
my $job_queue = new Thread::Queue;
my %endpoint_outputs : shared;
my $collectors_present = 0;
printf "Confirming the endpoints will satisfy the benchmark requirements:\n";

# Process all endpoint --validate outputs to get ID range for client and server
# enqueue one validation job per endpoint
foreach my $endpoint (@endpoints) {
my $pushd_dir = pushd($rickshaw_project_dir . "/endpoints/" . $$endpoint{'type'});
my $cmd = "./" . $$endpoint{'type'} .
" --endpoint-label=" . $$endpoint{'label'} .
" --base-run-dir=" . $run{'base-run-dir'} .
" --endpoint-opts=" . $$endpoint{'opts'} .
" --validate";
debug_log(sprintf "endopoint validation command: [%s]\n", $cmd);
($cmd, my $output, my $cmd_rc) = run_cmd($cmd);
debug_log(sprintf "\n[$output]\n");
my @output = grep(!/^#/, split(/\n/, $output));
if ($cmd_rc > 0) {
die "[ERROR]Endpoint " . $$endpoint{'type'} . " with options " . $$endpoint{'opts'} .
" validation returned non-zero exit code:\n" . join("\n", @output) . "\n";
}
my %job = ( 'endpoint' => $$endpoint{'label'},
'command' => $rickshaw_project_dir . "/endpoints/" . $$endpoint{'type'} .
"/" . $$endpoint{'type'} .
" --endpoint-label=" . $$endpoint{'label'} .
" --base-run-dir=" . $run{'base-run-dir'} .
" --endpoint-opts=" . $$endpoint{'opts'} .
" --validate" );
$job_queue->enqueue(\%job);
}

my $num_threads = $available_cpus;
my @threads;
if ($num_threads > $job_queue->pending) {
debug_log(sprintf "Reducing endpoint validation thread count from %d to %d\n", $num_threads, $job_queue->pending());
$num_threads = $job_queue->pending();
} else {
debug_log(sprintf "There will be %d endpoint validation threads\n", $num_threads);
}

my $job_errors : shared = 0;
my $finished_threads : shared = 0;
my $thread_start_lock = new Thread::Semaphore($num_threads);
my $finished_threads_lock = new Thread::Semaphore();
my $job_errors_lock = new Thread::Semaphore();;

# acquire the lock to block thread pool start
$thread_start_lock->down($num_threads);

# create the thread pool
for (my $thread_idx = 0; $thread_idx < $num_threads; $thread_idx++) {
debug_log(sprintf "Creating endpoint validation Thread-%d\n", $thread_idx);
push @threads, threads->create('endpoint_validation_worker_thread',
$thread_idx,
$job_queue,
\$job_errors,
\%endpoint_outputs,
$thread_start_lock,
\$finished_threads,
$finished_threads_lock,
$job_errors_lock);
}

# release the lock to allow the thread pool to run
$thread_start_lock->up($num_threads);

while ($job_queue->pending() || ($finished_threads != $num_threads)) {
debug_log(sprintf "Pending endpoint validation jobs: %d\n", $job_queue->pending());
debug_log(sprintf "Endpoint Validation Threads Finished: %d/%d\n", $finished_threads, $num_threads);
sleep 1;
}
debug_log("Endpoint Validation Job processing complete\n");

# wait and die here on validation error(s) so that we generate all
# the errors rather than just a subset
if ($job_errors > 0) {
die(sprintf "[ERROR] %d endpoint validation command(s) failed!\n", $job_errors);
}

debug_log("endpoint_outputs:\n" . Dumper \%endpoint_outputs);

# we should be doing this block, but the perl interpreter is dying
# with an error when we do:
#for (my $thread_idx = 0; $thread_idx < scalar @threads; $thread_idx++) {
# debug_log(sprintf "Joining Thread-%d\n", $thread_idx);
# my $thread_ret = $threads[$thread_idx]->join();
# debug_log(sprintf "Thread-%d returned %d\n", $thread_idx, $thread_ret);
#}

foreach my $endpoint (@endpoints) {
my @output = grep(!/^#/, split(/\n/, $endpoint_outputs{$$endpoint{'label'}}));
# Output from endpoint's validation should be 1 or more lines with "client" or "server"
# followed by 1 or more positive integers representing the client/server IDs this
# endpoint handles::
@@ -2223,7 +2340,7 @@ sub deploy_endpoints() {
# Endpoints should return for each client and server started:
# - the ID of the client/server
# - the roadblock client ID
printf "Going to run endpoint command:\n%s\n\n", $cmd;
printf "Going to run endpoint command for %s:\n%s\n\n", $label, $cmd;
if ($endpoint_roadblock_opt eq "") {
# There is only one client and no synchronization, so we wait for the endpoint to finish
debug_log(sprintf "going to run and wait for: %s\n", $cmd);
@@ -2370,7 +2487,9 @@ sub organize_run_data() {
my $mv_cmd = "/bin/mv * " . $cs_tool_dest_path;
($mv_cmd, my $mv_output, my $mv_cmd_rc) = run_cmd($mv_cmd);
} else {
printf "WARNING: did not find expected sub-directories in %s\n", $pushd_dir . "/" . "tool-data";
if (! $cs_type eq "profiler") {
printf "WARNING: did not find expected sub-directories in %s\n", $pushd_dir . "/" . "tool-data";
}
}
}
if (scalar dir_entries("sysinfo", qr/\w+/) > 0) {
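For context, the worker-pool pattern added to rickshaw-run above (endpoint_validation_worker_thread plus the driving loop in validate_endpoints) amounts to a shared Thread::Queue of jobs drained by a fixed number of threads, with Thread::Semaphore objects guarding shared counters. The following standalone sketch is not part of the PR; the commands and thread count are hypothetical and only illustrate the Thread::Queue / Thread::Semaphore mechanics. In this reduced form the final join() completes cleanly, unlike the commented-out join block in the PR, which the authors note trips an interpreter error in their environment.

    #!/usr/bin/perl
    #
    # Minimal worker-pool sketch (illustrative only, not part of this PR).
    use strict;
    use warnings;
    use threads;
    use threads::shared;
    use Thread::Queue;
    use Thread::Semaphore;

    # hypothetical commands standing in for the per-endpoint "--validate" calls
    my $job_queue = Thread::Queue->new("true", "true", "false", "true");
    my $job_errors : shared = 0;
    my $job_errors_lock = Thread::Semaphore->new();

    sub worker {
        my ($thread_idx) = @_;
        # dequeue_nb returns undef once the queue is empty, ending the loop
        while (defined(my $cmd = $job_queue->dequeue_nb())) {
            my $rc = system($cmd);
            if ($rc != 0) {
                # serialize updates to the shared error counter
                $job_errors_lock->down();
                $job_errors += 1;
                $job_errors_lock->up();
            }
        }
        return 0;
    }

    my @threads = map { threads->create(\&worker, $_) } 0 .. 3;
    # in this standalone form the join completes cleanly
    $_->join() for @threads;
    printf "validation sketch finished with %d error(s)\n", $job_errors;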