From 993c3ecd907e1ede8edab0c9f056312500b74cd5 Mon Sep 17 00:00:00 2001 From: Jiri Petrlik Date: Tue, 19 Nov 2024 12:55:32 +0100 Subject: [PATCH] RHOAIENG-12192 - Extend DSP e2e tests --- .../v2/cache-disabled/ray_job_integration.py | 404 ++++++++++++++++++ .../ray_job_integration_compiled.yaml | 79 ++++ .../test-pipelines-integration.robot | 131 ++++++ .../1101__data-science-pipelines-kfp.robot | 21 - 4 files changed, 614 insertions(+), 21 deletions(-) create mode 100644 ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration.py create mode 100644 ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration_compiled.yaml create mode 100644 ods_ci/tests/Tests/0600__distributed_workloads/0601__workloads_orchestration/test-pipelines-integration.robot diff --git a/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration.py b/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration.py new file mode 100644 index 000000000..2569a0caf --- /dev/null +++ b/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration.py @@ -0,0 +1,404 @@ +from kfp import compiler, dsl +import subprocess +import tempfile + +common_base_image = ( + "registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61" +) + +training_script = """ +import os + +import torch +import requests +from pytorch_lightning import LightningModule, Trainer +from pytorch_lightning.callbacks.progress import TQDMProgressBar +from torch import nn +from torch.nn import functional as F +from torch.utils.data import DataLoader, random_split, RandomSampler +from torchmetrics import Accuracy +from torchvision import transforms +from torchvision.datasets import MNIST +import gzip +import shutil +from minio import Minio + + +PATH_DATASETS = os.environ.get("PATH_DATASETS", ".") +BATCH_SIZE = 256 if torch.cuda.is_available() else 64 + +local_mnist_path = os.path.dirname(os.path.abspath(__file__)) + +print("prior to running the trainer") +print("MASTER_ADDR: is ", os.getenv("MASTER_ADDR")) +print("MASTER_PORT: is ", os.getenv("MASTER_PORT")) + +print("ACCELERATOR: is ", os.getenv("ACCELERATOR")) +ACCELERATOR = os.getenv("ACCELERATOR") + +STORAGE_BUCKET_EXISTS = "AWS_DEFAULT_ENDPOINT" in os.environ +print("STORAGE_BUCKET_EXISTS: ", STORAGE_BUCKET_EXISTS) + +print( + f'Storage_Bucket_Default_Endpoint : is {os.environ.get("AWS_DEFAULT_ENDPOINT")}' + if "AWS_DEFAULT_ENDPOINT" in os.environ + else "" +) +print( + f'Storage_Bucket_Name : is {os.environ.get("AWS_STORAGE_BUCKET")}' + if "AWS_STORAGE_BUCKET" in os.environ + else "" +) +print( + f'Storage_Bucket_Mnist_Directory : is {os.environ.get("AWS_STORAGE_BUCKET_MNIST_DIR")}' + if "AWS_STORAGE_BUCKET_MNIST_DIR" in os.environ + else "" +) + + +class LitMNIST(LightningModule): + def __init__(self, data_dir=PATH_DATASETS, hidden_size=64, learning_rate=2e-4): + super().__init__() + + # Set our init args as class attributes + self.data_dir = data_dir + self.hidden_size = hidden_size + self.learning_rate = learning_rate + + # Hardcode some dataset specific attributes + self.num_classes = 10 + self.dims = (1, 28, 28) + channels, width, height = self.dims + self.transform = transforms.Compose( + [ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)), + ] + ) + + # Define PyTorch model + self.model = nn.Sequential( + nn.Flatten(), + nn.Linear(channels * width * height, hidden_size), + nn.ReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_size, hidden_size), + nn.ReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_size, self.num_classes), + ) + + self.val_accuracy = Accuracy() + self.test_accuracy = Accuracy() + + def forward(self, x): + x = self.model(x) + return F.log_softmax(x, dim=1) + + def training_step(self, batch, batch_idx): + x, y = batch + logits = self(x) + loss = F.nll_loss(logits, y) + return loss + + def validation_step(self, batch, batch_idx): + x, y = batch + logits = self(x) + loss = F.nll_loss(logits, y) + preds = torch.argmax(logits, dim=1) + self.val_accuracy.update(preds, y) + + # Calling self.log will surface up scalars for you in TensorBoard + self.log("val_loss", loss, prog_bar=True) + self.log("val_acc", self.val_accuracy, prog_bar=True) + + def test_step(self, batch, batch_idx): + x, y = batch + logits = self(x) + loss = F.nll_loss(logits, y) + preds = torch.argmax(logits, dim=1) + self.test_accuracy.update(preds, y) + + # Calling self.log will surface up scalars for you in TensorBoard + self.log("test_loss", loss, prog_bar=True) + self.log("test_acc", self.test_accuracy, prog_bar=True) + + def configure_optimizers(self): + optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate) + return optimizer + + #################### + # DATA RELATED HOOKS + #################### + + def prepare_data(self): + # download + print("Downloading MNIST dataset...") + + if ( + STORAGE_BUCKET_EXISTS + and os.environ.get("AWS_DEFAULT_ENDPOINT") != "" + and os.environ.get("AWS_DEFAULT_ENDPOINT") != None + ): + print("Using storage bucket to download datasets...") + + dataset_dir = os.path.join(self.data_dir, "MNIST/raw") + endpoint = os.environ.get("AWS_DEFAULT_ENDPOINT") + access_key = os.environ.get("AWS_ACCESS_KEY_ID") + secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY") + bucket_name = os.environ.get("AWS_STORAGE_BUCKET") + + # remove prefix if specified in storage bucket endpoint url + secure = True + if endpoint.startswith("https://"): + endpoint = endpoint[len("https://") :] + elif endpoint.startswith("http://"): + endpoint = endpoint[len("http://") :] + secure = False + + client = Minio( + endpoint, + access_key=access_key, + secret_key=secret_key, + cert_check=False, + secure=secure, + ) + + if not os.path.exists(dataset_dir): + os.makedirs(dataset_dir) + else: + print(f"Directory '{dataset_dir}' already exists") + + # To download datasets from storage bucket's specific directory, use prefix to provide directory name + prefix = os.environ.get("AWS_STORAGE_BUCKET_MNIST_DIR") + # download all files from prefix folder of storage bucket recursively + for item in client.list_objects(bucket_name, prefix=prefix, recursive=True): + file_name = item.object_name[len(prefix) + 1 :] + dataset_file_path = os.path.join(dataset_dir, file_name) + if not os.path.exists(dataset_file_path): + client.fget_object(bucket_name, item.object_name, dataset_file_path) + else: + print(f"File-path '{dataset_file_path}' already exists") + # Unzip files + with gzip.open(dataset_file_path, "rb") as f_in: + with open(dataset_file_path.split(".")[:-1][0], "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + # delete zip file + os.remove(dataset_file_path) + unzipped_filepath = dataset_file_path.split(".")[0] + if os.path.exists(unzipped_filepath): + print( + f"Unzipped and saved dataset file to path - {unzipped_filepath}" + ) + download_datasets = False + + else: + print("Using default MNIST mirror reference to download datasets...") + download_datasets = True + + MNIST(self.data_dir, train=True, download=download_datasets) + MNIST(self.data_dir, train=False, download=download_datasets) + + def setup(self, stage=None): + # Assign train/val datasets for use in dataloaders + if stage == "fit" or stage is None: + mnist_full = MNIST( + self.data_dir, train=True, transform=self.transform, download=False + ) + self.mnist_train, self.mnist_val = random_split(mnist_full, [55000, 5000]) + + # Assign test dataset for use in dataloader(s) + if stage == "test" or stage is None: + self.mnist_test = MNIST( + self.data_dir, train=False, transform=self.transform, download=False + ) + + def train_dataloader(self): + return DataLoader( + self.mnist_train, + batch_size=BATCH_SIZE, + sampler=RandomSampler(self.mnist_train, num_samples=1000), + ) + + def val_dataloader(self): + return DataLoader(self.mnist_val, batch_size=BATCH_SIZE) + + def test_dataloader(self): + return DataLoader(self.mnist_test, batch_size=BATCH_SIZE) + + +# Init DataLoader from MNIST Dataset + +model = LitMNIST(data_dir=local_mnist_path) + +print("GROUP: ", int(os.environ.get("GROUP_WORLD_SIZE", 1))) +print("LOCAL: ", int(os.environ.get("LOCAL_WORLD_SIZE", 1))) + +# Initialize a trainer +trainer = Trainer( + accelerator=ACCELERATOR, + # devices=1 if torch.cuda.is_available() else None, # limiting got iPython runs + max_epochs=3, + callbacks=[TQDMProgressBar(refresh_rate=20)], + num_nodes=int(os.environ.get("GROUP_WORLD_SIZE", 1)), + devices=int(os.environ.get("LOCAL_WORLD_SIZE", 1)), + replace_sampler_ddp=False, + strategy="ddp", +) + +# Train the model ⚡ +trainer.fit(model) +""" + +pip_requirements = """ +pytorch_lightning==1.9.5 +torchmetrics==0.9.1 +torchvision==0.20.1 +minio +""" + +# image and the sdk has a fixed value because the version matters +@dsl.component(packages_to_install=["codeflare-sdk==0.21.1"], base_image=common_base_image) +def ray_fn( + AWS_DEFAULT_ENDPOINT, + AWS_STORAGE_BUCKET, + AWS_ACCESS_KEY_ID, + AWS_SECRET_ACCESS_KEY, + AWS_STORAGE_BUCKET_MNIST_DIR +) -> int: + import ray # noqa: PLC0415 + from codeflare_sdk import generate_cert # noqa: PLC0415 + from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration # noqa: PLC0415 + + cluster = Cluster( + ClusterConfiguration( + name="raytest", + num_workers=1, + head_cpu_requests=1, + head_cpu_limits=1, + head_memory_requests=4, + head_memory_limits=4, + worker_cpu_requests=1, + worker_cpu_limits=1, + worker_memory_requests=1, + worker_memory_limits=2, + image="quay.io/modh/ray:2.35.0-py39-cu121", + verify_tls=False + ) + ) + + # always clean the resources + cluster.down() + print(cluster.status()) + cluster.up() + cluster.wait_ready() + print(cluster.status()) + print(cluster.details()) + + ray_dashboard_uri = cluster.cluster_dashboard_uri() + ray_cluster_uri = cluster.cluster_uri() + print(ray_dashboard_uri) + print(ray_cluster_uri) + + # before proceeding make sure the cluster exists and the uri is not empty + assert ray_cluster_uri, "Ray cluster needs to be started and set before proceeding" + assert ray_dashboard_uri, "Ray dashboard needs to be started and set before proceeding" + + mnist_directory = tempfile.mkdtemp(prefix="mnist-dir") + with open(mnist_directory + "/mnist.py", "w") as mnist_file: + mnist_file.write(training_script) + with open(mnist_directory + "/mnist_pip_requirements.txt") as pip_requirements_file: + pip_requirements_file.write(pip_requirements) + + assert_jobsubmit_withlogin(self, cluster, mnist_directory) + + cluster.down() + return result + + def assert_jobsubmit_withlogin(self, cluster, mnist_directory): + auth_token = run_oc_command(["whoami", "--show-token=true"]) + ray_dashboard = cluster.cluster_dashboard_uri() + header = {"Authorization": f"Bearer {auth_token}"} + client = RayJobClient(address=ray_dashboard, headers=header, verify=False) + + submission_id = client.submit_job( + entrypoint="python mnist.py", + runtime_env={ + "working_dir": mnist_directory, + "pip": mnist_directory + "/mnist_pip_requirements.txt", + "env_vars": { + "AWS_DEFAULT_ENDPOINT": AWS_DEFAULT_ENDPOINT, + "AWS_STORAGE_BUCKET": AWS_STORAGE_BUCKET, + "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID, + "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY, + "AWS_STORAGE_BUCKET_MNIST_DIR": AWS_STORAGE_BUCKET_MNIST_DIR + }, + }, + entrypoint_num_cpus=1, + ) + print(f"Submitted job with ID: {submission_id}") + done = False + time = 0 + timeout = 900 + while not done: + status = client.get_job_status(submission_id) + if status.is_terminal(): + break + if not done: + print(status) + if timeout and time >= timeout: + raise TimeoutError(f"job has timed out after waiting {timeout}s") + sleep(5) + time += 5 + + logs = client.get_job_logs(submission_id) + print(logs) + + self.assert_job_completion(status) + + client.delete_job(submission_id) + + cluster.down() + + def assert_job_completion(self, status): + if status == "SUCCEEDED": + print(f"Job has completed: '{status}'") + assert True + else: + print(f"Job has completed: '{status}'") + assert False + + def run_oc_command(args): + try: + result = subprocess.run( + ["oc"] + args, capture_output=True, text=True, check=True + ) + return result.stdout.strip() + except subprocess.CalledProcessError as e: + print(f"Error executing 'oc {' '.join(args)}': {e}") + return None + + +@dsl.pipeline( + name="Ray Integration Test", + description="Ray Integration Test", +) + +def ray_job_integration( + AWS_DEFAULT_ENDPOINT, + AWS_STORAGE_BUCKET, + AWS_ACCESS_KEY_ID, + AWS_SECRET_ACCESS_KEY, + AWS_STORAGE_BUCKET_MNIST_DIR +): + ray_fn( + AWS_DEFAULT_ENDPOINT, + AWS_STORAGE_BUCKET, + AWS_ACCESS_KEY_ID, + AWS_SECRET_ACCESS_KEY, + AWS_STORAGE_BUCKET_MNIST_DIR + ).set_caching_options(False) + + +if __name__ == "__main__": + compiler.Compiler().compile(ray_job_integration, package_path=__file__.replace(".py", "_compiled.yaml")) diff --git a/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration_compiled.yaml b/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration_compiled.yaml new file mode 100644 index 000000000..259f445a5 --- /dev/null +++ b/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration_compiled.yaml @@ -0,0 +1,79 @@ +# PIPELINE DEFINITION +# Name: ray-integration-test +# Description: Ray Integration Test +components: + comp-ray-fn: + executorLabel: exec-ray-fn + outputDefinitions: + parameters: + Output: + parameterType: NUMBER_INTEGER +deploymentSpec: + executors: + exec-ray-fn: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - ray_fn + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'codeflare-sdk==0.21.1'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef ray_fn() -> int:\n import ray # noqa: PLC0415\n from codeflare_sdk\ + \ import generate_cert # noqa: PLC0415\n from codeflare_sdk.cluster.cluster\ + \ import Cluster, ClusterConfiguration # noqa: PLC0415\n\n cluster =\ + \ Cluster(\n ClusterConfiguration(\n name=\"raytest\"\ + ,\n num_workers=1,\n head_cpu_requests=1,\n \ + \ head_cpu_limits=1,\n head_memory_requests=4,\n \ + \ head_memory_limits=4,\n worker_cpu_requests=1,\n \ + \ worker_cpu_limits=1,\n worker_memory_requests=1,\n \ + \ worker_memory_limits=2,\n image=\"quay.io/modh/ray:2.35.0-py39-cu121\"\ + ,\n verify_tls=False\n )\n )\n\n # always clean\ + \ the resources\n cluster.down()\n print(cluster.status())\n cluster.up()\n\ + \ cluster.wait_ready()\n print(cluster.status())\n print(cluster.details())\n\ + \n ray_dashboard_uri = cluster.cluster_dashboard_uri()\n ray_cluster_uri\ + \ = cluster.cluster_uri()\n print(ray_dashboard_uri)\n print(ray_cluster_uri)\n\ + \n # before proceeding make sure the cluster exists and the uri is not\ + \ empty\n assert ray_cluster_uri, \"Ray cluster needs to be started and\ + \ set before proceeding\"\n\n # reset the ray context in case there's\ + \ already one.\n ray.shutdown()\n # establish connection to ray cluster\n\ + \ generate_cert.generate_tls_cert(cluster.config.name, cluster.config.namespace)\n\ + \ generate_cert.export_env(cluster.config.name, cluster.config.namespace)\n\ + \ ray.init(address=cluster.cluster_uri(), logging_level=\"DEBUG\")\n\ + \ print(\"Ray cluster is up and running: \", ray.is_initialized())\n\n\ + \ @ray.remote\n def train_fn():\n return 100\n\n result\ + \ = ray.get(train_fn.remote())\n assert 100 == result\n ray.shutdown()\n\ + \ cluster.down()\n return result\n\n" + image: registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61 +pipelineInfo: + description: Ray Integration Test + name: ray-integration-test +root: + dag: + tasks: + ray-fn: + cachingOptions: {} + componentRef: + name: comp-ray-fn + taskInfo: + name: ray-fn +schemaVersion: 2.1.0 +sdkVersion: kfp-2.10.0 diff --git a/ods_ci/tests/Tests/0600__distributed_workloads/0601__workloads_orchestration/test-pipelines-integration.robot b/ods_ci/tests/Tests/0600__distributed_workloads/0601__workloads_orchestration/test-pipelines-integration.robot new file mode 100644 index 000000000..ecf567135 --- /dev/null +++ b/ods_ci/tests/Tests/0600__distributed_workloads/0601__workloads_orchestration/test-pipelines-integration.robot @@ -0,0 +1,131 @@ +*** Settings *** +Documentation Test suite for OpenShift Pipeline using kfp python package + +Resource ../../../Resources/RHOSi.resource +Resource ../../../Resources/ODS.robot +Resource ../../../Resources/Common.robot +Resource ../../../Resources/Page/ODH/ODHDashboard/ODHDashboard.robot +Resource ../../../Resources/Page/ODH/ODHDashboard/ODHDataSciencePipelines.resource +Resource ../../../Resources/Page/ODH/ODHDashboard/ODHDataScienceProject/Permissions.resource +Resource ../../../Resources/Page/ODH/ODHDashboard/ODHDataScienceProject/Projects.resource +Resource ../../../Resources/CLI/DataSciencePipelines/DataSciencePipelinesBackend.resource +Resource ../../../Resources/Page/DistributedWorkloads/DistributedWorkloads.resource +Library DateTime +Library ../../../../libs/DataSciencePipelinesAPI.py +Library ../../../../libs/DataSciencePipelinesKfp.py +Test Tags DataSciencePipelines-Backend +Suite Setup Data Science Pipelines Suite Setup +Suite Teardown RHOSi Teardown + + +*** Variables *** +${PROJECT_NAME}= dw-pipelines +${KUEUE_RESOURCES_SETUP_FILEPATH}= tests/Resources/Page/DistributedWorkloads/kueue_resources_setup.sh + + +*** Test Cases *** +Verify Ods Users Can Create And Run A Data Science Pipeline With Ray Using The kfp Python Package + [Documentation] Creates, runs pipelines with regular user. Double check the pipeline result and clean + ... the pipeline resources. + ... AutomationBugOnDisconnected: RHOAIENG-12514 + [Tags] Tier1 + ... AutomationBugOnDisconnected + ... DistributedWorkloads + ... WorkloadsOrchestration + ... DataSciencePipelines-DistributedWorkloads + Skip If Component Is Not Enabled ray + Skip If Component Is Not Enabled codeflare + ${params_dict}= Create Dictionary + ... AWS_DEFAULT_ENDPOINT=${AWS_DEFAULT_ENDPOINT} + ... AWS_STORAGE_BUCKET=${AWS_STORAGE_BUCKET} + ... AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + ... AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + ... AWS_STORAGE_BUCKET_MNIST_DIR=${AWS_STORAGE_BUCKET_MNIST_DIR} + End To End Pipeline Workflow Using Kfp + ... admin_username=${TEST_USER.USERNAME} + ... admin_password=${TEST_USER.PASSWORD} + ... username=${TEST_USER_3.USERNAME} + ... password=${TEST_USER_3.PASSWORD} + ... project=${PROJECT_NAME} + ... python_file=cache-disabled/ray_integration.py + ... method_name=ray_integration + ... status_check_timeout=600 + ... pipeline_params=${params_dict} + ... ray=${TRUE} + [Teardown] Projects.Delete Project Via CLI By Display Name ${PROJECT_NAME} + +Verify Ods Users Can Create And Run A Data Science Pipeline With Ray Job Using The kfp Python Package + [Documentation] Creates, runs pipelines with regular user. Double check the pipeline result and clean + ... the pipeline resources. + ... AutomationBugOnDisconnected: RHOAIENG-12514 + [Tags] Tier1 + ... AutomationBugOnDisconnected + ... DistributedWorkloads + ... WorkloadsOrchestration + ... DataSciencePipelines-DistributedWorkloads + Skip If Component Is Not Enabled ray + Skip If Component Is Not Enabled codeflare + ${ray_dict}= Create Dictionary + End To End Pipeline Workflow Using Kfp + ... admin_username=${TEST_USER.USERNAME} + ... admin_password=${TEST_USER.PASSWORD} + ... username=${TEST_USER_3.USERNAME} + ... password=${TEST_USER_3.PASSWORD} + ... project=${PROJECT_NAME} + ... python_file=cache-disabled/ray_job_integration.py + ... method_name=ray_job_integration + ... status_check_timeout=600 + ... pipeline_params=${ray_dict} + ... ray=${TRUE} + [Teardown] Projects.Delete Project Via CLI By Display Name ${PROJECT_NAME} + + +*** Keywords *** +# robocop: disable:line-too-long +End To End Pipeline Workflow Using Kfp + [Documentation] Create, run and double check the pipeline result using Kfp python package. In the end, + ... clean the pipeline resources. + [Arguments] ${username} ${password} ${admin_username} ${admin_password} ${project} ${python_file} + ... ${method_name} ${pipeline_params} ${status_check_timeout}=160 ${ray}=${FALSE} + + Projects.Delete Project Via CLI By Display Name ${project} + Projects.Create Data Science Project From CLI name=${project} + + DataSciencePipelinesBackend.Create PipelineServer Using Custom DSPA ${project} + + ${status} Login And Wait Dsp Route ${admin_username} ${admin_password} ${project} + Should Be True ${status} == 200 Could not login to the Data Science Pipelines Rest API OR DSP routing is not working + # we remove and add a new project for sanity. LocalQueue is per namespace + IF ${ray} == ${TRUE} + Setup Kueue Resources ${project} cluster-queue-user resource-flavor-user local-queue-user + END + # The run_robot_test.sh is sending the --variablefile ${TEST_VARIABLES_FILE} which may contain the `PIP_INDEX_URL` + # and `PIP_TRUSTED_HOST` variables, e.g. for disconnected testing. + Launch Data Science Project Main Page username=${admin_username} password=${admin_password} + Assign Contributor Permissions To User ${username} in Project ${project} + ${pip_index_url} = Get Variable Value ${PIP_INDEX_URL} ${NONE} + ${pip_trusted_host} = Get Variable Value ${PIP_TRUSTED_HOST} ${NONE} + Log pip_index_url = ${pip_index_url} / pip_trusted_host = ${pip_trusted_host} + ${run_id} Create Run From Pipeline Func ${username} ${password} ${project} + ... ${python_file} ${method_name} pipeline_params=${pipeline_params} pip_index_url=${pip_index_url} + ... pip_trusted_host=${pip_trusted_host} + ${run_status} Check Run Status ${run_id} timeout=${status_check_timeout} + Should Be Equal As Strings ${run_status} SUCCEEDED Pipeline run doesn't have a status that means success. Check the logs + +Data Science Pipelines Suite Setup + [Documentation] Data Science Pipelines Suite Setup + Set Library Search Order SeleniumLibrary + RHOSi Setup + +Setup Kueue Resources + [Documentation] Setup the kueue resources for the project + [Arguments] ${project_name} ${cluster_queue_name} ${resource_flavor_name} ${local_queue_name} + # Easy for debug + Log sh ${KUEUE_RESOURCES_SETUP_FILEPATH} ${cluster_queue_name} ${resource_flavor_name} ${local_queue_name} ${project_name} "2" "8" + ${result} = Run Process sh ${KUEUE_RESOURCES_SETUP_FILEPATH} ${cluster_queue_name} ${resource_flavor_name} ${local_queue_name} ${project_name} "2" "8" + ... shell=true + ... stderr=STDOUT + Log ${result.stdout} + IF ${result.rc} != 0 + FAIL Failed to setup kueue resources + END diff --git a/ods_ci/tests/Tests/1100__data_science_pipelines/1101__data-science-pipelines-kfp.robot b/ods_ci/tests/Tests/1100__data_science_pipelines/1101__data-science-pipelines-kfp.robot index ed16dcad7..455bb3c62 100644 --- a/ods_ci/tests/Tests/1100__data_science_pipelines/1101__data-science-pipelines-kfp.robot +++ b/ods_ci/tests/Tests/1100__data_science_pipelines/1101__data-science-pipelines-kfp.robot @@ -76,27 +76,6 @@ Verify Upload Download In Data Science Pipelines Using The kfp Python Package ... pipeline_params=${upload_download_dict} [Teardown] Projects.Delete Project Via CLI By Display Name ${PROJECT_NAME} -Verify Ods Users Can Create And Run A Data Science Pipeline With Ray Using The kfp Python Package - [Documentation] Creates, runs pipelines with regular user. Double check the pipeline result and clean - ... the pipeline resources. - ... AutomationBugOnDisconnected: RHOAIENG-12514 - [Tags] Tier1 AutomationBugOnDisconnected - Skip If Component Is Not Enabled ray - Skip If Component Is Not Enabled codeflare - ${ray_dict}= Create Dictionary - End To End Pipeline Workflow Using Kfp - ... admin_username=${TEST_USER.USERNAME} - ... admin_password=${TEST_USER.PASSWORD} - ... username=${TEST_USER_3.USERNAME} - ... password=${TEST_USER_3.PASSWORD} - ... project=${PROJECT_NAME} - ... python_file=cache-disabled/ray_integration.py - ... method_name=ray_integration - ... status_check_timeout=600 - ... pipeline_params=${ray_dict} - ... ray=${TRUE} - [Teardown] Projects.Delete Project Via CLI By Display Name ${PROJECT_NAME} - *** Keywords *** # robocop: disable:line-too-long