diff --git a/benchmarking/tpch/ray_job_runner.py b/benchmarking/tpch/ray_job_runner.py
index dc6141ad18..89301cf647 100644
--- a/benchmarking/tpch/ray_job_runner.py
+++ b/benchmarking/tpch/ray_job_runner.py
@@ -1,11 +1,3 @@
-# /// script
-# requires-python = ">=3.12"
-# dependencies = [
-# "ray[default]",
-# "getdaft",
-# ]
-# ///
-
 from __future__ import annotations
 
 import argparse
diff --git a/tools/tpcds.py b/tools/tpcds.py
deleted file mode 100644
index 8db38a7798..0000000000
--- a/tools/tpcds.py
+++ /dev/null
@@ -1,70 +0,0 @@
-# /// script
-# requires-python = ">=3.12"
-# dependencies = [
-# "PyGithub",
-# "boto3",
-# ]
-# ///
-
-import argparse
-import json
-
-import github
-import utils
-
-
-def run(
-    branch_name: str,
-    questions: str,
-    scale_factor: int,
-    cluster_profile: str,
-    env_vars: str,
-):
-    branch_name, _ = utils.get_name_and_commit_hash(branch_name)
-
-    expanded_questions = utils.parse_questions(questions, 99)
-    print(f"Running scale-factor of {scale_factor}GB on questions: {', '.join(map(str, expanded_questions))}")
-    args_as_list = [f"--question={q} --scale-factor={scale_factor}" for q in expanded_questions]
-    entrypoint_args = json.dumps(args_as_list)
-
-    workflow = utils.repo.get_workflow("run-cluster.yaml")
-    utils.dispatch(
-        workflow=workflow,
-        branch_name=branch_name,
-        inputs={
-            "cluster_profile": cluster_profile,
-            "working_dir": "benchmarking/tpcds",
-            "entrypoint_script": "ray_entrypoint.py",
-            "entrypoint_args": entrypoint_args,
-            "env_vars": env_vars,
-        },
-    )
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--ref", type=str, required=False, help="The branch name to run on")
-    parser.add_argument(
-        "--questions", type=str, required=False, default="*", help="A comma separated list of questions to run"
-    )
-    parser.add_argument("--scale-factor", type=int, required=False, default=2, help="The scale factor to run on")
-    parser.add_argument("--cluster-profile", type=str, required=False, help="The ray cluster configuration to run on")
-    parser.add_argument(
-        "--env-vars",
-        type=str,
-        required=False,
-        help="A comma separated list of environment variables to pass to ray job",
-    )
-    parser.add_argument("--verbose", action="store_true", help="Verbose debugging")
-    args = parser.parse_args()
-
-    if args.verbose:
-        github.enable_console_debug_logging()
-
-    run(
-        branch_name=args.ref,
-        questions=args.questions,
-        scale_factor=args.scale_factor,
-        cluster_profile=args.cluster_profile,
-        env_vars=args.env_vars,
-    )
diff --git a/tools/tpch.py b/tools/tpch.py
deleted file mode 100644
index 097f9e1f51..0000000000
--- a/tools/tpch.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# /// script
-# requires-python = ">=3.12"
-# dependencies = [
-# "PyGithub",
-# "boto3",
-# ]
-# ///
-
-import argparse
-import json
-
-import github
-import utils
-
-TOTAL_NUMBER_OF_QUESTIONS = 22
-
-
-def run(
-    branch_name: str,
-    questions: str,
-    scale_factor: int,
-    num_partitions: int,
-    cluster_profile: str,
-    env_vars: str,
-):
-    branch_name, _ = utils.get_name_and_commit_hash(branch_name)
-
-    expanded_questions = utils.parse_questions(questions, TOTAL_NUMBER_OF_QUESTIONS)
-    print(
-        f"Running scale-factor of {scale_factor}GB with {num_partitions} partitions on questions: {', '.join(map(str, expanded_questions))}"
-    )
-    args_as_list = [
-        f"--question-number={q} --parquet-folder=s3://eventual-dev-benchmarking-fixtures/uncompressed/tpch-dbgen/{scale_factor}_0/{num_partitions}/parquet/"
-        for q in expanded_questions
-    ]
-    entrypoint_args = json.dumps(args_as_list)
-
-    workflow = utils.repo.get_workflow("run-cluster.yaml")
-    utils.dispatch(
-        workflow=workflow,
-        branch_name=branch_name,
-        inputs={
-            "cluster_profile": cluster_profile,
-            "working_dir": "benchmarking/tpch",
-            "entrypoint_script": "ray_job_runner.py",
-            "entrypoint_args": entrypoint_args,
-            "env_vars": env_vars,
-        },
-    )
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--ref", type=str, required=False, help="The branch name to run on")
-    parser.add_argument(
-        "--questions", type=str, required=False, default="*", help="A comma separated list of questions to run"
-    )
-    parser.add_argument("--scale-factor", type=int, required=False, default=2, help="The scale factor to run on")
-    parser.add_argument("--num-partitions", type=int, required=False, default=2, help="The number of partitions")
-    parser.add_argument("--cluster-profile", type=str, required=False, help="The ray cluster configuration to run on")
-    parser.add_argument(
-        "--env-vars",
-        type=str,
-        required=False,
-        help="A comma separated list of environment variables to pass to ray job",
-    )
-    parser.add_argument("--verbose", action="store_true", help="Verbose debugging")
-    args = parser.parse_args()
-
-    if args.verbose:
-        github.enable_console_debug_logging()
-
-    run(
-        branch_name=args.ref,
-        questions=args.questions,
-        scale_factor=args.scale_factor,
-        num_partitions=args.num_partitions,
-        cluster_profile=args.cluster_profile,
-        env_vars=args.env_vars,
-    )
diff --git a/tools/utils.py b/tools/utils.py
deleted file mode 100644
index 97953ebd47..0000000000
--- a/tools/utils.py
+++ /dev/null
@@ -1,116 +0,0 @@
-import subprocess
-import time
-import typing
-from typing import Optional
-
-import gha_run_cluster_job
-from github import Auth, Github
-from github.Workflow import Workflow
-from github.WorkflowRun import WorkflowRun
-
-RETRY_ATTEMPTS = 5
-
-auth = Auth.Token(gha_run_cluster_job.get_oauth_token())
-g = Github(auth=auth)
-repo = g.get_repo("Eventual-Inc/Daft")
-
-
-def dispatch(workflow: Workflow, branch_name: str, inputs: dict) -> WorkflowRun:
-    pre_creation_latest_run = get_latest_run(workflow)
-
-    print(f"Launching workflow '{workflow.name}' on the branch '{branch_name}'")
-    created = workflow.create_dispatch(
-        ref=branch_name,
-        inputs=inputs,
-    )
-    if not created:
-        raise RuntimeError("Could not create workflow, suggestion: run again with --verbose")
-
-    post_creation_latest_run = None
-    for _ in range(RETRY_ATTEMPTS):
-        post_creation_latest_run = get_latest_run(workflow)
-        if pre_creation_latest_run.run_number == post_creation_latest_run.run_number:
-            sleep_and_then_retry()
-        elif pre_creation_latest_run.run_number < post_creation_latest_run.run_number:
-            break
-        else:
-            typing.assert_never(
-                "Run numbers are always returned in sorted order and are always monotonically increasing"
-            )
-    if not post_creation_latest_run:
-        raise RuntimeError(f"Unable to locate the new run request for the '{workflow.name}' workflow")
-
-    print(f"Launched new '{workflow.name}' workflow with id: {post_creation_latest_run.id}")
-    print(f"View the workflow run at: {post_creation_latest_run.html_url}")
-
-    return post_creation_latest_run
-
-
-def sleep_and_then_retry(sleep_amount_sec: int = 3):
-    time.sleep(sleep_amount_sec)
-
-
-def get_latest_run(workflow: Workflow) -> WorkflowRun:
-    for _ in range(RETRY_ATTEMPTS):
-        runs = workflow.get_runs()
-
-        if runs.totalCount > 0:
-            return runs[0]
-
-        sleep_and_then_retry()
-
-    raise RuntimeError("Unable to list all workflow invocations")
-
-
-def get_name_and_commit_hash(branch_name: Optional[str]) -> tuple[str, str]:
-    branch_name = branch_name or "HEAD"
-    name = (
-        subprocess.check_output(["git", "rev-parse", "--abbrev-ref", branch_name], stderr=subprocess.STDOUT)
-        .strip()
-        .decode("utf-8")
-    )
-    commit_hash = (
-        subprocess.check_output(["git", "rev-parse", branch_name], stderr=subprocess.STDOUT).strip().decode("utf-8")
-    )
-    return name, commit_hash
-
-
-def parse_questions(questions: str, total_number_of_questions: int) -> list[int]:
-    if not questions:
-        return []
-
-    if questions == "*":
-        return list(range(1, total_number_of_questions + 1))
-
-    items = questions.split(",")
-    nums = []
-    for item in items:
-        try:
-            num = int(item)
-            if num > total_number_of_questions:
-                raise RuntimeError(
-                    f"Requested question number ({num}) is greater than the total number of questions available ({total_number_of_questions})"
-                )
-            nums.append(str(num))
-            continue
-        except ValueError:
-            ...
-
-        if "-" not in item:
-            raise ValueError("...")
-
-        try:
-            lower, upper = item.split("-")
-            lower_int = int(lower)
-            upper_int = int(upper)
-            if lower_int > upper_int:
-                raise ValueError
-            if upper_int > total_number_of_questions:
-                raise RuntimeError(
-                    f"Requested question number ({upper_int}) is greater than the total number of questions available ({total_number_of_questions})"
-                )
-            nums.extend(range(lower_int, upper_int + 1))
-        except ValueError:
-            raise ValueError(f"Invalid question item; expected a number or a range, instead got {item}")
-
-    return nums