From 7c906a523c3b2e1dc8d35b60ab9e8aa1235e0f3f Mon Sep 17 00:00:00 2001
From: jill
Date: Thu, 12 Oct 2023 19:01:47 +0800
Subject: [PATCH 1/4] add additional documentation for the with_overrides
 feature

Signed-off-by: jill
---
 .../productionizing/customizing_resources.py | 53 ++++++++++++-------
 1 file changed, 33 insertions(+), 20 deletions(-)

diff --git a/examples/productionizing/productionizing/customizing_resources.py b/examples/productionizing/productionizing/customizing_resources.py
index 1484ca880..68404e0a8 100644
--- a/examples/productionizing/productionizing/customizing_resources.py
+++ b/examples/productionizing/productionizing/customizing_resources.py
@@ -13,13 +13,15 @@
 # Large datasets may not be able to run locally, so we would want to provide hints to the Flyte backend to request for more memory.
 # This is done by decorating the task with the hints as shown in the following code sample.
 #
-# Tasks can have `requests` and `limits` which mirror the native [equivalents in Kubernetes](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits).
+# Tasks can have a `task_config`, which provides configuration for a specific task type, or `requests` and `limits`, which mirror the native [equivalents in Kubernetes](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits).
 # A task can possibly be allocated more resources than it requests, but never more than its limit.
 # Requests are treated as hints to schedule tasks on nodes with available resources, whereas limits
 # are hard constraints.
 #
 # For either a request or limit, refer to the {py:class}`flytekit:flytekit.Resources` documentation.
 #
+# For `task_config`, refer to the {py:func}`flytekit:flytekit.task` documentation.
+#
 # The following attributes can be specified for a `Resource`.
 #
 # 1. `cpu`
@@ -93,43 +95,54 @@ def my_workflow(x: typing.List[int]) -> int:
 # %%
 import typing  # noqa: E402
 
-from flytekit import Resources, task, workflow  # noqa: E402
+from flytekit import Resources, dynamic, task, workflow  # noqa: E402
+from flytekitplugins.kftensorflow import TfJob  # noqa: E402
 
 # %% [markdown]
 # Define a task and configure the resources to be allocated to it.
-# You can use tasks decorated with memory and storage hints like regular tasks in a workflow.
+# You can use tasks decorated with memory and storage hints like regular tasks in a workflow, or configure a {py:class}`flytekitplugins:flytekitplugins.kftensorflow.TfJob` that runs distributed TensorFlow training on Kubernetes.
 # %%
-@task(requests=Resources(cpu="1", mem="200Mi"), limits=Resources(cpu="2", mem="350Mi"))
-def count_unique_numbers_1(x: typing.List[int]) -> int:
-    s = set()
-    for i in x:
-        s.add(i)
-    return len(s)
-
+@task(
+    task_config=TfJob(
+        num_workers=1,
+        num_ps_replicas=1,
+        num_chief_replicas=1,
+    ),
+    requests=Resources(cpu="1", mem="200Mi"),
+    limits=Resources(cpu="2", mem="350Mi"),
+)
+def run_tfjob() -> str:
+    return "hello world"
 
 # %% [markdown]
-# Define a task that computes the square of a number.
+# The `with_overrides` method overrides the old resource allocations.
 # %%
-@task
-def square_1(x: int) -> int:
-    return x * x
+@workflow
+def my_run() -> str:
+    return run_tfjob().with_overrides(limits=Resources(cpu="6", mem="500Mi"))
 
 # %% [markdown]
-# The `with_overrides` method overrides the old resource allocations.
+# Or you can use `@dynamic` to generate tasks at runtime with any custom configurations you may want.
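+# A dynamic workflow is compiled at runtime, so the `task_config` it passes to `with_overrides` can be built from the workflow's inputs.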
 # %%
+@dynamic
+def dynamic_run(num_workers: int) -> str:
+    return run_tfjob().with_overrides(task_config=TfJob(
+        num_workers=num_workers,
+        num_ps_replicas=1,
+        num_chief_replicas=1))
 @workflow
-def my_pipeline(x: typing.List[int]) -> int:
-    return square_1(x=count_unique_numbers_1(x=x)).with_overrides(limits=Resources(cpu="6", mem="500Mi"))
-
+def start_dynamic_run(new_num_workers: int) -> str:
+    return dynamic_run(num_workers=new_num_workers)
 # %% [markdown]
 # You can execute the workflow locally.
 # %%
 if __name__ == "__main__":
-    print(count_unique_numbers_1(x=[1, 1, 2]))
-    print(my_pipeline(x=[1, 1, 2]))
+    print(f"Running my_run(): {my_run()}")
+    print(f"Running dynamic_run(num_workers=4): {start_dynamic_run(new_num_workers=4)}")
 
 # %% [markdown]
 # You can see the memory allocation below. The memory limit is `500Mi` rather than `350Mi`, and the

From a46a880efdaf29494e78a1ad9821ec9f61b24e47 Mon Sep 17 00:00:00 2001
From: jill
Date: Sun, 15 Oct 2023 14:58:44 +0800
Subject: [PATCH 2/4] add additional documentation for the with_overrides
 feature

Signed-off-by: jill
---
 .../productionizing/customizing_resources.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/examples/productionizing/productionizing/customizing_resources.py b/examples/productionizing/productionizing/customizing_resources.py
index 68404e0a8..63855112c 100644
--- a/examples/productionizing/productionizing/customizing_resources.py
+++ b/examples/productionizing/productionizing/customizing_resources.py
@@ -13,14 +13,13 @@
 # Large datasets may not be able to run locally, so we would want to provide hints to the Flyte backend to request for more memory.
 # This is done by decorating the task with the hints as shown in the following code sample.
 #
-# Tasks can have a `task_config`, which provides configuration for a specific task type, or `requests` and `limits`, which mirror the native [equivalents in Kubernetes](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits).
+# Tasks can have `requests` and `limits` which mirror the native [equivalents in Kubernetes](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits).
 # A task can possibly be allocated more resources than it requests, but never more than its limit.
 # Requests are treated as hints to schedule tasks on nodes with available resources, whereas limits
 # are hard constraints.
 #
 # For either a request or limit, refer to the {py:class}`flytekit:flytekit.Resources` documentation.
 #
-# For `task_config`, refer to the {py:func}`flytekit:flytekit.task` documentation.
 #
 # The following attributes can be specified for a `Resource`.
 #
@@ -87,8 +86,10 @@ def my_workflow(x: typing.List[int]) -> int:
 #
 # ## Using `with_overrides`
 #
-# You can use the `with_overrides` method to override the resources allocated to the tasks dynamically.
-# Let's understand how the resources can be initialized with an example.
+# Tasks can also have a `task_config`, which provides configuration for a specific task type. For `task_config`, refer to the {py:func}`flytekit:flytekit.task` documentation.
+#
+# You can use the `with_overrides` method to override the resources and `task_config` allocated to the tasks dynamically.
+# Let's understand how the resources can be initialized and overridden with an example.
 
 # %% [markdown]
 # Import the dependencies.
@@ -125,7 +126,7 @@ def my_run() -> str:
 
 
 # %% [markdown]
-# Or you can use `@dynamic` to generate tasks at runtime with any custom configurations you may want.
+# Or you can use `@dynamic` to generate tasks at runtime with any custom configurations you want.
 # A dynamic workflow is compiled at runtime, so the `task_config` it passes to `with_overrides` can be built from the workflow's inputs.
 # %%
 @dynamic
 def dynamic_run(num_workers: int) -> str:

From 25b9ad463d5f36a5be83e76c6239ec08072473b0 Mon Sep 17 00:00:00 2001
From: jill
Date: Mon, 23 Oct 2023 17:38:10 +0800
Subject: [PATCH 3/4] add additional documentation for the with_overrides
 feature

Signed-off-by: jill
---
 .../productionizing/customizing_resources.py | 182 ++++++++++++++----
 1 file changed, 148 insertions(+), 34 deletions(-)

diff --git a/examples/productionizing/productionizing/customizing_resources.py b/examples/productionizing/productionizing/customizing_resources.py
index 63855112c..315ca6011 100644
--- a/examples/productionizing/productionizing/customizing_resources.py
+++ b/examples/productionizing/productionizing/customizing_resources.py
@@ -20,7 +20,6 @@
 #
 # For either a request or limit, refer to the {py:class}`flytekit:flytekit.Resources` documentation.
 #
-#
 # The following attributes can be specified for a `Resource`.
 #
 # 1. `cpu`
@@ -86,64 +85,52 @@ def my_workflow(x: typing.List[int]) -> int:
 #
 # ## Using `with_overrides`
 #
-# Tasks can also have a `task_config`, which provides configuration for a specific task type. For `task_config`, refer to the {py:func}`flytekit:flytekit.task` documentation.
-#
-# You can use the `with_overrides` method to override the resources and `task_config` allocated to the tasks dynamically.
-# Let's understand how the resources can be initialized and overridden with an example.
+# ### Override `Resources`
+# You can use the `with_overrides` method to override the resources allocated to the tasks dynamically.
+# Let's understand how the resources can be initialized with an example.
 
 # %% [markdown]
 # Import the dependencies.
 # %%
 import typing  # noqa: E402
 
-from flytekit import Resources, dynamic, task, workflow  # noqa: E402
-from flytekitplugins.kftensorflow import TfJob  # noqa: E402
+from flytekit import Resources, task, workflow  # noqa: E402
 
 # %% [markdown]
 # Define a task and configure the resources to be allocated to it.
-# You can use tasks decorated with memory and storage hints like regular tasks in a workflow, or configure a {py:class}`flytekitplugins:flytekitplugins.kftensorflow.TfJob` that runs distributed TensorFlow training on Kubernetes.
+# You can use tasks decorated with memory and storage hints like regular tasks in a workflow.
 # %%
-@task(
-    task_config=TfJob(
-        num_workers=1,
-        num_ps_replicas=1,
-        num_chief_replicas=1,
-    ),
-    requests=Resources(cpu="1", mem="200Mi"),
-    limits=Resources(cpu="2", mem="350Mi"),
-)
-def run_tfjob() -> str:
-    return "hello world"
+@task(requests=Resources(cpu="1", mem="200Mi"), limits=Resources(cpu="2", mem="350Mi"))
+def count_unique_numbers_1(x: typing.List[int]) -> int:
+    s = set()
+    for i in x:
+        s.add(i)
+    return len(s)
 
 
 # %% [markdown]
-# The `with_overrides` method overrides the old resource allocations.
+# Define a task that computes the square of a number.
 # %%
-@workflow
-def my_run() -> str:
-    return run_tfjob().with_overrides(limits=Resources(cpu="6", mem="500Mi"))
+@task
+def square_1(x: int) -> int:
+    return x * x
 
 # %% [markdown]
-# Or you can use `@dynamic` to generate tasks at runtime with any custom configurations you want.
-# A dynamic workflow is compiled at runtime, so the `task_config` it passes to `with_overrides` can be built from the workflow's inputs.
+# The `with_overrides` method overrides the old resource allocations.
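+# The override applies only to this node of the workflow; other workflows that call the same task keep its original requests and limits.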
 # %%
-@dynamic
-def dynamic_run(num_workers: int) -> str:
-    return run_tfjob().with_overrides(task_config=TfJob(
-        num_workers=num_workers,
-        num_ps_replicas=1,
-        num_chief_replicas=1))
 @workflow
-def start_dynamic_run(new_num_workers: int) -> str:
-    return dynamic_run(num_workers=new_num_workers)
+def my_pipeline(x: typing.List[int]) -> int:
+    return square_1(x=count_unique_numbers_1(x=x)).with_overrides(limits=Resources(cpu="6", mem="500Mi"))
+
 
 # %% [markdown]
 # You can execute the workflow locally.
 # %%
 if __name__ == "__main__":
-    print(f"Running my_run(): {my_run()}")
-    print(f"Running dynamic_run(num_workers=4): {start_dynamic_run(new_num_workers=4)}")
+    print(count_unique_numbers_1(x=[1, 1, 2]))
+    print(my_pipeline(x=[1, 1, 2]))
 
 # %% [markdown]
 # You can see the memory allocation below. The memory limit is `500Mi` rather than `350Mi`, and the
@@ -156,3 +143,130 @@ def start_dynamic_run(new_num_workers: int) -> str:
 # Resource allocated using "with_overrides" method
 # :::
 #
+# ### Override `task_config`
+# Another example uses the `with_overrides` method to override the `task_config`.
+# In the following, we take TensorFlow training as an example.
+# Let's understand how the `TfJob` can be initialized and overridden with an example.
+#
+# For `task_config`, refer to the {py:func}`flytekit:flytekit.task` documentation.
+#
+# Define some necessary functions and dependencies.
+# For more detail, please check the [distributed TensorFlow training example](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/kftensorflow_plugin/tf_mnist.html#run-distributed-tensorflow-training).
+# Here we focus on how to override the `task_config`.
+# %%
+import os
+from dataclasses import dataclass
+from typing import NamedTuple, Tuple
+
+from dataclasses_json import dataclass_json
+from flytekit import ImageSpec, Resources, dynamic, task, workflow
+from flytekit.types.directory import FlyteDirectory
+
+custom_image = ImageSpec(
+    name="kftensorflow-flyte-plugin",
+    packages=["tensorflow", "tensorflow-datasets", "flytekitplugins-kftensorflow"],
+    registry="ghcr.io/flyteorg",
+)
+
+if custom_image.is_container():
+    import tensorflow as tf
+    from flytekitplugins.kftensorflow import PS, Chief, TfJob, Worker
+
+MODEL_FILE_PATH = "saved_model/"
+
+
+@dataclass_json
+@dataclass
+class Hyperparameters(object):
+    # Initialize a data class to store the hyperparameters.
+    batch_size_per_replica: int = 64
+    buffer_size: int = 10000
+    epochs: int = 10
+
+
+def load_data(
+    hyperparameters: Hyperparameters,
+) -> Tuple[tf.data.Dataset, tf.data.Dataset, tf.distribute.Strategy]:
+    # Fetch the train and evaluation datasets
+    ...
+
+
+def get_compiled_model(strategy: tf.distribute.Strategy) -> tf.keras.Model:
+    # Compile a model
+    ...
+
+
+def decay(epoch: int):
+    # Define a function for decaying the learning rate
+    ...
+
+
+def train_model(
+    model: tf.keras.Model,
+    train_dataset: tf.data.Dataset,
+    hyperparameters: Hyperparameters,
+) -> Tuple[tf.keras.Model, str]:
+    # Define the train_model function
+    ...
+
+
+def test_model(model: tf.keras.Model, checkpoint_dir: str, eval_dataset: tf.data.Dataset) -> Tuple[float, float]:
+    # Define the test_model function to evaluate loss and accuracy on the test dataset
+    ...
+
+
+# %% [markdown]
+# To create a TensorFlow task, add the {py:class}`flytekitplugins:flytekitplugins.kftensorflow.TfJob` config to the Flyte task; this plugin runs distributed TensorFlow training on Kubernetes.
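+# The `worker`, `ps`, and `chief` counts below map to the replica specs of the Kubeflow TFJob custom resource.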
+# %%
+training_outputs = NamedTuple("TrainingOutputs", accuracy=float, loss=float, model_state=FlyteDirectory)
+
+if os.getenv("SANDBOX") != "":
+    resources = Resources(gpu="0", mem="1000Mi", storage="500Mi", ephemeral_storage="500Mi")
+else:
+    resources = Resources(gpu="2", mem="10Gi", storage="10Gi", ephemeral_storage="500Mi")
+
+
+@task(
+    task_config=TfJob(worker=Worker(replicas=1), ps=PS(replicas=1), chief=Chief(replicas=1)),
+    retries=2,
+    cache=True,
+    cache_version="2.2",
+    requests=resources,
+    limits=resources,
+    container_image=custom_image,
+)
+def mnist_tensorflow_job(hyperparameters: Hyperparameters) -> training_outputs:
+    train_dataset, eval_dataset, strategy = load_data(hyperparameters=hyperparameters)
+    model = get_compiled_model(strategy=strategy)
+    model, checkpoint_dir = train_model(model=model, train_dataset=train_dataset, hyperparameters=hyperparameters)
+    eval_loss, eval_accuracy = test_model(model=model, checkpoint_dir=checkpoint_dir, eval_dataset=eval_dataset)
+    return training_outputs(accuracy=eval_accuracy, loss=eval_loss, model_state=MODEL_FILE_PATH)
+
+
+# %% [markdown]
+# You can use `@dynamic` to generate tasks at runtime with any custom configurations you want, and the `with_overrides` method overrides the previous configuration.
+# Here we override the worker replica count.
+# %%
+@workflow
+def mnist_tensorflow_workflow(
+    hyperparameters: Hyperparameters = Hyperparameters(batch_size_per_replica=64),
+) -> training_outputs:
+    return mnist_tensorflow_job(hyperparameters=hyperparameters)
+
+
+@dynamic
+def dynamic_run(
+    new_worker: int,
+    hyperparameters: Hyperparameters = Hyperparameters(batch_size_per_replica=64),
+) -> training_outputs:
+    return mnist_tensorflow_job(hyperparameters=hyperparameters).with_overrides(
+        task_config=TfJob(worker=Worker(replicas=new_worker), ps=PS(replicas=1), chief=Chief(replicas=1))
+    )
+
+
+# %% [markdown]
+# You can execute the workflow locally.
+# %%
+if __name__ == "__main__":
+    print(mnist_tensorflow_workflow())
+    print(dynamic_run(new_worker=4))

From 0806462aebc92fc5bc50868ff8e7219741416702 Mon Sep 17 00:00:00 2001
From: jill
Date: Fri, 3 Nov 2023 00:22:45 +0800
Subject: [PATCH 4/4] add additional documentation for the with_overrides
 feature

Signed-off-by: jill
---
 .../productionizing/customizing_resources.py | 138 ++++++------------
 1 file changed, 45 insertions(+), 93 deletions(-)

diff --git a/examples/productionizing/productionizing/customizing_resources.py b/examples/productionizing/productionizing/customizing_resources.py
index 315ca6011..28be48118 100644
--- a/examples/productionizing/productionizing/customizing_resources.py
+++ b/examples/productionizing/productionizing/customizing_resources.py
@@ -132,6 +132,10 @@ def my_pipeline(x: typing.List[int]) -> int:
     print(count_unique_numbers_1(x=[1, 1, 2]))
     print(my_pipeline(x=[1, 1, 2]))
 
+from typing import NamedTuple
+
+import tensorflow as tf
+
 # %% [markdown]
 # You can see the memory allocation below. The memory limit is `500Mi` rather than `350Mi`, and the
 # CPU limit is 4, whereas it should have been 6 as specified using `with_overrides`.
@@ -150,97 +154,50 @@ def my_pipeline(x: typing.List[int]) -> int:
 #
 # For `task_config`, refer to the {py:func}`flytekit:flytekit.task` documentation.
 #
-# Define some necessary functions and dependencies.
-# For more detail, please check the [distributed TensorFlow training example](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/kftensorflow_plugin/tf_mnist.html#run-distributed-tensorflow-training).
-# Here we focus on how to override the `task_config`.
-# %%
-import os
-from dataclasses import dataclass
-from typing import NamedTuple, Tuple
-
-from dataclasses_json import dataclass_json
-from flytekit import ImageSpec, Resources, dynamic, task, workflow
-from flytekit.types.directory import FlyteDirectory
-
-custom_image = ImageSpec(
-    name="kftensorflow-flyte-plugin",
-    packages=["tensorflow", "tensorflow-datasets", "flytekitplugins-kftensorflow"],
-    registry="ghcr.io/flyteorg",
-)
-
-if custom_image.is_container():
-    import tensorflow as tf
-    from flytekitplugins.kftensorflow import PS, Chief, TfJob, Worker
-
-MODEL_FILE_PATH = "saved_model/"
-
-
-@dataclass_json
-@dataclass
-class Hyperparameters(object):
-    # Initialize a data class to store the hyperparameters.
-    batch_size_per_replica: int = 64
-    buffer_size: int = 10000
-    epochs: int = 10
-
-
-def load_data(
-    hyperparameters: Hyperparameters,
-) -> Tuple[tf.data.Dataset, tf.data.Dataset, tf.distribute.Strategy]:
-    # Fetch the train and evaluation datasets
-    ...
-
-
-def get_compiled_model(strategy: tf.distribute.Strategy) -> tf.keras.Model:
-    # Compile a model
-    ...
-
-
-def decay(epoch: int):
-    # Define a function for decaying the learning rate
-    ...
-
-
-def train_model(
-    model: tf.keras.Model,
-    train_dataset: tf.data.Dataset,
-    hyperparameters: Hyperparameters,
-) -> Tuple[tf.keras.Model, str]:
-    # Define the train_model function
-    ...
-
-
-def test_model(model: tf.keras.Model, checkpoint_dir: str, eval_dataset: tf.data.Dataset) -> Tuple[float, float]:
-    # Define the test_model function to evaluate loss and accuracy on the test dataset
-    ...
-
-
-# %% [markdown]
 # To create a TensorFlow task, add the {py:class}`flytekitplugins:flytekitplugins.kftensorflow.TfJob` config to the Flyte task; this plugin runs distributed TensorFlow training on Kubernetes.
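 # The `worker`, `ps`, and `chief` counts below map to the replica specs of the Kubeflow TFJob custom resource.
+# The cell below is a minimal, self-contained sketch: it trains a small MNIST classifier under
+# `tf.distribute.MirroredStrategy` so that the focus stays on the `TfJob` configuration rather than the model.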
# %% -training_outputs = NamedTuple("TrainingOutputs", accuracy=float, loss=float, model_state=FlyteDirectory) - -if os.getenv("SANDBOX") != "": - resources = Resources(gpu="0", mem="1000Mi", storage="500Mi", ephemeral_storage="500Mi") -else: - resources = Resources(gpu="2", mem="10Gi", storage="10Gi", ephemeral_storage="500Mi") +from flytekit import Resources, dynamic, task, workflow +from flytekitplugins.kftensorflow import PS, Chief, TfJob, Worker + +TrainingOutputs = NamedTuple( + "TrainingOutputs", + [ + ("model", tf.keras.Model), + ("accuracy", float), + ("loss", float), + ], +) @task( task_config=TfJob(worker=Worker(replicas=1), ps=PS(replicas=1), chief=Chief(replicas=1)), - retries=2, + cache_version="1.0", cache=True, - cache_version="2.2", - requests=resources, - limits=resources, - container_image=custom_image, + requests=Resources(cpu="1", mem="2048Mi"), + limits=Resources(cpu="1", mem="2048Mi"), ) -def mnist_tensorflow_job(hyperparameters: Hyperparameters) -> training_outputs: - train_dataset, eval_dataset, strategy = load_data(hyperparameters=hyperparameters) - model = get_compiled_model(strategy=strategy) - model, checkpoint_dir = train_model(model=model, train_dataset=train_dataset, hyperparameters=hyperparameters) - eval_loss, eval_accuracy = test_model(model=model, checkpoint_dir=checkpoint_dir, eval_dataset=eval_dataset) - return training_outputs(accuracy=eval_accuracy, loss=eval_loss, model_state=MODEL_FILE_PATH) +def train_model() -> TrainingOutputs: + (X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data() + X_train, X_test = X_train / 255.0, X_test / 255.0 + strategy = tf.distribute.MirroredStrategy() + with strategy.scope(): + model = tf.keras.models.Sequential( + [ + tf.keras.layers.Flatten(input_shape=(28, 28)), + tf.keras.layers.Dense(128, activation="relu"), + tf.keras.layers.Dropout(0.2), + tf.keras.layers.Dense(10), + ] + ) + model.compile( + optimizer="adam", loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=["accuracy"] + ) + BATCH_SIZE = 64 + NUM_EPOCHS = 5 + model.fit(X_train, y_train, epochs=NUM_EPOCHS, batch_size=BATCH_SIZE) + test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=2) + + return TrainingOutputs(model=model, accuracy=test_accuracy, loss=test_loss) # %% [markdown] @@ -248,18 +205,13 @@ def mnist_tensorflow_job(hyperparameters: Hyperparameters) -> training_outputs: # For here we override the worker replica count. # %% @workflow -def mnist_tensorflow_workflow( - hyperparameters: Hyperparameters = Hyperparameters(batch_size_per_replica=64), -) -> training_outputs: - return mnist_tensorflow_job(hyperparameters=hyperparameters) +def my_tensorflow_workflow() -> TrainingOutputs: + return train_model() @dynamic -def dynamic_run( - new_worker: int, - hyperparameters: Hyperparameters = Hyperparameters(batch_size_per_replica=64), -) -> training_outputs: - return mnist_tensorflow_job(hyperparameters=hyperparameters).with_overrides( +def dynamic_run(new_worker: int) -> TrainingOutputs: + return train_model().with_overrides( task_config=TfJob(worker=Worker(replicas=new_worker), ps=PS(replicas=1), chief=Chief(replicas=1)) ) @@ -268,5 +220,5 @@ def dynamic_run( # You can execute the workflow locally. # %% if __name__ == "__main__": - print(mnist_tensorflow_workflow()) + print(my_tensorflow_workflow()) print(dynamic_run(new_worker=4))