Skip to content

Commit

Permalink
Add logic to capture output in InstanceRunner (#24)
Browse files Browse the repository at this point in the history
- Enhanced the `InstanceRunner` class to capture and save application output into the resulting DataFrame.
- Implemented logic to load and use lambdas from user configuration for parsing the captured data.
  • Loading branch information
rodrigo-ceccato committed Aug 15, 2024
1 parent c929574 commit e409d9c
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 8 deletions.
77 changes: 70 additions & 7 deletions spinner/runner/instance_runner.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import itertools
import os
import shutil
import signal
import subprocess
import time

import pandas as pd
from jinja2 import Environment, Template, Undefined
from rich import print as rprint

Expand Down Expand Up @@ -88,6 +86,51 @@ def check_output(self, output) -> bool:
rprint("[red] Check output not implemented!")
return True

def filter_output(self, output, error, output_filter):
combined_output = str(output + error)
filtered_output = ""
captured_parameters = {}

for filter in output_filter:
current_capture = ""

if filter["type"] == "all":
current_capture = combined_output

elif filter["type"] == "contains":
rprint(f"-> Filtering lines that contain: {filter['pattern']}")
current_capture += "\n".join(
[
line
for line in combined_output.split("\n")
if filter["pattern"] in line
]
)
current_capture += "\n"

else:
rprint(f"[red] Unsupported filter type: {filter['type']}")

if "to_float" in filter:
rprint(f"-> Converting captured line to float")
param_name = filter["to_float"]["name"]
param_lambda = filter["to_float"]["lambda"]

try:
param_value = eval(param_lambda, {}, {})(current_capture)
except Exception as e:
rprint(f"-> Error evaluating param_lambda: {e}")
param_value = None

captured_parameters[param_name] = param_value

filtered_output += current_capture

rprint(f"-> Filtered output: {filtered_output}")
rprint(f"-> Captured parameters: {captured_parameters}")

return str(filtered_output), captured_parameters

def run(self):
"""Runs the exercise and stores the results in self.results"""
run_cmds = self.get_run_instruction()
Expand Down Expand Up @@ -122,12 +165,29 @@ def run(self):

# Append run to execution_df
curr_df_entry["time"] = elapsed_time
if "output" in self.metadata[self.bench_name]:
rprint(f"-> Capturing output")
filtered_output, captured_parameters = self.filter_output(
output, error, self.metadata[self.bench_name]["output"]
)

curr_df_entry["output"] = filtered_output
for param_name, param_value in captured_parameters.items():
curr_df_entry[param_name] = param_value

if not set(curr_df_entry.keys()).issubset(
set(self.execution_df.columns)
):
raise ValueError(
"Entry contains a column that is mising in the dataframe."
)
self.execution_df.loc[len(self.execution_df)] = curr_df_entry

def run_command(self, cmd, timeout: int, retry: bool, retry_limit: int):
"""Runs a command and returns the output, time and return code"""
rprint(f"-> Running command: {cmd['cmd']}")
rprint(f"-> Path: {os.getcwd()}")
rprint(f"-> Timeout: {timeout}")

# Retry logic to run until success
remaining_tries = 1
Expand All @@ -153,6 +213,7 @@ def run_command(self, cmd, timeout: int, retry: bool, retry_limit: int):
stderr=subprocess.PIPE,
# cwd=cwd,
env=self.runner_env,
start_new_session=True,
)
stdout, stderr = process.communicate(timeout=timeout)
returncode = process.returncode
Expand All @@ -163,19 +224,21 @@ def run_command(self, cmd, timeout: int, retry: bool, retry_limit: int):
except subprocess.TimeoutExpired as e:
rprint(f"-> Command timeout after {timeout} seconds")
finished = False
process.kill()
process.communicate()
returncode = -1 # Typically, -1 indicates a timeout error
returncode = -1 # Timeout error code
stderr = f"Timeout error (limit: {timeout}s)"
stdout = ""

rprint(f"-> Killing children group {process.pid}")
try:
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
except ProcessLookupError:
rprint(
f"-> Failed to kill children group {process.pid}, probably already dead"
f"-> Failed to kill children group {process.pid}, possibly already dead"
)
rprint(f"-> Done killing children group {process.pid}")
finally:
process.communicate() # Clean up if necessary, though process should be dead.
rprint(f"-> Done killing children group {process.pid}")

rprint(f"-> Remaining tries: {remaining_tries}")

end_time = time.time()
Expand Down
14 changes: 13 additions & 1 deletion spinner/runner/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,19 @@ def run_benchmarks(config, hosts):
itertools.chain(*[list(bench_config[bench].keys()) for bench in bench_names])
)

columns = list(itertools.chain(["name"], parameters, ["time"]))
# List parameters generated by output capture, if any
capture_parameters = []
for bench_name in bench_names:
if "output" in bench_config["metadata"][bench_name]:
capture_parameters.append("output")
rprint(f"-> Capturing output from execution")

for filter in bench_config["metadata"][bench_name]["output"]:
if "to_float" in filter:
rprint(f"-> Capturing parameter {filter['to_float']['name']}")
capture_parameters.append(filter["to_float"]["name"])

columns = list(itertools.chain(["name"], parameters, capture_parameters, ["time"]))

execution_df = pd.DataFrame(columns=columns)

Expand Down

0 comments on commit e409d9c

Please sign in to comment.