Skip to content

Commit

Permalink
Merge branch 'tpcds-wrapper' into csv-outputs
Browse files Browse the repository at this point in the history
  • Loading branch information
raunakab committed Dec 19, 2024
2 parents e3a4729 + 2378e68 commit b78c3be
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 0 deletions.
8 changes: 8 additions & 0 deletions benchmarking/tpch/ray_job_runner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "ray[default]",
# "getdaft",
# ]
# ///

from __future__ import annotations

import argparse
Expand Down
70 changes: 70 additions & 0 deletions tools/tpcds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "PyGithub",
# "boto3",
# ]
# ///

import argparse
import json

import github
import utils


def run(
    branch_name: str,
    questions: str,
    scale_factor: int,
    cluster_profile: str,
    env_vars: str,
):
    """Dispatch the TPC-DS benchmark workflow for `branch_name` on a ray cluster.

    Args:
        branch_name: Branch (or ref) to benchmark; resolved via git before dispatch.
        questions: Comma-separated question spec (e.g. "1,5-7") or "*" for all.
        scale_factor: TPC-DS scale factor in GB.
        cluster_profile: Ray cluster configuration name passed to the workflow.
        env_vars: Comma-separated environment variables forwarded to the ray job.
    """
    branch_name, _ = utils.get_name_and_commit_hash(branch_name)

    # TPC-DS defines 99 queries in total.
    question_numbers = utils.parse_questions(questions, 99)
    print(f"Running scale-factor of {scale_factor}GB on questions: {', '.join(map(str, question_numbers))}")

    # Each question becomes one entrypoint invocation; the workflow expects a JSON list.
    entrypoint_args = json.dumps([f"--question={q} --scale-factor={scale_factor}" for q in question_numbers])

    utils.dispatch(
        workflow=utils.repo.get_workflow("run-cluster.yaml"),
        branch_name=branch_name,
        inputs={
            "cluster_profile": cluster_profile,
            "working_dir": "benchmarking/tpcds",
            "entrypoint_script": "ray_entrypoint.py",
            "entrypoint_args": entrypoint_args,
            "env_vars": env_vars,
        },
    )


if __name__ == "__main__":
    # CLI entrypoint: collect dispatch parameters and launch the TPC-DS workflow.
    parser = argparse.ArgumentParser()
    parser.add_argument("--ref", type=str, required=False, help="The branch name to run on")
    parser.add_argument(
        "--questions", type=str, required=False, default="*", help="A comma separated list of questions to run"
    )
    parser.add_argument("--scale-factor", type=int, required=False, default=2, help="The scale factor to run on")
    parser.add_argument("--cluster-profile", type=str, required=False, help="The ray cluster configuration to run on")
    parser.add_argument(
        "--env-vars",
        type=str,
        required=False,
        help="A comma separated list of environment variables to pass to ray job",
    )
    parser.add_argument("--verbose", action="store_true", help="Verbose debugging")
    cli_args = parser.parse_args()

    # Surface PyGithub's HTTP traffic when debugging failed dispatches.
    if cli_args.verbose:
        github.enable_console_debug_logging()

    run(
        branch_name=cli_args.ref,
        questions=cli_args.questions,
        scale_factor=cli_args.scale_factor,
        cluster_profile=cli_args.cluster_profile,
        env_vars=cli_args.env_vars,
    )
80 changes: 80 additions & 0 deletions tools/tpch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "PyGithub",
# "boto3",
# ]
# ///

import argparse
import json

import github
import utils

TOTAL_NUMBER_OF_QUESTIONS = 22


