Synopsis: In this course, you will learn from ML Engineers and Trainers who work on state-of-the-art development of ML pipelines here at Google Cloud. The first few modules cover TensorFlow Extended (or TFX), which is Google's production machine learning platform based on TensorFlow for managing ML pipelines and metadata. You will learn about pipeline components and pipeline orchestration with TFX. You will also learn how to automate your pipeline through continuous integration and continuous deployment, and how to manage ML metadata. Then we will change focus to discuss how we can automate and reuse ML pipelines across multiple ML frameworks such as TensorFlow, PyTorch, scikit-learn, and XGBoost. You will also learn how to use another tool on Google Cloud, Cloud Composer, to orchestrate your continuous training pipelines. And finally, we will go over how to use MLflow for managing the complete machine learning life cycle.
TFX Components
TensorFlow Extended is Google's production machine learning platform based on TensorFlow. It provides a flexible configuration framework and shared libraries to integrate common machine learning tasks, implemented as components, needed to define, launch, and monitor your machine learning system. TFX makes MLOps easier through all phases of the ML project lifecycle, from prototyping to production.
It is designed to orchestrate your machine learning workflow with portability to multiple environments and orchestration frameworks in mind. This includes Apache Airflow, Apache Beam, and Kubeflow.
A possible mapping of different components to GCP Services.
A TFX component is an implementation of a machine learning task in your pipeline. Components are designed to be modular and extensible, while incorporating Google's machine learning best practices on tasks such as data partitioning, validation, and transformation.
Each step of your TFX pipeline, again called a component, produces and consumes structured data representations called artifacts. Subsequent components in your workflow may use these artifacts as inputs. In this way, TFX lets you transfer data between components during continuous execution of your pipeline.
Components are composed of five elements:
Component instances produce artifacts as outputs and typically depend on artifacts produced by upstream component instances as inputs. For example, transformed data is an artifact: it depends on the raw training data artifact ingested into your pipeline, and it serves as input to your model during model training.
First, a driver reads the component specification for runtime parameters and retrieves the required artifacts from the ML metadata store for the component. Second, an executor performs the actual computation on the retrieved input artifacts and generates the output artifacts. Finally, a publisher reads the component specification to log the pipeline component run in ML metadata and writes the component's output artifacts to the artifact store.
TFX pipelines are a sequence of components linked together by a directed acyclic graph (DAG) of the relationships between artifact dependencies. Components communicate through input and output channels. Orchestrators ensure consistency with pipeline execution order, component logging, retries and failure recovery, and intelligent parallelization of component data processing.
The ML Metadata library stores the metadata in a relational back end. For notebook prototypes, this can be a local SQLite database; for production cloud deployments, this could be a managed MySQL or PostgreSQL database. ML Metadata does not store the actual pipeline artifacts. TFX automatically organizes and stores the artifacts on local file systems or a remote cloud storage file system for consistent organization across your machine learning projects.
For most machine learning cases with TFX, you will only interact with the integrated front end layer and don't need to engage directly with the orchestrator, shared libraries, and ML metadata unless you need additional customization.
TFX standard components come pre-packaged with TensorFlow and are designed to help improve your pipeline development velocity. You are able to mix standard and custom components to suit your machine learning workflow needs. Each standard component is designed around common machine learning tasks and encodes Google's ML best practices around tasks such as data monitoring and validation, training and serving data transformations, model evaluation, and serving.
The ExampleGen TFX pipeline component is the entry point to your pipeline that ingests data. As inputs, ExampleGen supports out-of-the-box ingestion of external data sources such as CSV, TFRecord, Avro, and Parquet. As outputs, ExampleGen produces TF Examples or TF SequenceExamples, which are highly efficient and performant dataset representations that can be read consistently by downstream components. This comes with directory management and logging by default to ensure ML best practices on consistent project setup. ExampleGen supports advanced data management capabilities such as data partitioning, versioning, and custom splitting on features or time.
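A minimal sketch of wiring ExampleGen into a pipeline, assuming a TFX 1.x-style Python API (exact import paths and argument names vary across TFX versions) and a hypothetical data location:
from tfx import v1 as tfx

# Ingest CSV files from a directory and emit TF Examples for downstream components.
DATA_ROOT = 'gs://my-bucket/covertype/data'  # hypothetical path
example_gen = tfx.components.CsvExampleGen(input_base=DATA_ROOT)
# Downstream components consume example_gen.outputs['examples'].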
StatisticsGen performs a complete pass over your data using Apache Beam and the TensorFlow Data Validation library to calculate summary statistics for each of your features over your configured train, dev, and test splits. This includes statistics like mean, standard deviation, quantile ranges, and the prevalence of null values. As inputs, the StatisticsGen component is configured to take TF Examples from the ExampleGen component. As outputs, StatisticsGen produces a dataset statistics artifact that contains feature statistics used by downstream components.
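Continuing the sketch above, under the same TFX 1.x-style API assumption:
from tfx import v1 as tfx

# Compute per-feature summary statistics over the ingested examples.
statistics_gen = tfx.components.StatisticsGen(
    examples=example_gen.outputs['examples'])
# statistics_gen.outputs['statistics'] feeds SchemaGen and ExampleValidator.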
Some TFX components use a description of your input data called a schema, which can be automatically generated by the SchemaGen component. As inputs, SchemaGen reads the StatisticsGen artifact to infer characteristics of your input data from the observed feature data distributions. As outputs, SchemaGen produces a schema artifact, a description of your data's characteristics. The schema can specify data types for feature values, whether a feature has to be present in all examples, what the allowed value ranges are for each feature, and other properties. SchemaGen is an optional component that does not need to be run on every pipeline run.
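Continuing the same hedged sketch:
from tfx import v1 as tfx

# Infer a schema (feature types, presence, value domains) from the computed statistics.
schema_gen = tfx.components.SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True)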
The ExampleValidator pipeline component identifies anomalies in train, dev, and test data. As inputs, ExampleValidator reads feature statistics from StatisticsGen and the schema artifact produced by the SchemaGen component or imported from an external source. As outputs, ExampleValidator produces an anomalies report artifact. It identifies any anomalies in the example data by comparing the data statistics computed by the StatisticsGen pipeline component against a schema.
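Continuing the sketch, under the same API assumptions:
from tfx import v1 as tfx

# Compare the computed statistics against the schema and emit an anomalies report artifact.
example_validator = tfx.components.ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])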
The Transform TFX pipeline component performs feature engineering on the TF Examples data artifact emitted from the ExampleGen component, using the data schema artifact from SchemaGen (or imported from external sources) as well as TensorFlow transformations typically defined in a preprocessing function. As outputs, the Transform component emits a SavedModel artifact that encapsulates the feature engineering logic. When executed, the SavedModel accepts TF Examples emitted from an ExampleGen component and emits the transformed feature data.
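Continuing the sketch; the module file name and its preprocessing_fn are hypothetical user code:
from tfx import v1 as tfx

# Apply the preprocessing_fn defined in a user module (hypothetical file name).
transform = tfx.components.Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file='preprocessing.py')
# Outputs include transform.outputs['transformed_examples'] and
# transform.outputs['transform_graph'] (the SavedModel holding the feature engineering logic).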
Standard TFX components for Model Management.
The Tuner component is the newest TFX component. It makes extensive use of the Keras Tuner API for tuning hyperparameters. As inputs, the Tuner component takes in the transformed data and transform graph artifacts. As outputs, the Tuner component produces a hyperparameters artifact. You can modify the Trainer configuration to directly ingest the best hyperparameters found in the most recent Tuner run.
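A hedged sketch of the Tuner, assuming the same TFX 1.x-style API; the module file containing the tuner_fn and the step counts are hypothetical:
from tfx import v1 as tfx

# Search hyperparameters using a tuner_fn defined in a user module (hypothetical file name).
tuner = tfx.components.Tuner(
    module_file='model.py',
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    train_args=tfx.proto.TrainArgs(num_steps=500),
    eval_args=tfx.proto.EvalArgs(num_steps=100))
# A Trainer can then ingest tuner.outputs['best_hyperparameters'] directly.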
Now that you've trained your model in a TFX pipeline, how do you know how well it performed? Enter the Evaluator component for model performance evaluation. As inputs, the Evaluator component uses the model created by the Trainer and the original input data artifact. It performs a thorough analysis using the TensorFlow Model Analysis library to compute machine learning metrics across data splits and slices. As outputs, the Evaluator component produces two artifacts: an evaluation metrics artifact that contains configurable model performance metric slices, and a "model blessing" artifact that indicates whether the model's performance was higher than the configured thresholds and it is ready for production.
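A hedged sketch, assuming a Trainer component named trainer defined elsewhere in the pipeline and a hypothetical label key:
import tensorflow_model_analysis as tfma
from tfx import v1 as tfx

# Evaluate the trained model on the original examples, with metrics and slices set via TFMA config.
eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='Cover_Type')],  # hypothetical label key
    slicing_specs=[tfma.SlicingSpec()])                    # overall (unsliced) metrics
evaluator = tfx.components.Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    eval_config=eval_config)
# evaluator.outputs['evaluation'] holds the metrics; evaluator.outputs['blessing'] gates the Pusher.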
Next up is InfraValidator, a TFX component that is used as an early warning layer before pushing a model to production. The name InfraValidator comes from the fact that it validates the model in the actual model-serving infrastructure. While Evaluator guarantees the performance of the model, InfraValidator guarantees that the model is mechanically fine and prevents bad models from being pushed to production. As inputs, InfraValidator takes the SavedModel artifact from the Trainer component, launches a sandboxed model server with the model, and tests whether it can be successfully loaded and optionally queried using the input data artifact from the ExampleGen component. The resulting InfraValidation output artifact carries a blessing, in the same way the Evaluator output does. InfraValidator brings standardization to this model infra check and is configurable to mirror model-serving environments such as Kubernetes clusters and TF Serving.
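A hedged sketch of an InfraValidator configured to smoke-test the model in a local Docker TF Serving instance (proto helper names assume a TFX 1.x-style API, and trainer is assumed defined elsewhere):
from tfx import v1 as tfx

# Launch a sandboxed TF Serving container, load the model, and optionally send test requests.
infra_validator = tfx.components.InfraValidator(
    model=trainer.outputs['model'],
    examples=example_gen.outputs['examples'],
    serving_spec=tfx.proto.ServingSpec(
        tensorflow_serving=tfx.proto.TensorFlowServing(tags=['latest']),
        local_docker=tfx.proto.LocalDockerConfig()),
    request_spec=tfx.proto.RequestSpec(
        tensorflow_serving=tfx.proto.TensorFlowServingRequestSpec()))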
The Pusher component is used to push a validated model to a deployment target during model training or retraining. Before deployment, Pusher relies on one or more blessings from other validation components as inputs to decide whether to push the model. Evaluator blesses the model if the newly trained model is good enough to be pushed to production. InfraValidator blesses the model if it is mechanically serviceable in a production environment. The Pusher component brings the benefits of a production gatekeeper to your TFX pipeline, ensuring that only the best-performing models that are also mechanically sound make it to production.
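Continuing the sketch; the serving directory is hypothetical:
from tfx import v1 as tfx

# Push only models that carry both the Evaluator and InfraValidator blessings.
pusher = tfx.components.Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    infra_blessing=infra_validator.outputs['blessing'],
    push_destination=tfx.proto.PushDestination(
        filesystem=tfx.proto.PushDestination.Filesystem(
            base_directory='gs://my-bucket/serving_model')))  # hypothetical path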
A BulkInferrer TFX component can be used to perform batch inference on unlabeled TF Examples. It is typically deployed after an Evaluator component to perform inference with a validated model, or after a Trainer component to directly perform inference on an exported model. It currently performs in-memory model inference and remote inference; remote inference requires the model to be hosted on Cloud AI Platform. As inputs, BulkInferrer reads the following artifacts: a trained TensorFlow SavedModel from the Trainer component, optionally a model blessing artifact from the Evaluator component, and input data TF Example artifacts from the ExampleGen component.
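A hedged sketch under the same assumptions (trainer and evaluator defined earlier in the pipeline):
from tfx import v1 as tfx

# Run batch inference with the blessed model over (unlabeled) examples.
bulk_inferrer = tfx.components.BulkInferrer(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'])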
Pipeline nodes are special-purpose classes for performing advanced metadata operations, such as importing external artifacts into ML Metadata or querying current ML Metadata based on artifact properties and their history. The most common pipeline node is the ImporterNode, a special node that registers an external resource in the ML Metadata library so downstream nodes can use the registered artifact as input. The primary use case for this node is to bring external artifacts, like a schema, into the TFX pipeline for use by the Transform and Trainer components. The next type of pipeline node is a ResolverNode, a special TFX node that handles artifact resolution logic for artifacts that will be used as inputs by downstream nodes. The model resolver is only required if you're performing model validation in addition to evaluation.
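A hedged sketch of both node types, assuming a TFX 1.x-style API (older releases expose them as ImporterNode and ResolverNode); the schema location is hypothetical:
from tfx import v1 as tfx

# Import a hand-curated schema from outside the pipeline so Transform/Trainer can use it.
schema_importer = tfx.dsl.Importer(
    source_uri='gs://my-bucket/schema',  # hypothetical location
    artifact_type=tfx.types.standard_artifacts.Schema).with_id('schema_importer')

# Resolve the latest blessed model so Evaluator can validate the new model against it.
model_resolver = tfx.dsl.Resolver(
    strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
    model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
    model_blessing=tfx.dsl.Channel(
        type=tfx.types.standard_artifacts.ModelBlessing)).with_id('latest_blessed_model_resolver')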
The diagram on the slide illustrates the relationships between TFX libraries and pipeline components.
Useful links:
In this lab, you will work with the Covertype Data Set and use TFX to analyze, understand, and pre-process the dataset and train, analyze, validate, and deploy a multi-class classification model to predict the type of forest cover from cartographic features.
# Set the project id to your Google Cloud Project
PROJECT_ID=<your-project-id>
gcloud config set project $PROJECT_ID
# enable the required cloud services:
gcloud services enable \
cloudbuild.googleapis.com \
container.googleapis.com \
cloudresourcemanager.googleapis.com \
iam.googleapis.com \
containerregistry.googleapis.com \
containeranalysis.googleapis.com \
ml.googleapis.com \
dataflow.googleapis.com
# add the Editor permission for your Cloud Build service account:
PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format="value(projectNumber)")
CLOUD_BUILD_SERVICE_ACCOUNT="${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com"
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member serviceAccount:$CLOUD_BUILD_SERVICE_ACCOUNT \
--role roles/editor
# create a custom service account to give CAIP training job access to AI Platform Vizier service for pipeline hyperparameter tuning:
SERVICE_ACCOUNT_ID=tfx-tuner-caip-service-account
gcloud iam service-accounts create $SERVICE_ACCOUNT_ID \
--description="A custom service account for CAIP training job to access AI Platform Vizier service for pipeline hyperparameter tuning." \
--display-name="TFX Tuner CAIP Vizier"
# Grant your AI Platform service account additional access permissions to the AI Platform Vizier service for pipeline hyperparameter tuning
PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format="value(projectNumber)")
CAIP_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@cloud-ml.google.com.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member serviceAccount:$CAIP_SERVICE_ACCOUNT \
--role=roles/storage.objectAdmin
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member serviceAccount:$CAIP_SERVICE_ACCOUNT \
--role=roles/ml.admin
# Grant the custom service account the Storage Object Admin role:
SERVICE_ACCOUNT_ID=tfx-tuner-caip-service-account
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${SERVICE_ACCOUNT_ID}@${PROJECT_ID}.iam.gserviceaccount.com \
--role=roles/storage.objectAdmin
# Grant the custom service account the AI Platform Admin role for access to the Vizier service:
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${SERVICE_ACCOUNT_ID}@${PROJECT_ID}.iam.gserviceaccount.com \
--role=roles/ml.admin
# Grant your project's AI Platform Google-managed service account the Service Account Admin role on your custom service account:
gcloud iam service-accounts add-iam-policy-binding \
--role=roles/iam.serviceAccountAdmin \
--member=serviceAccount:service-${PROJECT_NUMBER}@cloud-ml.google.com.iam.gserviceaccount.com \
${SERVICE_ACCOUNT_ID}@${PROJECT_ID}.iam.gserviceaccount.com
AI Platform > Pipelines > New Instance > Configure > Create New Cluster > (after creation) > Deploy
This creates a Kubeflow Pipelines cluster
AI Platform > Workbench > Notebook > TF Enterprise w/o GPU
git clone https://github.com/GoogleCloudPlatform/mlops-on-gcp
cd mlops-on-gcp/workshops/tfx-caip-tf23
./install.sh
Open lab-01.ipynb
Follow along in the notebook. Note: this lab is currently broken on Qwiklabs.
We'll see:
- how to containerize training applications built with other popular machine learning frameworks, such as PyTorch, scikit-learn, and XGBoost,
- how to deploy these containers as ops in a Kubeflow pipeline,
- and how to train TensorFlow, PyTorch, scikit-learn, and XGBoost models in parallel inside a Kubeflow pipeline.
So what's the process of containerizing a TensorFlow training application? It's a three-step process:
- Step 1: First, we create a training script that contains the code to ingest data, train the model, and save the model.
- Step 2: Next, we package the training script into a Docker image. This is where we define the dependencies of our training script.
- Step 3: Finally, we build and push this image to Container Registry. We can do this with the
gcloud builds submit --tag $IMG_URI $IMG_NAME
command.
What if we use a different framework, like scikit-learn, PyTorch, or XGBoost?
How do we use multiple models from multiple frameworks in a Kubeflow pipeline? For example, we may load training and evaluation data from BigQuery and then train four models, each from a different framework, as shown in the following pipeline:
In that case, we just have to define a list for each op that contains the training arguments corresponding to that specific training op. Just as we define separate argument lists for each training op, we also define the separate training ops themselves, each taking the corresponding list as arguments; a sketch follows below.
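A minimal sketch of this pattern with the KFP v1 SDK; the container images and arguments are hypothetical, and a real pipeline would add the BigQuery data-loading ops upstream:
import kfp
from kfp import dsl

# Hypothetical trainer images, one per framework, already pushed to Container Registry.
TRAINER_IMAGES = {
    'tensorflow': 'gcr.io/my-project/tf-trainer:latest',
    'pytorch': 'gcr.io/my-project/pytorch-trainer:latest',
    'sklearn': 'gcr.io/my-project/sklearn-trainer:latest',
    'xgboost': 'gcr.io/my-project/xgboost-trainer:latest',
}

@dsl.pipeline(name='multi-framework-training',
              description='Train one model per framework in parallel.')
def training_pipeline(training_table: str = 'project.dataset.training_data'):
    for framework, image in TRAINER_IMAGES.items():
        # Each op gets its own argument list; with no dependencies between them,
        # Kubeflow schedules the four training ops in parallel.
        dsl.ContainerOp(
            name='train-{}'.format(framework),
            image=image,
            arguments=['--training-table', training_table])

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(training_pipeline, 'training_pipeline.yaml')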
Continuous training simply refers to retraining models periodically, at set intervals, or with specific triggers. In this discussion, we'll focus on retraining periodically or at set intervals. Model performance tends to deteriorate over time. AI Platform Pipelines makes it incredibly easy to schedule pipeline runs. When setting up a run in the UI, we can choose either a one-off run, which simply runs the pipeline once, or a recurring run, where we can easily schedule periodic or set-interval runs of our pipeline.
Lab: Continuous Training with TensorFlow, PyTorch, XGBoost, and Scikit Learn Models with Kubeflow and Vertex AI Pipelines
AI Platform > Pipelines > New Instance > Configure Kubeflow > Create New Cluster > Deploy
Note: I downloaded the completed notebook and saved it in notebooks/ml-pipelines-on-gcp/cts-train-with-multiple-frmwks-with-kubeflow-and-ai-platform-pipelines.ipynb
This is what a successful run looks like:
In this section we learned how to do manual pipeline runs; in the next section we will look at an automated workflow orchestration service, Cloud Composer (managed Airflow).
Airflow serves a similar role to Prefect (there's one more tool, but I don't recall the name right now). How does Composer compare to Vertex AI Pipelines, Kubeflow Pipelines, Dataflow, etc.? When would one use one over the other? Can they be used together?
blogpost: Kubeflow Pipelines vs. Cloud Composer for Orchestration
DAG: A DAG is a collection of the tasks you want to run, represented by the nodes of the graph, organized in a way that reflects their relationships and dependencies, represented by the edges of the graph. In Airflow, we use a Python SDK to define the DAGs, the tasks, and their dependencies as code. Every DAG has:
- a definition,
- operators,
- and definitions of the operator relationships
DAG Run: A DAG run is a physical instance of that DAG, containing task instances that run for a specific execution date. The execution date is the logical date and time that the DAG run and its task instances are running for. The Airflow scheduler, which is managed by Composer within a Kubernetes pod, will often create the DAG runs. However, they can also be created by external triggers. A DAG can have multiple runs, even concurrently, each with a different execution date.
Tasks: Tasks define a unit of work in your workflow. What are these tasks? They are parameterized implementations of operators that we will discuss in the next section
Operators: Operators are usually, but not always, atomic. This means that they can work alone and don't need to share resources with any other operators. The DAG will make sure that operators run in the correct order. Other than those dependencies, operators generally run independently. In Airflow you can leverage XComs if you do need to pass information between different operators. Airflow supports three main types of operators:
- operators that perform an action or tell another system to perform an action
- transfer operators that move data from one system to another, for example, BigQuery to Cloud storage
- sensors that will keep running until a specific criterion is met, such as a file sensor that waits until a file is present at a certain location before triggering any downstream task.
Some concrete examples are the Dataflow, Dataproc, and BigQuery operators.
An Airflow DAG is defined in a Python script. The script has five main sections.
Any top-level code outside of the DAG and operator definitions is executed every time the scheduler parses the DAG file, which happens frequently, so keep expensive logic out of it.
Setting Op dependency / order:
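A minimal sketch of a DAG definition and of setting operator order, assuming an Airflow 1.10-style environment (as used by Cloud Composer in this course) and placeholder bash commands in place of real task logic:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(dag_id='continuous_training',
         default_args=default_args,
         start_date=datetime(2021, 1, 1),
         schedule_interval='@weekly',
         catchup=False) as dag:

    extract = BashOperator(task_id='extract_data', bash_command='echo extract')
    train = BashOperator(task_id='train_model', bash_command='echo train')
    evaluate = BashOperator(task_id='evaluate_model', bash_command='echo evaluate')

    # Dependency / ordering: extract runs first, then train, then evaluate.
    extract >> train >> evaluate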
Creating a Cloud Composer environment: Now that we have a DAG ready to go, how do we create and access our environment to run it? We can access the Composer environment via the Google Cloud Console, the Google Cloud SDK CLI, or REST APIs.
Now that we know how to create our DAGs using the Airflow Python SDK, create a Composer environment, and access the Airflow web server, let's focus on what we need to build continuous training pipelines in Airflow and run them on Cloud Composer.
After the data is prepared, we're ready to start training our model. If we're using a TensorFlow, XGBoost, or scikit-learn model, we can train directly on AI Platform.
How can we use Airflow and Cloud Composer to orchestrate container-based workloads and TFX pipelines?
1. For container workloads: If we have tasks with non-PyPI dependencies, or if the tasks are already containerized, we can run the containers as Airflow tasks.
2. For TFX pipelines: If you have already written your ML pipeline using TFX, then you don't need to do much; TFX includes an Airflow DAG runner for running your TFX pipelines via Airflow.
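A hedged sketch of the TFX-on-Airflow path: the DAG file builds a TFX pipeline and hands it to the AirflowDagRunner. Paths are hypothetical and import locations vary by TFX version.
from datetime import datetime
from tfx import v1 as tfx
from tfx.orchestration.airflow.airflow_dag_runner import AirflowDagRunner, AirflowPipelineConfig

def create_pipeline():
    # Minimal single-component TFX pipeline; paths are hypothetical.
    example_gen = tfx.components.CsvExampleGen(input_base='/home/airflow/gcs/data/csv')
    return tfx.dsl.Pipeline(
        pipeline_name='tfx_on_airflow',
        pipeline_root='/home/airflow/gcs/data/tfx_pipeline_output',
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(
            '/home/airflow/gcs/data/metadata.db'),
        components=[example_gen])

# The module-level DAG object produced here is what the Airflow scheduler picks up.
airflow_config = {'schedule_interval': '@daily', 'start_date': datetime(2021, 1, 1)}
DAG = AirflowDagRunner(AirflowPipelineConfig(airflow_config)).run(create_pipeline())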
In this lab you will learn how to write an Airflow DAG for continuous training and deploy the DAG within a Cloud Composer environment. You will also learn how to explore and monitor your DAG runs using the Apache Airflow webserver.
Enable ML services,
gcloud services enable ml.googleapis.com
create buckets,
export BUCKET_NAME=${DEVSHELL_PROJECT_ID}
export REGION=us-central1
export ZONE=us-central1-a
gsutil mb -l ${REGION} gs://${BUCKET_NAME}
Pub/Sub topics (why? — presumably so the DAG's publish_if_failed_check_op can publish a notification when a data check fails),
gcloud pubsub topics create chicago-taxi-pipeline
Create a BigQuery dataset for storing preprocessed data for training and a table for storing training metrics
bq mk -d chicago_taxi_ct
bq mk --table chicago_taxi_ct.model_metrics version_name:STRING,rmse:FLOAT
and Cloud Composer environment
gcloud composer environments create demo-environment \
--location $REGION \
--zone $ZONE \
--python-version 3 \
--image-version composer-1.20.8-airflow-1.10.15
We will be working through the Python script defining our continuous training DAG in Airflow. The Apache Airflow Concepts and Google Cloud Operators documentation may be helpful in the following exercises.
TODO 1: Define basic DAG params, e.g. name and args.
TODO 2: Define SQL queries for extracting the training and validation datasets. This uses BigQuery: we read data and store it in the relevant BQ tables. We are creating two ops, one for the training data and the other for the validation data.
TODO 3: Use the python callable set_new_version_name to set an Airflow Variable.
TODO 4: Using the query (model_check_sql), finish defining the task bq_check_rmse_query_op to ensure that the model meets our threshold of an RMSE of 10.0 by defining the pass_value argument appropriately. Set the task_id to bq_value_check_rmse_task.
TODO 5: Correct the ordering. The bq_check_data_op should have four downstream tasks: publish_if_failed_check_op, python_new_version_name_op, bq_train_data_op, and bq_valid_data_op. Define these dependencies to finish the Python script. (A hedged sketch for TODOs 4 and 5 follows below.)
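Not the lab solution, but a hedged sketch of what TODOs 4 and 5 are asking for, using the Airflow 1.10 contrib BigQueryValueCheckOperator; the check SQL is hypothetical and the other tasks are stubbed with DummyOperator placeholders:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryValueCheckOperator

# Hypothetical check query: returns 1 when the latest model's RMSE is under 10.0, else 0.
model_check_sql = """
SELECT IF(rmse < 10.0, 1, 0)
FROM chicago_taxi_ct.model_metrics
ORDER BY version_name DESC
LIMIT 1
"""

with DAG('continuous_training_sketch', start_date=datetime(2021, 1, 1),
         schedule_interval=None) as dag:

    # TODO 4 sketch: pass_value=1 means "the RMSE threshold check returned true".
    bq_check_rmse_query_op = BigQueryValueCheckOperator(
        task_id='bq_value_check_rmse_task',
        sql=model_check_sql,
        pass_value=1,
        use_legacy_sql=False)

    # Placeholders standing in for the lab's real tasks.
    bq_check_data_op = DummyOperator(task_id='bq_check_data_task')
    publish_if_failed_check_op = DummyOperator(task_id='publish_if_failed_check_task')
    python_new_version_name_op = DummyOperator(task_id='python_new_version_name_task')
    bq_train_data_op = DummyOperator(task_id='bq_train_data_task')
    bq_valid_data_op = DummyOperator(task_id='bq_valid_data_task')

    # TODO 5 sketch: the data check fans out to its four downstream tasks.
    bq_check_data_op >> [publish_if_failed_check_op, python_new_version_name_op,
                         bq_train_data_op, bq_valid_data_op]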
In this task you will copy your newly completed DAG into a Cloud Storage bucket, which will be automatically synced with your Cloud Composer environment. Afterwards you will check that your DAG was loaded correctly and start a DAG run.
Check if env is ready: Navigation > Big Data > Composer
Define Airflow vars:
MLflow is composed of four components: tracking, projects, models, and registry.
For reproducibility, MLflow enables users to log the particular source code that was used to produce a model, along with its version, by integrating tightly with Git to map every model to a particular commit hash.
MLflow provides support for flexible text and notes associated with the training session. For example, notes might be a good place to drop some information about the business use case that our model is being developed for.
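A minimal sketch of logging a run with the MLflow tracking API; the experiment name, parameter, and metric values are hypothetical:
import mlflow

mlflow.set_experiment('demand-forecasting')  # hypothetical experiment name

with mlflow.start_run() as run:
    mlflow.log_param('learning_rate', 0.01)
    mlflow.log_metric('rmse', 9.3)
    # Free-form notes, e.g. the business use case this model is being developed for.
    mlflow.set_tag('mlflow.note.content',
                   'Baseline model for the weekly demand-forecasting use case.')
    # When launched from a Git repository, MLflow also records the source commit on the run.
    print('Run ID:', run.info.run_id)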
The MLflow tracking service back end is divided into two components.
- Entity or Metadata store
- Artifact store
MLflow's solution to the reproducibility challenge is a self-contained project specification that bundles all of the machine learning training code with its versioned library dependencies, its configurations, and its training and test data.
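A short sketch of running a project through the MLflow Projects API, using the public mlflow-example repository; the parameter value is arbitrary:
import mlflow

# MLflow resolves the MLproject file in the repo, recreates its declared
# environment, and executes the default entry point with these parameters.
submitted = mlflow.projects.run(
    uri='https://github.com/mlflow/mlflow-example',
    parameters={'alpha': 0.5})
print(submitted.run_id, submitted.get_status())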
MLflow Models is a general-purpose model format that supports different production environments. It can be used to load and deploy models produced in a variety of different ways and written in various ML frameworks.
Similar to a project, an MLflow model is also a directory structure. It contains a configuration file, and instead of training code, this time it contains a serialized model artifact. It also contains (like a project does) a set of dependencies for reproducibility; this time, we are talking about evaluation dependencies in the form of a Conda environment. Additionally, MLflow provides model creation utilities for serializing models from a variety of popular frameworks in the MLflow format. Finally, MLflow provides deployment APIs for productionizing and deploying any MLflow model to a variety of services.
We can load a model via different 'flavors':
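A sketch of the flavor mechanism: log a model in its native scikit-learn flavor, then load it back through the framework-agnostic python_function (pyfunc) flavor:
import mlflow
import mlflow.pyfunc
import mlflow.sklearn
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

X, y = load_iris(return_X_y=True)
model = LogisticRegression(max_iter=500).fit(X, y)

with mlflow.start_run() as run:
    # Logged in the sklearn flavor; MLflow also records the generic python_function flavor.
    mlflow.sklearn.log_model(model, artifact_path='model')

# Load back through pyfunc, independent of the framework that produced the model.
loaded = mlflow.pyfunc.load_model('runs:/{}/model'.format(run.info.run_id))
print(loaded.predict(X[:5]))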
MLflow Model Registry is a centralized model store and a UI and set of APIs that enables you to manage the full lifecycle of MLflow models.
As a developer, you log your model into the registry (it works with any type of model supported by MLflow). Other people can then go to this model and either manually review it or plug in automated tools via the API. Downstream users can then, in a safe way, pull the latest model after it has been reviewed and confirmed to work.
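A hedged sketch of that registry workflow (the model name is hypothetical, and run_id is assumed to come from an earlier tracking run that logged a model under the 'model' artifact path):
import mlflow
import mlflow.pyfunc
from mlflow.tracking import MlflowClient

run_id = '...'  # run ID from a previous tracking run

# Register the logged model under a named entry in the Model Registry.
result = mlflow.register_model('runs:/{}/model'.format(run_id), 'demand-forecaster')

# After review (manual or via automated tools), promote the new version.
client = MlflowClient()
client.transition_model_version_stage(
    name='demand-forecaster',
    version=result.version,
    stage='Production')

# Downstream users pull the latest Production version without knowing which run produced it.
prod_model = mlflow.pyfunc.load_model('models:/demand-forecaster/Production')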