From b1b01615a8b0bb569bc774e9ce38514880c9bc8a Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 17 Dec 2024 18:09:34 -0800 Subject: [PATCH 1/9] Add tpcds launching script --- tools/tpcds.py | 175 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 tools/tpcds.py diff --git a/tools/tpcds.py b/tools/tpcds.py new file mode 100644 index 0000000000..e6995434e8 --- /dev/null +++ b/tools/tpcds.py @@ -0,0 +1,175 @@ +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "PyGithub", +# "boto3", +# "inquirer", +# ] +# /// + +import argparse +import subprocess +import time +import typing +from pathlib import Path +from typing import Optional + +import boto3 +import gha_run_cluster_job +import inquirer +from github import Auth, Github, enable_console_debug_logging +from github.Workflow import Workflow +from github.WorkflowJob import WorkflowJob +from github.WorkflowRun import WorkflowRun + +WHEEL_NAME = "getdaft-0.3.0.dev0-cp38-abi3-manylinux_2_31_x86_64.whl" +RETRY_ATTEMPTS = 5 + + +def sleep_and_then_retry(sleep_amount_sec: int = 3): + time.sleep(sleep_amount_sec) + + +def get_latest_run(workflow: Workflow) -> WorkflowRun: + for _ in range(RETRY_ATTEMPTS): + runs = workflow.get_runs() + + if runs.totalCount > 0: + return runs[0] + + sleep_and_then_retry() + + raise RuntimeError("Unable to list all workflow invocations") + + +def get_name_and_commit_hash(branch_name: Optional[str]) -> tuple[str, str]: + branch_name = branch_name or "HEAD" + name = ( + subprocess.check_output(["git", "rev-parse", "--abbrev-ref", branch_name], stderr=subprocess.STDOUT) + .strip() + .decode("utf-8") + ) + commit_hash = ( + subprocess.check_output(["git", "rev-parse", branch_name], stderr=subprocess.STDOUT).strip().decode("utf-8") + ) + return name, commit_hash + + +auth = Auth.Token(gha_run_cluster_job.get_oauth_token()) +g = Github(auth=auth) + + +def build(branch_name: Optional[str]): + """Runs a build on the given branch. 
+ + If the branch has already been built, it will reuse the already built wheel. + """ + s3 = boto3.client("s3") + + branch_name, commit_hash = get_name_and_commit_hash(branch_name) + response: dict = s3.list_objects_v2(Bucket="github-actions-artifacts-bucket", Prefix=f"builds/{commit_hash}/") + wheel_urls = [] + for wheel in response.get("Contents", []): + wheel: dict + if "Key" in wheel: + wheel_path = Path(wheel["Key"]) + wheel_name = wheel_path.name + print(wheel_name) + if wheel_name == WHEEL_NAME: + wheel_urls.append( + f"https://github-actions-artifacts-bucket.s3.us-west-2.amazonaws.com/builds/{commit_hash}/{wheel_name}" + ) + + length = len(wheel_urls) + assert length <= 1, "There should never be more than 1 object in S3 with the exact same key" + + print(f"Checking if a build exists for the branch '{branch_name}' (commit-hash: {commit_hash})") + + if length == 0: + user_wants_to_build_commit = inquirer.confirm(message="No build found; would you like to build this branch?") + if not user_wants_to_build_commit: + print("Workflow aborted") + exit(1) + + repo = g.get_repo("Eventual-Inc/Daft") + workflow = repo.get_workflow("build-commit.yaml") + + pre_creation_latest_run = get_latest_run(workflow) + + inputs = {"arch": "x86"} + print(f"Launching new 'build-commit' workflow with the following inputs: {inputs}") + created = workflow.create_dispatch( + ref=branch_name, + inputs=inputs, + ) + if not created: + raise RuntimeError("Could not create workflow, suggestion: run again with --verbose") + + post_creation_latest_run = None + for _ in range(RETRY_ATTEMPTS): + post_creation_latest_run = get_latest_run(workflow) + if pre_creation_latest_run.run_number == post_creation_latest_run.run_number: + sleep_and_then_retry() + elif pre_creation_latest_run.run_number < post_creation_latest_run.run_number: + break + else: + typing.assert_never( + "Run numbers are always returned in sorted order and are always monotonically increasing" + ) + + if not 
post_creation_latest_run: + raise RuntimeError("Unable to locate the new run request for the 'build-commit' workflow") + + print(f"Latest 'build-commit' workflow run found with id: {post_creation_latest_run.id}") + print(f"View the workflow run at: {post_creation_latest_run.url}") + + while True: + jobs = repo.get_workflow_run(post_creation_latest_run.id).jobs() + if not jobs: + raise RuntimeError("The 'build-commit' workflow should have 1 job") + elif len(jobs) > 1: + raise RuntimeError("The 'build-commit' workflow should only have 1 job") + + build_commit_job: WorkflowJob = jobs[0] + if build_commit_job.conclusion: + break + else: + print(f"Job is still running with status: {build_commit_job.status}") + + sleep_and_then_retry(10) + + print(f"Job completed with status {build_commit_job.conclusion}") + + if build_commit_job.conclusion != "success": + raise RuntimeError( + f"The 'build-commit' workflow failed; view the results here: {post_creation_latest_run.url}" + ) + elif length == 1: + print("Build found; re-using build") + + # repo = g.get_repo("Eventual-Inc/Daft") + # workflow = repo.get_workflow("build-commit.yaml") + # + # created = workflow.create_dispatch( + # ref=branch, + # inputs={ + # "arch": "x86", + # }, + # ) + # + # if not created: + # raise RuntimeError("Could not create workflow, suggestion: run again with --verbose") + # + # print("Workflow created, view it at: https://github.com/Eventual-Inc/Daft/actions/workflows/run-cluster.yaml") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--ref", type=str, required=False, help="The branch name to run on") + parser.add_argument("--verbose", action="store_true", help="Verbose debugging") + args = parser.parse_args() + + if args.verbose: + enable_console_debug_logging() + + build(branch_name=args.ref) From ad6372e0c4b16cc93e47da202fb1b82ba2775fc5 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 17 Dec 2024 18:12:53 -0800 Subject: [PATCH 2/9] Add force flag 
--- tools/tpcds.py | 33 ++++++++++++++------------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/tools/tpcds.py b/tools/tpcds.py index e6995434e8..4b9a0ac51d 100644 --- a/tools/tpcds.py +++ b/tools/tpcds.py @@ -59,7 +59,7 @@ def get_name_and_commit_hash(branch_name: Optional[str]) -> tuple[str, str]: g = Github(auth=auth) -def build(branch_name: Optional[str]): +def build(branch_name: Optional[str], force: bool): """Runs a build on the given branch. If the branch has already been built, it will reuse the already built wheel. @@ -85,7 +85,7 @@ def build(branch_name: Optional[str]): print(f"Checking if a build exists for the branch '{branch_name}' (commit-hash: {commit_hash})") - if length == 0: + def run_build(): user_wants_to_build_commit = inquirer.confirm(message="No build found; would you like to build this branch?") if not user_wants_to_build_commit: print("Workflow aborted") @@ -144,32 +144,27 @@ def build(branch_name: Optional[str]): raise RuntimeError( f"The 'build-commit' workflow failed; view the results here: {post_creation_latest_run.url}" ) + + if length == 0: + run_build() elif length == 1: - print("Build found; re-using build") - - # repo = g.get_repo("Eventual-Inc/Daft") - # workflow = repo.get_workflow("build-commit.yaml") - # - # created = workflow.create_dispatch( - # ref=branch, - # inputs={ - # "arch": "x86", - # }, - # ) - # - # if not created: - # raise RuntimeError("Could not create workflow, suggestion: run again with --verbose") - # - # print("Workflow created, view it at: https://github.com/Eventual-Inc/Daft/actions/workflows/run-cluster.yaml") + if force: + run_build() + else: + print("Build found; re-using build") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--ref", type=str, required=False, help="The branch name to run on") + parser.add_argument("--force", action="store_true", help="Force a rebuild") parser.add_argument("--verbose", action="store_true", help="Verbose 
debugging") args = parser.parse_args() if args.verbose: enable_console_debug_logging() - build(branch_name=args.ref) + build( + branch_name=args.ref, + force=args.force, + ) From 13bebc421f0186f0ef42f15fe3b8b14c29cf6fc6 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 17 Dec 2024 18:34:09 -0800 Subject: [PATCH 3/9] Remove check of builds in GitHub Actions workflow - this should only happen in the uv run scripts --- .github/ci-scripts/get_wheel_name_from_s3.py | 45 -------------------- .github/workflows/build-commit.yaml | 21 --------- 2 files changed, 66 deletions(-) delete mode 100644 .github/ci-scripts/get_wheel_name_from_s3.py diff --git a/.github/ci-scripts/get_wheel_name_from_s3.py b/.github/ci-scripts/get_wheel_name_from_s3.py deleted file mode 100644 index b033fdedf0..0000000000 --- a/.github/ci-scripts/get_wheel_name_from_s3.py +++ /dev/null @@ -1,45 +0,0 @@ -"""Given a commit hash and a "platform substring", prints the wheelname of the wheel (if one exists) to stdout. - -# Example - -```bash -COMMIT_HASH="abcdef0123456789" -PLATFORM_SUBSTRING="x86" -WHEELNAME=$(python get_wheel_name_from_s3.py $COMMIT_HASH $PLATFORM_SUBSTRING) - -echo $WHEELNAME -# Will echo the wheelname if a wheel exists that matches the platform substring. -# Otherwise, will echo nothing. 
-``` -""" - -import argparse -from pathlib import Path - -import boto3 -import wheellib - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--commit-hash", required=True) - parser.add_argument("--platform-substring", required=True, choices=["x86", "aarch", "arm"]) - args = parser.parse_args() - - commit_hash = args.commit_hash - platform_substring = args.platform_substring - - s3 = boto3.client("s3") - response = s3.list_objects_v2(Bucket="github-actions-artifacts-bucket", Prefix=f"builds/{commit_hash}/") - matches = [] - for content in response.get("Contents", []): - wheelname = Path(content["Key"]).name - platform_tag = wheellib.get_platform_tag(wheelname) - if platform_substring in platform_tag: - matches.append(wheelname) - - if len(matches) > 1: - raise RuntimeError( - f"Multiple wheels found that match the given platform substring: {platform_substring}; expected just 1" - ) - - print(matches[0]) if matches else None diff --git a/.github/workflows/build-commit.yaml b/.github/workflows/build-commit.yaml index a8a77417ef..42b7400ca8 100644 --- a/.github/workflows/build-commit.yaml +++ b/.github/workflows/build-commit.yaml @@ -59,35 +59,14 @@ jobs: uv v source .venv/bin/activate uv pip install boto3 packaging - - name: Check if build already exists in AWS S3 - run: | - source .venv/bin/activate - wheel_name=$(python .github/ci-scripts/get_wheel_name_from_s3.py \ - --commit-hash "${{ github.sha }}" \ - --platform-substring "$platform_substring" \ - ) - if [ "$wheel_name" ]; then - echo "Python wheel for this commit already built and uploaded" - else - echo "No python wheel for this commit found; proceeding with build" - fi - echo "wheel_name=$wheel_name" >> $GITHUB_ENV - name: Build release wheel run: | - if [ "$wheel_name" ]; then - echo "Python wheel for this commit already built and uploaded" - exit 0 - fi export CARGO_TARGET_DIR=~/target source .venv/bin/activate uv pip install pip maturin boto3 maturin build --release - 
name: Upload wheel to AWS S3 run: | - if [ "$wheel_name" ]; then - echo "Python wheel for this commit already built and uploaded" - exit 0 - fi source .venv/bin/activate wheel_name=$(python .github/ci-scripts/upload_wheel_to_s3.py \ --commit-hash "${{ github.sha }}" \ From f5b233c5c01877ae34d46cef900854c4597aadbf Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 17 Dec 2024 18:35:06 -0800 Subject: [PATCH 4/9] Finish build step --- tools/tpcds.py | 153 +++++++++++++++++++++++++++---------------------- 1 file changed, 86 insertions(+), 67 deletions(-) diff --git a/tools/tpcds.py b/tools/tpcds.py index 4b9a0ac51d..282e2e675b 100644 --- a/tools/tpcds.py +++ b/tools/tpcds.py @@ -59,14 +59,79 @@ def get_name_and_commit_hash(branch_name: Optional[str]) -> tuple[str, str]: g = Github(auth=auth) -def build(branch_name: Optional[str], force: bool): - """Runs a build on the given branch. +def run_build( + branch_name: str, + commit_hash: str, +): + user_wants_to_build_commit = inquirer.confirm( + message=f"You are requesting to build '{branch_name}' (commit-hash: {commit_hash}) using the 'build-commit' workflow; proceed?" + ) + if not user_wants_to_build_commit: + print("Workflow aborted") + exit(1) - If the branch has already been built, it will reuse the already built wheel. 
- """ - s3 = boto3.client("s3") + repo = g.get_repo("Eventual-Inc/Daft") + workflow = repo.get_workflow("build-commit.yaml") - branch_name, commit_hash = get_name_and_commit_hash(branch_name) + pre_creation_latest_run = get_latest_run(workflow) + + inputs = {"arch": "x86"} + print(f"Launching new 'build-commit' workflow with the following inputs: {inputs}") + created = workflow.create_dispatch( + ref=branch_name, + inputs=inputs, + ) + if not created: + raise RuntimeError("Could not create workflow, suggestion: run again with --verbose") + + post_creation_latest_run = None + for _ in range(RETRY_ATTEMPTS): + post_creation_latest_run = get_latest_run(workflow) + if pre_creation_latest_run.run_number == post_creation_latest_run.run_number: + sleep_and_then_retry() + elif pre_creation_latest_run.run_number < post_creation_latest_run.run_number: + break + else: + typing.assert_never( + "Run numbers are always returned in sorted order and are always monotonically increasing" + ) + + if not post_creation_latest_run: + raise RuntimeError("Unable to locate the new run request for the 'build-commit' workflow") + + print(f"Latest 'build-commit' workflow run found with id: {post_creation_latest_run.id}") + print(f"View the workflow run at: {post_creation_latest_run.url}") + + while True: + jobs = repo.get_workflow_run(post_creation_latest_run.id).jobs() + if not jobs: + raise RuntimeError("The 'build-commit' workflow should have 1 job") + elif jobs.totalCount > 1: + raise RuntimeError("The 'build-commit' workflow should only have 1 job") + + build_commit_job: WorkflowJob = jobs[0] + if build_commit_job.conclusion: + break + else: + print(f"Job is still running with status: {build_commit_job.status}") + + sleep_and_then_retry(10) + + print(f"Job completed with status {build_commit_job.conclusion}") + + if build_commit_job.conclusion != "success": + raise RuntimeError(f"The 'build-commit' workflow failed; view the results here: {post_creation_latest_run.url}") + + wheel_url = 
find_wheel(commit_hash) + + if not wheel_url: + raise RuntimeError("The wheel was not able to be found in AWS S3; internal error") + + return wheel_url + + +def find_wheel(commit_hash: str) -> Optional[str]: + s3 = boto3.client("s3") response: dict = s3.list_objects_v2(Bucket="github-actions-artifacts-bucket", Prefix=f"builds/{commit_hash}/") wheel_urls = [] for wheel in response.get("Contents", []): @@ -83,75 +148,29 @@ def build(branch_name: Optional[str], force: bool): length = len(wheel_urls) assert length <= 1, "There should never be more than 1 object in S3 with the exact same key" - print(f"Checking if a build exists for the branch '{branch_name}' (commit-hash: {commit_hash})") - - def run_build(): - user_wants_to_build_commit = inquirer.confirm(message="No build found; would you like to build this branch?") - if not user_wants_to_build_commit: - print("Workflow aborted") - exit(1) - - repo = g.get_repo("Eventual-Inc/Daft") - workflow = repo.get_workflow("build-commit.yaml") - - pre_creation_latest_run = get_latest_run(workflow) - - inputs = {"arch": "x86"} - print(f"Launching new 'build-commit' workflow with the following inputs: {inputs}") - created = workflow.create_dispatch( - ref=branch_name, - inputs=inputs, - ) - if not created: - raise RuntimeError("Could not create workflow, suggestion: run again with --verbose") - - post_creation_latest_run = None - for _ in range(RETRY_ATTEMPTS): - post_creation_latest_run = get_latest_run(workflow) - if pre_creation_latest_run.run_number == post_creation_latest_run.run_number: - sleep_and_then_retry() - elif pre_creation_latest_run.run_number < post_creation_latest_run.run_number: - break - else: - typing.assert_never( - "Run numbers are always returned in sorted order and are always monotonically increasing" - ) - - if not post_creation_latest_run: - raise RuntimeError("Unable to locate the new run request for the 'build-commit' workflow") - - print(f"Latest 'build-commit' workflow run found with id: 
{post_creation_latest_run.id}") - print(f"View the workflow run at: {post_creation_latest_run.url}") + return wheel_urls[0] if wheel_urls else None - while True: - jobs = repo.get_workflow_run(post_creation_latest_run.id).jobs() - if not jobs: - raise RuntimeError("The 'build-commit' workflow should have 1 job") - elif len(jobs) > 1: - raise RuntimeError("The 'build-commit' workflow should only have 1 job") - build_commit_job: WorkflowJob = jobs[0] - if build_commit_job.conclusion: - break - else: - print(f"Job is still running with status: {build_commit_job.status}") +def build(branch_name: Optional[str], force: bool): + """Runs a build on the given branch. - sleep_and_then_retry(10) + If the branch has already been built, it will reuse the already built wheel. + """ + branch_name, commit_hash = get_name_and_commit_hash(branch_name) - print(f"Job completed with status {build_commit_job.conclusion}") + print(f"Checking if a build exists for the branch '{branch_name}' (commit-hash: {commit_hash})") - if build_commit_job.conclusion != "success": - raise RuntimeError( - f"The 'build-commit' workflow failed; view the results here: {post_creation_latest_run.url}" - ) + wheel_url = find_wheel(commit_hash) - if length == 0: - run_build() - elif length == 1: + if wheel_url: if force: - run_build() + wheel_url = run_build(branch_name, commit_hash) else: - print("Build found; re-using build") + print(f"Wheel already found at url {wheel_url}; re-using") + else: + wheel_url = run_build(branch_name, commit_hash) + + print(wheel_url) if __name__ == "__main__": From b9d2d0ffeaa96e3db13cb33a9177c6259cbb0c6a Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 17 Dec 2024 19:31:17 -0800 Subject: [PATCH 5/9] Add run section to tpcds benchmarking script --- tools/tpcds.py | 76 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 50 insertions(+), 26 deletions(-) diff --git a/tools/tpcds.py b/tools/tpcds.py index 282e2e675b..407a4ed7b9 100644 --- a/tools/tpcds.py +++ 
b/tools/tpcds.py @@ -25,6 +25,20 @@ WHEEL_NAME = "getdaft-0.3.0.dev0-cp38-abi3-manylinux_2_31_x86_64.whl" RETRY_ATTEMPTS = 5 +auth = Auth.Token(gha_run_cluster_job.get_oauth_token()) +g = Github(auth=auth) +repo = g.get_repo("Eventual-Inc/Daft") + + +def dispatch(workflow: Workflow, branch_name: str, inputs: dict): + print(f"Launching workflow '{workflow.name}' on the branch '{branch_name}' with the inputs '{inputs}'") + created = workflow.create_dispatch( + ref=branch_name, + inputs=inputs, + ) + if not created: + raise RuntimeError("Could not create workflow, suggestion: run again with --verbose") + def sleep_and_then_retry(sleep_amount_sec: int = 3): time.sleep(sleep_amount_sec) @@ -55,10 +69,6 @@ def get_name_and_commit_hash(branch_name: Optional[str]) -> tuple[str, str]: return name, commit_hash -auth = Auth.Token(gha_run_cluster_job.get_oauth_token()) -g = Github(auth=auth) - - def run_build( branch_name: str, commit_hash: str, @@ -70,19 +80,11 @@ def run_build( print("Workflow aborted") exit(1) - repo = g.get_repo("Eventual-Inc/Daft") workflow = repo.get_workflow("build-commit.yaml") pre_creation_latest_run = get_latest_run(workflow) - inputs = {"arch": "x86"} - print(f"Launching new 'build-commit' workflow with the following inputs: {inputs}") - created = workflow.create_dispatch( - ref=branch_name, - inputs=inputs, - ) - if not created: - raise RuntimeError("Could not create workflow, suggestion: run again with --verbose") + dispatch(workflow=workflow, branch_name=branch_name, inputs={"arch": "x86"}) post_creation_latest_run = None for _ in range(RETRY_ATTEMPTS): @@ -99,7 +101,7 @@ def run_build( if not post_creation_latest_run: raise RuntimeError("Unable to locate the new run request for the 'build-commit' workflow") - print(f"Latest 'build-commit' workflow run found with id: {post_creation_latest_run.id}") + print(f"Launched new 'build-commit' workflow with id: {post_creation_latest_run.id}") print(f"View the workflow run at: 
{post_creation_latest_run.url}") while True: @@ -139,7 +141,6 @@ def find_wheel(commit_hash: str) -> Optional[str]: if "Key" in wheel: wheel_path = Path(wheel["Key"]) wheel_name = wheel_path.name - print(wheel_name) if wheel_name == WHEEL_NAME: wheel_urls.append( f"https://github-actions-artifacts-bucket.s3.us-west-2.amazonaws.com/builds/{commit_hash}/{wheel_name}" @@ -151,26 +152,49 @@ def find_wheel(commit_hash: str) -> Optional[str]: return wheel_urls[0] if wheel_urls else None -def build(branch_name: Optional[str], force: bool): +def build(branch_name: str, commit_hash: str, force: bool) -> str: """Runs a build on the given branch. If the branch has already been built, it will reuse the already built wheel. """ - branch_name, commit_hash = get_name_and_commit_hash(branch_name) - print(f"Checking if a build exists for the branch '{branch_name}' (commit-hash: {commit_hash})") wheel_url = find_wheel(commit_hash) - if wheel_url: - if force: - wheel_url = run_build(branch_name, commit_hash) - else: - print(f"Wheel already found at url {wheel_url}; re-using") - else: + should_build = force or wheel_url is None + if should_build: wheel_url = run_build(branch_name, commit_hash) + else: + # wheel_url must be non-None if this branch is executed + print(f"Wheel already found at url {wheel_url}; re-using") + + return wheel_url + - print(wheel_url) +def run( + wheel_url: str, + branch_name: str, +): + workflow = repo.get_workflow("run-cluster.yaml") + dispatch( + workflow=workflow, + branch_name=branch_name, + inputs={ + "daft_wheel_url": wheel_url, + "working_dir": "benchmarking/tpcds", + "entrypoint_script": "ray_entrypoint.py", + "entrypoint_args": "--question=3 --scale-factor=100", + }, + ) + + +def main( + branch_name: Optional[str], + force: bool, +): + branch_name, commit_hash = get_name_and_commit_hash(branch_name) + wheel_url = build(branch_name=branch_name, commit_hash=commit_hash, force=force) + run(wheel_url=wheel_url, branch_name=branch_name) if __name__ == 
"__main__": @@ -183,7 +207,7 @@ def build(branch_name: Optional[str], force: bool): if args.verbose: enable_console_debug_logging() - build( + main( branch_name=args.ref, force=args.force, ) From e2ab1ab8ec3020e702327433cf2acabf639d0453 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Wed, 18 Dec 2024 11:17:10 -0800 Subject: [PATCH 6/9] Add printout --- tools/tpcds.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tools/tpcds.py b/tools/tpcds.py index 407a4ed7b9..c4cc42e938 100644 --- a/tools/tpcds.py +++ b/tools/tpcds.py @@ -174,7 +174,16 @@ def build(branch_name: str, commit_hash: str, force: bool) -> str: def run( wheel_url: str, branch_name: str, + commit_hash: str, ): + user_wants_to_run_tpcds_benchmarking = inquirer.confirm( + message=f"Going to run the 'run-cluster' workflow on the branch '{branch_name}' (commit-hash: {commit_hash}); proceed?" + ) + + if not user_wants_to_run_tpcds_benchmarking: + print("Workflow aborted") + exit(1) + workflow = repo.get_workflow("run-cluster.yaml") dispatch( workflow=workflow, @@ -194,7 +203,7 @@ def main( ): branch_name, commit_hash = get_name_and_commit_hash(branch_name) wheel_url = build(branch_name=branch_name, commit_hash=commit_hash, force=force) - run(wheel_url=wheel_url, branch_name=branch_name) + run(wheel_url=wheel_url, branch_name=branch_name, commit_hash=commit_hash) if __name__ == "__main__": From 0a2d1a99dd570da6757ae0365a70d666547374af Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Wed, 18 Dec 2024 11:44:21 -0800 Subject: [PATCH 7/9] Get rid of force flag --- tools/tpcds.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/tools/tpcds.py b/tools/tpcds.py index c4cc42e938..49818e7576 100644 --- a/tools/tpcds.py +++ b/tools/tpcds.py @@ -152,7 +152,7 @@ def find_wheel(commit_hash: str) -> Optional[str]: return wheel_urls[0] if wheel_urls else None -def build(branch_name: str, commit_hash: str, force: bool) -> str: +def build(branch_name: str, 
commit_hash: str) -> str: """Runs a build on the given branch. If the branch has already been built, it will reuse the already built wheel. @@ -161,12 +161,10 @@ def build(branch_name: str, commit_hash: str, force: bool) -> str: wheel_url = find_wheel(commit_hash) - should_build = force or wheel_url is None - if should_build: - wheel_url = run_build(branch_name, commit_hash) - else: - # wheel_url must be non-None if this branch is executed + if wheel_url: print(f"Wheel already found at url {wheel_url}; re-using") + else: + wheel_url = run_build(branch_name, commit_hash) return wheel_url @@ -199,17 +197,15 @@ def run( def main( branch_name: Optional[str], - force: bool, ): branch_name, commit_hash = get_name_and_commit_hash(branch_name) - wheel_url = build(branch_name=branch_name, commit_hash=commit_hash, force=force) + wheel_url = build(branch_name=branch_name, commit_hash=commit_hash) run(wheel_url=wheel_url, branch_name=branch_name, commit_hash=commit_hash) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--ref", type=str, required=False, help="The branch name to run on") - parser.add_argument("--force", action="store_true", help="Force a rebuild") parser.add_argument("--verbose", action="store_true", help="Verbose debugging") args = parser.parse_args() @@ -218,5 +214,4 @@ def main( main( branch_name=args.ref, - force=args.force, ) From cdca94c66d91db96c797573b70b0b6b3d8e37937 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Wed, 18 Dec 2024 12:53:17 -0800 Subject: [PATCH 8/9] Change which url is being printed; update how commit-hashes are used --- tools/tpcds.py | 181 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 117 insertions(+), 64 deletions(-) diff --git a/tools/tpcds.py b/tools/tpcds.py index 49818e7576..b1f3d65b82 100644 --- a/tools/tpcds.py +++ b/tools/tpcds.py @@ -8,6 +8,7 @@ # /// import argparse +import json import subprocess import time import typing @@ -30,7 +31,9 @@ repo = 
g.get_repo("Eventual-Inc/Daft") -def dispatch(workflow: Workflow, branch_name: str, inputs: dict): +def dispatch(workflow: Workflow, branch_name: str, inputs: dict) -> WorkflowRun: + pre_creation_latest_run = get_latest_run(workflow) + print(f"Launching workflow '{workflow.name}' on the branch '{branch_name}' with the inputs '{inputs}'") created = workflow.create_dispatch( ref=branch_name, @@ -39,6 +42,25 @@ def dispatch(workflow: Workflow, branch_name: str, inputs: dict): if not created: raise RuntimeError("Could not create workflow, suggestion: run again with --verbose") + post_creation_latest_run = None + for _ in range(RETRY_ATTEMPTS): + post_creation_latest_run = get_latest_run(workflow) + if pre_creation_latest_run.run_number == post_creation_latest_run.run_number: + sleep_and_then_retry() + elif pre_creation_latest_run.run_number < post_creation_latest_run.run_number: + break + else: + typing.assert_never( + "Run numbers are always returned in sorted order and are always monotonically increasing" + ) + if post_creation_latest_run is None or post_creation_latest_run.run_number <= pre_creation_latest_run.run_number: + raise RuntimeError(f"Unable to locate the new run request for the '{workflow.name}' workflow") + + print(f"Launched new '{workflow.name}' workflow with id: {post_creation_latest_run.id}") + print(f"View the workflow run at: {post_creation_latest_run.html_url}") + + return post_creation_latest_run + def sleep_and_then_retry(sleep_amount_sec: int = 3): time.sleep(sleep_amount_sec) @@ -69,10 +91,40 @@ def get_name_and_commit_hash(branch_name: Optional[str]) -> tuple[str, str]: return name, commit_hash -def run_build( - branch_name: str, - commit_hash: str, -): +def find_wheel(commit_hash: str) -> Optional[str]: + s3 = boto3.client("s3") + response: dict = s3.list_objects_v2(Bucket="github-actions-artifacts-bucket", Prefix=f"builds/{commit_hash}/") + wheel_urls = [] + for wheel in response.get("Contents", []): + wheel: dict + if "Key" in wheel: + wheel_path = Path(wheel["Key"]) + wheel_name = wheel_path.name + if wheel_name == 
WHEEL_NAME: + wheel_urls.append( + f"https://github-actions-artifacts-bucket.s3.us-west-2.amazonaws.com/builds/{commit_hash}/{wheel_name}" + ) + + length = len(wheel_urls) + assert length <= 1, "There should never be more than 1 object in S3 with the exact same key" + + return wheel_urls[0] if wheel_urls else None + + +def build(branch_name: str, commit_hash: str) -> tuple[str, str]: + """Runs a build on the given branch. + + If the branch has already been built, it will reuse the already built wheel. + """ + print(f"Checking if a build exists for the branch '{branch_name}' (commit-hash: {commit_hash})") + + wheel_url = find_wheel(commit_hash) + + if wheel_url: + print(f"Wheel already found at url {wheel_url}; re-using") + return wheel_url, commit_hash + + print(f"No wheel found for branch '{branch_name}'; attempting to build") user_wants_to_build_commit = inquirer.confirm( message=f"You are requesting to build '{branch_name}' (commit-hash: {commit_hash}) using the 'build-commit' workflow; proceed?" 
) @@ -81,31 +133,17 @@ def run_build( exit(1) workflow = repo.get_workflow("build-commit.yaml") + latest_run = dispatch(workflow=workflow, branch_name=branch_name, inputs={"arch": "x86"}) - pre_creation_latest_run = get_latest_run(workflow) - - dispatch(workflow=workflow, branch_name=branch_name, inputs={"arch": "x86"}) - - post_creation_latest_run = None - for _ in range(RETRY_ATTEMPTS): - post_creation_latest_run = get_latest_run(workflow) - if pre_creation_latest_run.run_number == post_creation_latest_run.run_number: - sleep_and_then_retry() - elif pre_creation_latest_run.run_number < post_creation_latest_run.run_number: - break - else: - typing.assert_never( - "Run numbers are always returned in sorted order and are always monotonically increasing" - ) - - if not post_creation_latest_run: - raise RuntimeError("Unable to locate the new run request for the 'build-commit' workflow") - - print(f"Launched new 'build-commit' workflow with id: {post_creation_latest_run.id}") - print(f"View the workflow run at: {post_creation_latest_run.url}") + actual_commit_hash = latest_run.head_sha + if actual_commit_hash != commit_hash: + print( + f"Looks like your current branch and remote branch are out of sync (one is behind the other);\nThe workflow has been launched on the commit hash that GitHub has: '{actual_commit_hash}'", + ) + commit_hash = actual_commit_hash while True: - jobs = repo.get_workflow_run(post_creation_latest_run.id).jobs() + jobs = repo.get_workflow_run(latest_run.id).jobs() if not jobs: raise RuntimeError("The 'build-commit' workflow should have 1 job") elif jobs.totalCount > 1: @@ -114,65 +152,62 @@ def run_build( build_commit_job: WorkflowJob = jobs[0] if build_commit_job.conclusion: break - else: - print(f"Job is still running with status: {build_commit_job.status}") + print(f"Job is still running with status: {build_commit_job.status}") sleep_and_then_retry(10) print(f"Job completed with status {build_commit_job.conclusion}") if 
build_commit_job.conclusion != "success": - raise RuntimeError(f"The 'build-commit' workflow failed; view the results here: {post_creation_latest_run.url}") + raise RuntimeError(f"The 'build-commit' workflow failed; view the results here: {latest_run.html_url}") wheel_url = find_wheel(commit_hash) if not wheel_url: raise RuntimeError("The wheel was not able to be found in AWS S3; internal error") - return wheel_url - - -def find_wheel(commit_hash: str) -> Optional[str]: - s3 = boto3.client("s3") - response: dict = s3.list_objects_v2(Bucket="github-actions-artifacts-bucket", Prefix=f"builds/{commit_hash}/") - wheel_urls = [] - for wheel in response.get("Contents", []): - wheel: dict - if "Key" in wheel: - wheel_path = Path(wheel["Key"]) - wheel_name = wheel_path.name - if wheel_name == WHEEL_NAME: - wheel_urls.append( - f"https://github-actions-artifacts-bucket.s3.us-west-2.amazonaws.com/builds/{commit_hash}/{wheel_name}" - ) - - length = len(wheel_urls) - assert length <= 1, "There should never be more than 1 object in S3 with the exact same key" + return wheel_url, commit_hash - return wheel_urls[0] if wheel_urls else None +def parse_questions(questions: str) -> list[int]: + if not questions: + return [] -def build(branch_name: str, commit_hash: str) -> str: - """Runs a build on the given branch. + if questions == "*": + return list(range(1, 100)) - If the branch has already been built, it will reuse the already built wheel. - """ - print(f"Checking if a build exists for the branch '{branch_name}' (commit-hash: {commit_hash})") + items = questions.split(",") + nums = [] + for item in items: + try: + num = int(item) + nums.append(num) + continue + except ValueError: + ... 
- wheel_url = find_wheel(commit_hash) + if "-" not in item: + raise ValueError(f"Invalid question item; expected a number or a range, instead got {item}") - if wheel_url: - print(f"Wheel already found at url {wheel_url}; re-using") - else: - wheel_url = run_build(branch_name, commit_hash) + try: + lower, upper = item.split("-") + lower_int = int(lower) + upper_int = int(upper) + if lower_int > upper_int: + raise ValueError + nums.extend(range(lower_int, upper_int + 1)) + except ValueError: + raise ValueError(f"Invalid question item; expected a number or a range, instead got {item}") from None - return wheel_url + return nums def run( wheel_url: str, branch_name: str, commit_hash: str, + questions: str, + scale_factor: int, ): user_wants_to_run_tpcds_benchmarking = inquirer.confirm( message=f"Going to run the 'run-cluster' workflow on the branch '{branch_name}' (commit-hash: {commit_hash}); proceed?" @@ -182,6 +217,10 @@ def run( print("Workflow aborted") exit(1) + expanded_questions = parse_questions(questions) + args_as_list = [f"--question={q} --scale-factor={scale_factor}" for q in expanded_questions] + entrypoint_args = json.dumps(args_as_list) + workflow = repo.get_workflow("run-cluster.yaml") dispatch( workflow=workflow, @@ -190,22 +229,34 @@ def run( "daft_wheel_url": wheel_url, "working_dir": "benchmarking/tpcds", "entrypoint_script": "ray_entrypoint.py", - "entrypoint_args": "--question=3 --scale-factor=100", + "entrypoint_args": entrypoint_args, }, ) def main( branch_name: Optional[str], + questions: str, + scale_factor: int, ): branch_name, commit_hash = get_name_and_commit_hash(branch_name) - wheel_url = build(branch_name=branch_name, commit_hash=commit_hash) - run(wheel_url=wheel_url, branch_name=branch_name, commit_hash=commit_hash) + wheel_url, commit_hash = build(branch_name=branch_name, commit_hash=commit_hash) + run( + wheel_url=wheel_url, + branch_name=branch_name, + commit_hash=commit_hash, + questions=questions, + scale_factor=scale_factor, + ) if __name__ == "__main__": parser = argparse.ArgumentParser() 
parser.add_argument("--ref", type=str, required=False, help="The branch name to run on") + parser.add_argument( + "--questions", type=str, required=False, default="*", help="A comma separated list of questions to run" + ) + parser.add_argument("--scale-factor", type=int, required=False, default=2, help="The scale factor to run on") parser.add_argument("--verbose", action="store_true", help="Verbose debugging") args = parser.parse_args() @@ -214,4 +265,6 @@ def main( main( branch_name=args.ref, + questions=args.questions, + scale_factor=args.scale_factor, ) From 9fba5d2c6406b632fadcc5b49b1dd56a947550a9 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Wed, 18 Dec 2024 13:20:19 -0800 Subject: [PATCH 9/9] Remove timeout --- .github/workflows/run-cluster.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/run-cluster.yaml b/.github/workflows/run-cluster.yaml index e0262be2cb..104076ec36 100644 --- a/.github/workflows/run-cluster.yaml +++ b/.github/workflows/run-cluster.yaml @@ -47,7 +47,7 @@ on: jobs: run-command: runs-on: [self-hosted, linux, x64, ci-dev] - timeout-minutes: 15 # Remove for ssh debugging + # timeout-minutes: 15 # Remove for ssh debugging permissions: id-token: write contents: read