def run(
    branch_name: str,
    questions: str,
    scale_factor: int,
    num_partitions: int,
    cluster_profile: str,
    env_vars: str,
):
    """Dispatch the TPC-H benchmark workflow for `branch_name` on a ray cluster.

    Args:
        branch_name: Branch (or ref) to benchmark; resolved via git before dispatch.
        questions: Comma-separated question spec (e.g. "1,5-7") or "*" for all.
        scale_factor: TPC-H scale factor in GB (selects the fixture S3 prefix).
        num_partitions: Partition count of the pre-generated parquet fixtures.
        cluster_profile: Ray cluster configuration name passed to the workflow.
        env_vars: Comma-separated environment variables forwarded to the ray job.
    """
    branch_name, _ = utils.get_name_and_commit_hash(branch_name)

    question_numbers = utils.parse_questions(questions, TOTAL_NUMBER_OF_QUESTIONS)
    print(
        f"Running scale-factor of {scale_factor}GB with {num_partitions} partitions on questions: {', '.join(map(str, question_numbers))}"
    )

    # Each question becomes one entrypoint invocation against the matching fixture folder.
    parquet_folder = (
        f"s3://eventual-dev-benchmarking-fixtures/uncompressed/tpch-dbgen/{scale_factor}_0/{num_partitions}/parquet/"
    )
    entrypoint_args = json.dumps(
        [f"--question-number={q} --parquet-folder={parquet_folder}" for q in question_numbers]
    )

    utils.dispatch(
        workflow=utils.repo.get_workflow("run-cluster.yaml"),
        branch_name=branch_name,
        inputs={
            "cluster_profile": cluster_profile,
            "working_dir": "benchmarking/tpch",
            "entrypoint_script": "ray_job_runner.py",
            "entrypoint_args": entrypoint_args,
            "env_vars": env_vars,
        },
    )


if __name__ == "__main__":
    # CLI entrypoint: collect dispatch parameters and launch the TPC-H workflow.
    parser = argparse.ArgumentParser()
    parser.add_argument("--ref", type=str, required=False, help="The branch name to run on")
    parser.add_argument(
        "--questions", type=str, required=False, default="*", help="A comma separated list of questions to run"
    )
    parser.add_argument("--scale-factor", type=int, required=False, default=2, help="The scale factor to run on")
    parser.add_argument("--num-partitions", type=int, required=False, default=2, help="The number of partitions")
    parser.add_argument("--cluster-profile", type=str, required=False, help="The ray cluster configuration to run on")
    parser.add_argument(
        "--env-vars",
        type=str,
        required=False,
        help="A comma separated list of environment variables to pass to ray job",
    )
    parser.add_argument("--verbose", action="store_true", help="Verbose debugging")
    cli_args = parser.parse_args()

    # Surface PyGithub's HTTP traffic when debugging failed dispatches.
    if cli_args.verbose:
        github.enable_console_debug_logging()

    run(
        branch_name=cli_args.ref,
        questions=cli_args.questions,
        scale_factor=cli_args.scale_factor,
        num_partitions=cli_args.num_partitions,
        cluster_profile=cli_args.cluster_profile,
        env_vars=cli_args.env_vars,
    )
116 changes: 116 additions & 0 deletions tools/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import subprocess
import time
import typing
from typing import Optional

import gha_run_cluster_job
from github import Auth, Github
from github.Workflow import Workflow
from github.WorkflowRun import WorkflowRun

RETRY_ATTEMPTS = 5

auth = Auth.Token(gha_run_cluster_job.get_oauth_token())
g = Github(auth=auth)
repo = g.get_repo("Eventual-Inc/Daft")


def dispatch(workflow: Workflow, branch_name: str, inputs: dict) -> WorkflowRun:
    """Trigger a workflow_dispatch of `workflow` on `branch_name` and return the new run.

    The GitHub API does not return the created run from `create_dispatch`, so we
    snapshot the latest run number before dispatching and poll until a run with a
    strictly greater number appears.

    Args:
        workflow: The GitHub Actions workflow to dispatch.
        branch_name: Git ref on which to run the workflow.
        inputs: `workflow_dispatch` inputs forwarded verbatim.

    Returns:
        The newly created WorkflowRun.

    Raises:
        RuntimeError: If the dispatch request fails, or if the new run does not
            appear within RETRY_ATTEMPTS polls.
    """
    pre_creation_latest_run = get_latest_run(workflow)

    print(f"Launching workflow '{workflow.name}' on the branch '{branch_name}'")
    created = workflow.create_dispatch(
        ref=branch_name,
        inputs=inputs,
    )
    if not created:
        raise RuntimeError("Could not create workflow, suggestion: run again with --verbose")

    # Poll until a run newer than the pre-dispatch snapshot shows up. Run numbers
    # are monotonically increasing, so "greater than" uniquely identifies the new
    # run; "equal" means it has not been registered yet.
    #
    # Bug fix: the original kept the *stale* run in `post_creation_latest_run` when
    # all retries were exhausted, so its falsiness check never fired and the old
    # run was returned as if it were the new one. It also called
    # `typing.assert_never` with a string — that helper is a static-exhaustiveness
    # marker for `Never`-typed values, not a runtime assertion.
    post_creation_latest_run = None
    for _ in range(RETRY_ATTEMPTS):
        candidate = get_latest_run(workflow)
        if candidate.run_number > pre_creation_latest_run.run_number:
            post_creation_latest_run = candidate
            break
        sleep_and_then_retry()
    if post_creation_latest_run is None:
        raise RuntimeError(f"Unable to locate the new run request for the '{workflow.name}' workflow")

    print(f"Launched new '{workflow.name}' workflow with id: {post_creation_latest_run.id}")
    print(f"View the workflow run at: {post_creation_latest_run.html_url}")

    return post_creation_latest_run


def sleep_and_then_retry(sleep_amount_sec: int = 3):
    """Pause for `sleep_amount_sec` seconds before the caller's next retry attempt."""
    time.sleep(sleep_amount_sec)


def get_latest_run(workflow: Workflow) -> WorkflowRun:
    """Return the most recent run of `workflow`, polling until one exists.

    Raises:
        RuntimeError: If no runs are listed after RETRY_ATTEMPTS polls.
    """
    for _attempt in range(RETRY_ATTEMPTS):
        runs = workflow.get_runs()
        # The API returns runs newest-first, so index 0 is the latest.
        if runs.totalCount > 0:
            return runs[0]
        sleep_and_then_retry()

    raise RuntimeError("Unable to list all workflow invocations")


def get_name_and_commit_hash(branch_name: Optional[str]) -> tuple[str, str]:
    """Resolve `branch_name` (or HEAD when falsy) to an (abbreviated ref name, commit hash) pair.

    Shells out to `git rev-parse`; raises CalledProcessError if the ref is unknown.
    """
    ref = branch_name or "HEAD"

    def _rev_parse(*flags: str) -> str:
        # stderr is folded into stdout so git's error text surfaces in the exception.
        raw = subprocess.check_output(["git", "rev-parse", *flags, ref], stderr=subprocess.STDOUT)
        return raw.decode("utf-8").strip()

    return _rev_parse("--abbrev-ref"), _rev_parse()


def parse_questions(questions: str, total_number_of_questions: int) -> list[int]:
    """Expand a question spec into an explicit list of question numbers.

    Args:
        questions: Comma-separated mix of integers and "lower-upper" ranges
            (e.g. "1,3,5-7"). "*" selects every question; an empty string
            selects none.
        total_number_of_questions: Inclusive upper bound on valid numbers.

    Returns:
        The expanded question numbers, in spec order.

    Raises:
        RuntimeError: If a requested number exceeds `total_number_of_questions`.
        ValueError: If an item is neither an integer nor a valid range.
    """
    if not questions:
        return []

    if questions == "*":
        return list(range(1, total_number_of_questions + 1))

    nums: list[int] = []
    for item in questions.split(","):
        try:
            num = int(item)
        except ValueError:
            # Not a plain integer -- fall through to range parsing below.
            pass
        else:
            if num > total_number_of_questions:
                raise RuntimeError(
                    f"Requested question number ({num}) is greater than the total number of questions available ({total_number_of_questions})"
                )
            # Bug fix: the original appended str(num), mixing strings into a
            # declared list[int] (the range branch below extends with ints).
            nums.append(num)
            continue

        try:
            lower_str, upper_str = item.split("-")
            lower = int(lower_str)
            upper = int(upper_str)
        except ValueError:
            # Covers non-range items (no "-" or too many) and non-integer bounds;
            # the original raised a placeholder ValueError("...") for the former.
            raise ValueError(f"Invalid question item; expected a number or a range, instead got {item}")
        if lower > upper:
            raise ValueError(f"Invalid range; expected lower <= upper, instead got {item}")
        if upper > total_number_of_questions:
            raise RuntimeError(
                f"Requested question number ({upper}) is greater than the total number of questions available ({total_number_of_questions})"
            )
        nums.extend(range(lower, upper + 1))

    return nums

0 comments on commit b78c3be

Please sign in to comment.