Patcharanat P.
If you use or adapt this project, please credit and link to the original. Thank you!
Click "⋮≡" at top right to show the table of contents.
End-to-end Data project in the e-commerce and retail industries covering the full process of data exploitation, including Data Engineering, Data Science, Data Analytic, and how to automate ML deployment (MLOps).
It's crucial in nowadays to emphasize data existing and make the most use of it. The project was created to practice and demonstrate the full process of data exploitation covering setting up local environments, cloud infrastructure, ETL pipelines, Data Visualization, Machine Learning Model Development, and Model Deployment using E-commerce data.
(latest revised: July 2023, Aug 2023, Oct 2024, 14 Nov 2024, 25 Nov 2024)
- Setting up Local Environment
- Setting up Cloud Infrastructure and Authentication
- DAGs and ETL Pipelines
- EDA and Data Visualization
- 4.1 EDA
- 4.2 PowerBI Dashboard
- Machine Learning Model Development
- ML Code Productionization and Deployment
- Conclusion
- Appendix
Dataset: E-Commerce Data - Kaggle
- Python
- Docker Desktop
- Terraform (+Add to
$PATH
) - Available GCP account to enable google cloud services.
- gcloud (GCP SDK)
- Credential File for AWS as
.csv
- Credential file from kaggle with activated token for API.
The credentials are hidden from repo in by .gitignore
Clone this repository to obtain all neccessary files, then use it as root working directory.
git clone https://github.com/patcha-ranat/Ecommerce-Invoice-End-to-end.git
Create all hidden files from repo in order to reproduce:
credentials/<aws-credentials>.csv
credentials/kaggle.json
src/config/variables.json
, please change as needed referred to variables-masked.json.env
using this template# GCP GOOGLE_CLOUD_PROJECT=<your-gcp-project> GCP_REGION=<your-gcp-region> # AWS AWS_CREDENTIALS_FILENAME=<aws-credentials-filename>.csv AWS_BUCKET=<your-aws-bucket-name>
terraform/terraform.tfvars
# aws credentials aws_access_key = "XXX" aws_secret_key = "XXX" # serverless redshift ########################## # Application Definition # ########################## # app_name = "XXX" # Do NOT enter any spaces app_environment = "dev" # Dev, Test, Staging, Prod, etc ######################### # Network Configuration # ######################### redshift_serverless_vpc_cidr = "10.0.0.0/16" redshift_serverless_subnet_1_cidr = "10.0.1.0/24" redshift_serverless_subnet_2_cidr = "10.0.2.0/24" redshift_serverless_subnet_3_cidr = "10.0.3.0/24" ################################### ## Redshift Serverless Variables ## ################################### # redshift_serverless_namespace_name = "xxx-namespace" # redshift_serverless_database_name = "xxxdb" # redshift_serverless_admin_username = "XXX" # redshift_serverless_admin_password = "XXX" # redshift_serverless_workgroup_name = "XXX-workgroup" # redshift_serverless_base_capacity = 32 # redshift_serverless_publicly_accessible = false
code/models/terraform/terraform.tfvars
project_id = "<GCP-Project-ID>" service_account_email = "<Service-Account-Email>"
code/models/terraform/auth/terraform.tfvars
project_id = "<GCP-Project-ID>" service_account_email = "<Service-Account-Email>"
Run the following commands:
# workdir: .
# docker compose build
# docker compose up
docker compose up --build
# check localhost:8080 via web browser
# Ctrl+C
# docker compose down -v
# delete all local docker images via Docker UI
Since the pipelines might be needed to be run in different environment not just on local machine, it's essential to make sure that your code can be packaged and run in anywhere or be able to orchestrated by different tools such as Kubernetes for further scaling up. Developing and testing your code to perform ETL/ELT processes in a Docker container is essential. And with Docker Compose, it make it more convenience to orchestrate multiple containers, in which most of time are built by different images, with a lightweight approach.
Usually, we don't use docker compose in production, but it is lightweight and easy enough to allow you to run and test your code on local machine. However, packaging your code as a docker images is still the way to go for production scale pipeline, specifically for Kubernetes as an example.
Warnings: For this Repo. Don't deploy your images created from docker-compose to public
From "Setting Up Local Environment Overview" image,
- Docker compose will copy raw data data/uncleaned.csv, and setup.sql to source postgres database container to mockup data source as DBMS. With the
setup.sql
as an entrypoint of the container, source postgres db will initiate table and load data into in. All the execution steps defined in postgres.Dockerfile - Docker compose will copy target.sql to target postgres database container to mockup target database as DBMS. With the target.sql as an entrypoint of the container, target postgres db will initiate empty table waiting for ETL processes. All the execution steps defined in postgres-target.Dockerfile
- Docker compose will do the following:
- Mount Volume of credentials (GCP-ADC, AWS, and Kaggle) to airflow scheduler and webserver containers. And it also mount necessary components for airflow such as
logs
folder, src/config/variables.json, and src/config/connections.json which will be clarified more on Topic 2: Setting up Cloud Infrastructure and Authentication - Mount airflow-entrypoint.sh to airflow scheduler and webserver containers and use it as entrypoint to import mounted variables.json and in runtime at first start. The step that related to authentication also will be explained in Topic 2.
- Use airflow.properties to configure Airflow Containers' environment variables (act as
.env
file in airflow containers) - install airflow python dependencies specified in requirements.txt executed by airflow.Dockerfile. Setting up gcloud also will be explained in Topic 2.
- Mount Volume of credentials (GCP-ADC, AWS, and Kaggle) to airflow scheduler and webserver containers. And it also mount necessary components for airflow such as
- Docker Compose file docker-compose.yml orchestrated all the processes above. It can indicate how images should be built by specifying different
Dockerfile
(s) inbuild
section andcontext
section that's relative todocker-compose.yml
file. - To execute entrypoint commands, you can use
entrypoint
section indocker-compose.yml
file using shell script. However, it's different for.sql
script. To execute.sql
script as entrypoint in postgres database containers, you have to copy (or mount) the script to/docker-entrypoint-initdb.d
directory, and it will execute automatically while the container spinning up (initializing docker session). - You may notice that in
docker-compose.yml
file using${VARIABLE_NAME}
that referred to variables specified in.env
file. The.env
file export environment variables (env var) specified in the file for local machine during docker compose runtime, operating separately fromairflow.properties
which export env var in airflow containers during runtime.
In airflow official site, you will find docker-compose.yml
template to run airflow you can use it as reference and change it to fit your needs, like adding postgres section, and remove unnecessary part that can causes running out of memory making you unable to run docker containers successfully.
Note: some services need time to start, check container's logs from docker desktop UI or docker ps
to see if the services are ready to work with.
What's needed to be checked are
-
Are the files are mounted/copied to target path correctly?
kaggle.json
ecomm-invoice-kde-aws-iam_accessKeys.csv
setup.sql
cleaned_data.csv
target.sql
-
Is the data table in postgres database as a source created correctly?
- data loaded from
cleaned_data.csv
and using the right schema?
- data loaded from
-
Is the data table in postgres database as a target created correctly?
- empty table with the right schema?
Getting into terminal of the container we specified by:
docker exec -it <container-name-or-id> bash
Note: You can get container's name or id from docker-compose.yml
or from docker ps
command.
Access to both postgres containers, and then access database to check if csv file copied into table.
# docker exec -it postgres-container-source bash
psql -U postgres -d mydatabase
# to see tables list
\dt or \d
# to see schemas list
\dn or \z
if we see table and schema are corrected and shown, then we successfully importing csv to the Postgres database.
if not, these can be issues
- check if
setup.sql
is executed successfully, by inspecting logs in docker desktop - check if data csv file and
setup.sql
are copied into docker container's local by using container's bash and check if path inDockerfile
andsetup.sql
were set correctly. - we need to set search_path by
# to set only in current session. (reccomended)
SET search_path TO <myschema>;
# to set permanently at database level
ALTER DATABASE <mydatabase> SET search_path TO <myschema>;
In postgres bash, we will be able to see only the table that match the schema we created. Hence, we have to change the schema to see the table in the database.
Then exit from all bash
\q
exit
Note: Specifying image name or container name in docker-compose.ymal
is crucial to not be the same between services if it are meant to be created with different steps, because it could build
with the wrong Dockerfile
and cause the some unexpected issues, such as create the image build postgres database target with source Dockerfile: postgres.Dockerfile
, instead of postgres-target.Dockerfile
.
To set up airflow locally, we need to define at least 4 services referred from official's .yml file template including airflow-postgres
to be airflow backend database, airflow-scheduler
to execute task for LocalExecutor
, airflow-webserver
to make airflow accessible via web UI, and airflow-init
to initiate overall airflow session.
Understanding how every components in docker-compose.yml
work make much more easier to comprehend and debug issues that occur, such as depends-on
, environment
, healthcheck
, context
, build
, and storing object in &variable
.
For this project, we create 3 postgres containers, so we need to check carefully if airflow connected to its own backendDB or the right database.
Use this template from official's document in airflow.properties
file:
postgresql+psycopg2://<user>:<password>@<host>/<db>
#or
[dialect]+[driver]://[username:password]@[host:port]/[database]
# which results in
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow-postgres/airflow
Note: In airflow.properties
file, airflow core need FERNET key which can be obtained from fernet.py (randomly generated)
To me, This part is quite complex compared with other topic considered by security-risk and utilizing different cloud services architecture. We will talk about these following topics:
Actually, it's available to retrieve Service Account (SA) with accesses via gcloud
CLI or Terraform, but I'd prefer to do this approach to reduce complexity for utilizing SA in further steps. So, we don't have to switch account email back and forth.
Please, follow this guideline:
- Go to your GCP project console with available access to manage cloud resources, and go to navigation menu, then go to
IAM & Admin
>Service Accounts
>Create Service Account
> Create your SA and keep the email. - You have to specify the permissions that you allow for that SA, like how much it can manage resources. What you need to do is:
- Go to
IAM
page inIAM & Admin
section > Edit principal for your created service account > Add Roles - For the sake of this topic and the next, please add these following roles for sufficient permissions:
- Artifact Registry Administrator: For creating/deleting Artifact Registry object
- BigQuery Admin : To be able to manage BigQuery Resources
- IAM Workload Identity Pool Admin: For creating/deleting Workload Identity Federation (WIF) Pool
- Service Account Token Creator: For account impersonation
- Storage Admin : To be able to manage Cloud Storage (GCS) Resources
- Workload Identity User: To be able to authenticate via WIF
- And then, save your changes.
- Go to
- Also add Service Account Token Creator role to your owner account
- For GCP, you can use your own account to interact with cloud resources directly, but in enterprise-scale project, there's will be some restriction to limit access of how someone can access or manage the resources due to security purpose.
- Using Impersonation to SA allow your email to act as that SA email to interact with cloud resources with SA's access or permission. By this methodology, team can unify everyone's access to this SA or separate group access by multiple SAs. This can enhance security of cloud management and improve data governance of the company.
gcloud auth application-default login --impersonate-service-account <your-service-account-email>
After we have proper permissions for the SA, we have to configure airflow to allow it authenticating with cloud through code. There's multiple ways to do it, like by configuring ADC, Secret as URI, or specifying long-lived credentials path. In this project, I will use the ADC method which will be elaborated on Topic 2.2.1: Long-lived credentials method vs others.
- In many tutorials, you may find instructor to tell you how to configure airflow connections on Airflow Web UI, but in reality, we should keep it as code for versioning and reducing manual process as much as possible. this can be done by using pre-defined connections connections.json
- Airflow Variables also operate in the same behavior to allow the code act differently between different stage of environments (dev/stg/prod) by specifying different variables for each stage. variables.json
- You can import connections and variables during airflow runtime by:
docker exec -it <airflow-scheduler-container-name> bash airflow variables import config/variables.json airflow connections import config/connections.json # You can also export both of it from manual configuration via Web UI # airflow variables export config/variables-export.json # airflow connections import config/connections-export.json
Note:
- Path of
config/variables.json
andconfig/connections.json
depends on mounted path from local machine or path we specified to copied it to inDockerfile
- If you used copying approach through
Dockerfile
, please note that the image would contain files that contain sensitive information, especially inconnections.json
, and can be dangerous to leak to public. google_cloud_default
is leave blank to use conenction from ADC, this method was explained by Airflow Official in reference link below.
References:
- Configuring Airflow Connections - Airflow Official
- Configuring Airflow-GCP Connections - Airflow Official
- Importing Airflow Connections - Airflow Official
- Importing Airflow Variables - Stack Overflow
Here's some additional information of my old written guideline to configure postgres connections:
Connection Type:
PostgresHost
service name of postgres in docker-compose.ymlSchema
schema name we used insetup.sql
Login
username of postgres in docker-compose.ymlPassword
username of postgres in docker-compose.ymlPort
username of postgres in docker-compose.yml
Note1: we used database name
that we specified in docker-compose.yml
in DAG script where we need to connect to the database, PostgresHook with conn_id
as Postgres Host name, and schema
as database name
.
Note2: we can omit schema
argument in PostgresHook and Airflow connection, if we use public
schema, or specify SELECT myschema.table_name
FROM ... in setup.sql
However, importing airflow connections and variables through CLI is still a manual process and we don't expect this steps in production. So, using airflow entrypoint become important. airflow-entrypoint.sh is specified in docker-compose.yml
file in entrypoint
section to be executed everytime when airflow containers created and started. airflow-entrypoint.sh
is defined with many steps including importing conenctions and variables, authenticating with gcloud using ADC, and the last line is defined according to airflow official site to continue airflow default entrypoint.
Note:
- gcloud authentication line is very important to make the container being able to pull docker image from the private repository (GAR). After the entrypoint executed, the container will contains sensitive information from using ADC to
print-access-token
into~/.docker/config.json
. This line of code is equal togcloud auth configure-docker <region>...
. - However, this process do not make airflow image become sensitive, because we did not copy ADC file to docker image directly, instead we use mounting volume. This sensitive information will be contained only in container and after entrypoint is executed, not in image. Without proper setting up mounting part, the airflow image wouldn't operate properly.
Docker : exec /usr/bin/sh: exec format error
- Add shebang line (
#!/bin/bash
) as the first line inairflow-entrypoint.sh
denied: Permission "artifactregistry.repositories.uploadArtifacts" denied on resource "projects/xxx/locations/xxx/repositories/
- Use
gcloud auth configure-docker <gcp-region>-docker.pkg.dev
or specifyprint-token-access
todocker login -u oauth2accesstoken
, please check code example inairflow-entrypoint.sh
file.
References:
- Execute custom code before the Airflow entrypoint - Airflow Official
- shebang line in airflow-entrypoint.sh - Stack Overflow
- gcloud authenticate in docker container using ADC - Stack Overflow
- In fact, the most easiest way to interact with cloud services through code is to use long-lived tokens or credentials, but it's also the least safest way to authenticate to cloud provider considering existing of secret files.
- Mostly in the Airflow Operator code provided by official provider, there're always parameters that you can indicate where your credentials can be read from regarding to where the code is running on. So, you can pack your credentials together with the code to make cloud enabling available.
- Even though, there's a risk that anyone in the team can accidentially expose this secret file to public by pushing code to public GitHub repo or pushing image containing credentials footprint to public registry.
- So, there're gradually emerging of new ways to authenticate with cloud such as using Application Default Credentials (ADC) or Workload Identity Federation (WIF) provided by GCP dedicatedly intended to remove existing of secret files existence.
- We will use ADC for GCP, and long-lived credentials for AWS in this project for the code. But we will use WIF for CI/CD pipeline.
ADC will be elaborated on this Topic, but WIF will be elaborated on Topic 6: ML Code Productionization and Deployment
According to 2.1.2 Account Impersonation and 2.2.1 Long-lived credentials method vs others, we will use ADC with impersonation to manage cloud services through Terraform.
- We manually configured IAM permissions.
- We retrieved the SA email in the previous topics.
- We executed
gcloud auth
with SA email and stored in inADC
file in- MacOS/Linux:
/.config/gcloud/application_defailt_credentials.json
- Windows:
${APPDATA}/gcloud/application_defailt_credentials.json
- MacOS/Linux:
- After execution, GCP will send a refreshed token to relevant files in the same directory of ADC.
- In the Terraform code, there's a part to be specified to use SA in
code/models/terraform.tfvars
(hidden from repo) to be used to authenticate with cloud platform. - Now, We can interact freely with GCP resources.
# workdir: code/models/terraform
terraform init
terraform plan
terraform apply
terraform destory
# workdir: code/models/terraform/auth
terraform init
terraform plan
terraform apply
# terraform destory
After we did the steps above, we will have ADC file stored in local machine as application_default_credentials.json
at $GOOGLE_APPLICATION_CREDENTIALS
.
- We will specify in
docker-compose.yml
inairflow-common
section to mount this file into the containers and set$GOOGLE_APPLICATION_CREDENTIALS
to that mount target path. - However, if we have docker-in-docker use cases such as orchestrating ML service task, we have to mount ADC to ML service container that's running in Airflow scheduler container to make ML Service also being able to communicate with cloud platform.
- You can do by mounting Local ADC from host directly to ML Service container through DockerOperator
host/path/to/gcloud:/root/.config/gcloud
. Please, see mounting example in src/dags/data_product/ml_dynamic_segment.py - Now, the container are available to interact with cloud resources after
airflow-entrypoint.sh
is executed. And by specifyinggoogle_cloud_default
airflow connection, the ETL code will also be able to interact with GCP.
References:
- Use Google Cloud user credentials when testing containers locally - Medium
- Authentication on GCP with Docker: Application Default Credentials - Medium
Instead of manual configuration through Web UI on cloud platform, We can manage cloud infrastructure as code (IaC) by "Terraform", which is a better way to manage cloud resources by reducing error-prone from manual configuring. Terraform make it easier to create, track stage, and delete the resources.
The code/models/terraform/main.tf file, using some variables from code/models/terraform/variables.tf file, will produce the following resources for ETL pipeline and ML Service:
- 3 data lake buckets (landing/staging/curated)
- 2 Bigquery datasets (silver/gold)
- 1 Artifact Registry Repository (dev)
The code/models/terraform/auth/main.tf file, using some variables from code/models/terraform/auth/variables.tf file, will produce the following resources for CI/CD keyless authentication:
- 1 workload identity pool
- 1 workload identity pool provider
terraform init
terraform plan
terraform apply
terraform destroy
terraform init
initialize Terraform (wheremain.tf
located) in your local machine (don't forget tocd
into each terraform directory first).terraform plan
to see what resources will be created, authentication validation, and syntax checking.terraform apply
to create the resources.terraform destroy
to delete the resources.
Eventually, you will see the results in your GCP services together with newly created files in local filesystem such as *.tfstate*
, and .terraform*
. Please also add these files to .gitignore
to avoid credential exposing.
Note: This section and terraform/main.tf
were not revised properly considered as deprecated code, please, use with caution.
Fow AWS, we will use S3 as a data lake, and AWS Redshift as a data warehouse. Like GCP, we need to create IAM user (which equal to service account in GCP) and get the credentials file as .csv
extension, then we use it to create S3 bucket and Redshift by terraform.
We will create an IAM user, and get the credentials file manually via Web IU. Access for AWS is quite more complex than GCP, composed of IAM user, IAM Role, and Policy which will not be described in detail in this project.
IAM User and Policies
To get the IAM user, we must have root user which is the first user we created when we created AWS account. Then,
- We need to go to
IAM
service in AWS console. - And go to
Users
>Add users
> Name an uniqueUser name
>Attach existing policies directly
. - Type in search box
AmazonS3FullAccess
andAmazonRedshiftAllCommandsFullAccess
> check the box >Create user
. - Adding custom policies for Redshift Serverless by clicking the created user and then
Add permission
>Create inline policy
,- use json option and type in this:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Action": [ "iam:*", "ec2:*", "redshift-serverless:*" ], "Resource": "*" } ] }
Next
>Create policy
- Get a csv credential file by clicking on the created user >
Security credentials
>Create access key
> chooseLocal Code
for our purpose > name an unique access key >Download .csv file
After getting the credentials file, mount it to your credentials
folder. Now, we can use it in terraform to create S3 bucket and Redshift resources.
In this part is quite complex due to "static credentials" behavior, since we don't need to hard-coded or type in the key derectly to the terraform file. So in general, we will use "terraform.tfvars" to pass the hard-coded credentials to terraform file and add terraform.tfvars
to .gitignore
.
The concept is simple: we create resources in main.tf
where some part of it use var.
to refer to the variable specified in variables.tf
file. In variables.tf
file, we specify the variable name, type, and default value, if the default value is not specified, we have to pass the value interactively after terraform apply
as inputs OR pass it automatically by creating terraform.tfvars
file and type in the variable name and value. This is where we will copy credentials from csv to put it in (and again please don't forget to add both files to .gitignore
).
All you need to do is creating terraform.tfvars
(must be this name) in your terraform
folder, and type in the following:
# aws credentials
aws_access_key = "your-access-key"
aws_secret_key = "your-secret-key"
# optional for serverless redshift
# Application Definition
app_environment = "dev" # Dev, Test, Staging, Prod, etc
# Network Configuration
redshift_serverless_vpc_cidr = "10.0.0.0/16"
redshift_serverless_subnet_1_cidr = "10.0.1.0/24"
redshift_serverless_subnet_2_cidr = "10.0.2.0/24"
redshift_serverless_subnet_3_cidr = "10.0.3.0/24"
Then, you good to go with terraform apply
.
In Addition, configuring Redshift Serverless is quite complex, so I will not go into detail, but you can check the code in main.tf. Basically we need to create the following:
- Data "aws_availability_zones" to get the availability zone.
- VPC
- Redshift subnet, and also config your available IP address in
terraform.tfvars
. - IAM role for Redshift Serverless
- Grant some access and attach some policy for the role
- Workgroup
- Namespace
Note: Redshift Serverless let us create a data warehouse without managing the cluster ourselves, it can scale down to zero or pay as you use, but it's still in preview.
In this project, I used a dataset from kaggle which was:
- loaded to postgres database
- uploaded to this repo github as csv format
- and I wrote DAGs to use Kaggle API to obtain the dataset directly from the Kaggle website.
In this project, I wrote main script: ecomm_invoice_etl_dag.py
to create 1 DAG of (8+1) tasks, which are:
- Reading data from raw url from github that I uploaded myself. Then, upload it to GCP bucket as uncleaned data.
- Fetching (Unloading) data from the postgres database that we simulate in docker containers as a data source. Then, upload it to GCP bucket as cleaned data.
- Downloading from the Kaggle website using Kaggle API. Then, upload it to GCP bucket (GCS) as uncleaned data.
- Data Transformation: reformat to parquet file, and cleaning data to be ready for data analyst and data scientist to use, then load to staging area.
- Loading data located in staging area to data warehouse (Bigquery) as cleaned data with native table approach.
- Loading to data warehouse (Bigquery) as cleaned data with external table approach from staging area.
- Loading to another Postgres database as cleaned data.
- Clearing data in staging area (GCP bucket). (this will not be used for external table approach that requires the source file to be exists)
Additionally, I also wrote transform_load.py and alternative_cloud_etl.py as utils to enable different cloud services.
I intentionally separate the code for AWS and GCP, so we can easily find between them. The code for AWS is in alternative_cloud_etl.py file. The code is quite similar to GCP, but there are some differences, such as:
- using
boto3
instead ofgoogle.cloud.storage
to connect to S3. - how we use credentials file to connect to the cloud.
- using
psycopg2
instead ofbigquery
to load to Redshift. - uploading clean data to S3 bucket using
awswrangler
, which implemented onpandas
library, instead of usinggcsfs
. - how to fetch the data from postgres database to gcp and aws, which is different in the code, but the concept is the same.
Note: Since we use the same file that is downloaded or fetched, so the deleting file from local airflow docker container will be deprecated avoiding conflict between clouds. Unless, it will cause a bug by shutil
in the first run, because the file is not exists in the local container.
Note: Loading to Redshift part will be described more in the future
Note: We have to change the parameters conn_id
in ecomm_invoice_etl_dag.py
to match airflow connections that we specified.
- Referring to official documentation of how your code should be written is a good way to develop and allow other team members to maintain easily. It's always a good practice to stick with the best practice and avoid working around.
- When I first load the parquet file to pre-defined schema in Bigquery, I encountered the error that the schema is not matched. I found out that the schema of parquet file is not the same as the schema pre-defined in Bigquery by extra column: "index level 0". So, the solution is to drop the column before saving to parquet file in Google Cloud Storage by using
df.to_parquet(..., index=False)
- (Even you download the
index=True
parquet file to check in pandas, it will not show the extra index column)
- (Even you download the
- To hard refresh airflow session, we can remove local volumes by:
docker compose down --volumes --rmi all docker system prune --all --volumes
- Please, also remove local airflow backendDB
postgres-db-volume
, andlogs
files insrc
folder.
- Please, also remove local airflow backendDB
If you use different dataset, you might have to write your own DAGs what match your specific use cases.
In the DAG script, we have 2 tasks to load data to Bigquery, one is using Native table, and another is using External table. the differences between them are their performance, how it works, and user interface for that table.
External tables are suitable when you want to query data without loading it into BigQuery, optimizing storage costs and leveraging existing data sources. Native tables, on the other hand, offer enhanced performance and advanced features within BigQuery, like partitioning and clustering and more user-friendly.
- Airflow
- In the best practice, we wrap ETL processes into Airflow Opeators separately, can be either importing from official sites or sometimes customizing it to meet team's agreement or according to team's data governance.
- Separating processes by multiple operators and written as the OOP pattern can represent DAGs, airflow jobs, or workflows in comprehensive way and make a task to be atomic and isolated from each other.
- Most of time, official providers have their own developed operators associating with their products such as
GCSOperator
,BigQueryOperator
, etc. that prevent us from error-proning developing it ourselves.
- Data Warehouse
- This aspect can be vary across companies depended on data architecture design and data stack. However, there's quite a new term for modern data engineering called ELT pattern, which introduces a huge different from ETL. Basically, ELT allow us to adopt medallion architecture, separating processes into multiple data stages, keeping raw extracted data for later data reconciliation, and enabling fault-tolerant architecture by convenient backfilling due to schema changes and unexpected failure.
- External table makes ELT process easier to be implemented reducing the gap between data lake and data warehouse, but still can be benefited from partitioning by Hive-style Partitioning
- Actually, Parquet files embed data schema into the files. We can use some operators that support referring parquet internal schema to create external tables in data warehouse. So, we don't have to pre-define schema for every tables, except within an stardardized data processing framework.
- Transformer from Bronze layer to Silver layer can be a custom framework that stardardizes data format and follows company's data governance. There's multiple relevant libraries can be mentioned here, such as pyspark, dlt, and etc. depended on data sources, acceptable cost, and team expertise in tools.
- There're also multiple tools that support transforming data from Silver layer to Gold layer such as dbt, SQL Mesh, and etc. or even native airflow operator. Such tools give ability to visualize transformation logic and enable data lineage, native integrated data dict for a better maintainance.
- Many companies discourage the use of these tools giving the reason that it's not appropriate for big scale projects and too messy to maintain. However, I disagree this because there're practices and principles that solve this problem, like Data Mesh (twisted from Domain-driven design: DDD) and Modularity.
- Anyway, it requires highly skilled data archietct and data engineer to develop and maintain these tools along with the priciples. So, they might not be able to focus this enough since it can be quite a low-level foundation and very far from a product that's seemed to be more profitable to the company.
This topic is quite important, since it can be highly involved in cost optimization.
- Although ingestion process is completed, we might need to transformed it, besides of raw cleaned and normalized data, with custom business logic, and tables aggregation to further analyze and exploit more from data.
- This process usually referred to ELT or transformation that is not the same transformation in ingestion process, which is just controling data quality by casting type, formatting date/timestamp or mapping data into specified schema by a framework.
- Transformed data models can be located in curate or gold layer in medallion architecture, but sometimes can be in semantic layer which is referred to another transformation sub-layer before being shown on dashboard. However, this depends on companies' data archietcture and agreement between teams.
- Transformation process can be done with SQL by analytic engineer, data analyst, domain expert, and etc. to make data become more meaningful to business aspect. However without proper expertise in SQL and data modeling, cost from dashboard usage and data preparation can be gone in a very wrong way.
- Tools that support transformation at aggregation level is emerging, since ELT pattern is still counted as modern at the moment for batch data engineering pipeline. Some of famous tools you might have heard could be dbt, and SQLMesh which enable ability to perform complex incremental data loading logic, SCD handling, data lineage, data dict, and built-in data quality controlling at high level. However, you could still implement data aggregation with native airflow operator introduced by official provider by executing sql directly to data warehouse through Airflow.
- What should be considered in gold layer (or curated) is technical aspects such as, SCD (Slow Changing Dimension), snapshot and incremental transformation (which usually involved with how ingestion pipelines work and silver layer partitioning), data transferring behavior or ingestion method (full dump, append, delta, or replace), and business logic (if semantic layer not exists or it required pre-calculation in gold layer)
- Pre-calculated table was proved to be more optimized when data is queried to dashboard. Moreover, data aggregation with only necessary partitioning considered also important in cost optimization when it come to larger scale in production database.
- Using view not only prevent downstream users from modifying table in ideal, but also manipulate the users to query from only specific period of data or partition resulting in lesser data scanning per read or per dashboard refresh.
- Using view can futher lead to data governance issues such as user permission to read from both table and view if not considering dashboard permission.
- Modeling data warehouse to star schema or snowflake schema (considered to be fact/dimension pattern) is claimed multiple times to be more cost optimized, but it lack of quantitative evidence.
Once we have the data in the data warehouse, we can do the EDA and data visualization connecting data from the data warehouse. I will use PowerBI to create the dashboard, and Python for some ad-hoc analysis.
Actually, we might have to do the EDA (Exploratory Data Analysis) before loading data to data warehouse to see how we would clean the dataset, but in the analysis part, we can do in different objectives. In this part, I will do an EDA to see the characteristic of the data, and see if there is any insight that can be meaningful to the business.
Data Characteristics
This part is mostly done with Python by Pandas and Matplotlib, because of its flexible.
CustomerID
contains null values- fill null values with int 0 → for RFM, CustomerID 0 will be removed
Description
contains null values- fill null values with “No Description”
Description
contains different descriptions for the same stock code- replace the most frequent description → for market basket analysis
UnitPrice
contains negative values- StockCode "B": adjust bad dept
Quantity
contains negative values means the refundQuantity
andUnitPrice
contain outliers- need to be removed for model development
- should be kept for revenue analysis
Quantity
< 0 &UnitPrice
= 0 → Anomaly- should be removed for model development
- should be kept for revenue analysis
- should be filtered out for market basket analysis
- it can be something, like damagae, manually filled, etc.
InvoiceNo
contains "C" → refund- Inconsistent
StockCode
- "D" → discount
- "M" → manual (there's also "m")
- "S" → sample
- "BANK CHARGES" → bank charges
- "POST" → postage
- "DOT" → dotcom postage
- "CRUK" → charity
- "PADS" → pads to match all cushions
- "C2" → carriage
- the digits StockCode with a letter → the same product but different colors
- More from random data description:
UnitPrice
is in "Pound sterling" Country
mostly contains "United Kingdom" → more than 90% of the dataInvoiceDate
includes time, but not in the same timezone, for different countries
You can check the code in ecomm_eda.ipynb file.
Since, the current version of PowerBI has a bug of connecting to data warehouse, I will connect the data from local parquet file that loaded from data warehouse instead.
We can connect parquet file locally to Power BI:
- Go to
Get Data
> Type in"Parquet"
>Connect
> use Basic option, and type in URL box by this:file:///C:/path/to/data.parquet
> Then, click OK
Note: you have to change the path to your local path
Now we can see the data in PowerBI, and do the data visualization.
I won't go into detail of how to create the dashboard or each component, but you can download and check it yourself in ecomm_bi.pbix file.
What worth to mention are:
- I used DAX to calculate Growth Rate as a measure that would be quite complex for some people who are not familiar with PowerBI or DAX, but it used just regular formula as:
(Sales in the Current Context month - Sales in the Previous month) / Sales in the Previous month
. I also usedCALCULATE
function to calculate the previous month value together withALLSELECTED
, andDIVIDE
function to calculate the growth rate. - I linked variables between reports to be able to Drill Through to other reports, making the dashboard more interactive, and more in-depth to analyze.
- Dashboard is about Storytelling, so it's better to have a story in mind before creating the dashboard. It's not just about the data, but how to arrange the story from the data.
- It's crucial to know who are the audiences of the dashboard, and what the objective of the dashboard is. So, we can select the right metrics, right data, and right visualization.
- Data model is very important, it's the foundation of the dashboard. If the data model is incorrected, the dashboard will be wrong also. If the data model come in a good shape, the dashboard will be easier to create, and the data will be easier to analyze. (especially in aspect of Time Intelligence)
The Model Development part is not fully finished yet, but some part of it are done and ready to be presented.
This is what I have planned so far:
- Customer Segmentation By RFM, KMeans, and Tree-based Model (Done)
- Market Basket Analysis (Done)
- Demand Forecasting (Paused)
- Recommendation System (Not started)
- Customer Churn Prediction (Not started)
- Price Analysis and Optimization (Not started)
You can see the code in model_dev.ipynb
Introduction
RFM (Recency, Frequency, Monetary) is a well known method to segment customers based on their behavior. But recency, fequency, and monetary alone, are not effective enough to segment the diversity of customers. We should use other available features or characteristics of customers to segment them more effectively that come to the harder to score with traditional RFM method.
As a result, KMeans emerged as a crucial machine learning technique for reasonably effectively clustering clients into more precise customer segments. But, the Kmeans is not interpretable, we can't explain the result or criteria to the business of how clusters are formed or how the customers are segmented, but only show what features we use in KMeans model (we don't even know which features are more influence).
So, we can use Decision Tree to find the criteria of the clusters, and explain the result to the business, leading to proper campaigns and strategies that suit to customer segments to be launched.
Moreover, we can use XGBoost to find the most important features that influence the customer segments (or each segment), and further concern and develop strategies regarding that features.
First, we will form the RFM table based on customer transactions and add some more features (Feature Engineering) that we think it might be useful for segmentation which include:
- Recency
- Frequency
- Monetary
- is one time purchase
- mean time between purchase
- mean ticket size (average total price of each purchase)
- mean of number of unique items of each purchase
- mean quantity items of each purchase
- mean spending per month
- frequency of purchase per month
- (refund rate)
Eventually, we will get a new table that represents customers profile and their behavior, this table is a rfm
variable in the code in model.dev.
Note: The features we used to decribe the customer behavior is not limited to the features above, we can add more features that we think it might be useful for segmentation, but also should be interpretable for initiaing the campaigns and strategies, so some features, like standard deviation, might not be appropriate for this case.
Before we put the customer profile data into KMeans, we should scale the data first , in my case I used Robust Scaler, to make the data have the same scale, since KMeans algorithm use euclidean distance to calculate the clusters, so it could sensitive to the different scale of the data.
Then, we will use KMeans to segment customers into clusters. We will use Elbow Method and Silhouette Plot to find the optimal number of clusters which eventually is 10 clusters.
Then, we label the clusters to customer profile table which not scaled yet, so we can see the customer profile of each cluster with the original scale to make it more interpretable. We will use this table to find the criteria of the clusters as an input and validation set for the Decision Tree.
References:
- Selecting the number of clusters with silhouette analysis on KMeans clustering
- Segmenting Customers using K-Means, RFM and Transaction Records
After that, we will use Decision Tree to find the criteria of the clusters to make the clustering result from KMeans become interpretable. I used Random Search with Cross Validation to find the best hyperparameters of the Decision Tree to make it the most accurate resulting in accuracy 91.41% on test set, 94.11% on train set, macro averaged of F1 score 0.75, to clarify criteria of each cluster.
Note: The 91% accuracy doesn't mean we segmented the customers correctly by 91%, because we can't be sure that Kmeans correctly 100% segmented the customers (and definitely, it didn't), but we can tell that the Decision Tree can explain the criteria of the clusters correctly by 91% in generalization, So we can take account of the criteria from the Decision Tree to cluster customer segmented to follow Kmeans result.
Finally, we will use XGBoost to find the most important features that influence the customer segments or important factors to develop the strategies. I also used Random Search with Cross Validation to find the best hyperparameters of XGBoost resulting in accuracy 97.76% on test set, 100% on train set, macro averaged of F1 score 0.77, to find the most important features (seems overfitting, but it's not a main focused).
You may heard of feature importance from XGBoost that can describe which features are more influence to the model, but it's not effective enough due to its limitation, like how it treats the correlated features, how differently treats the categorical features and numerical features, and how it's not generalized to other models. So, I used Permutation Feature Importance instead, which is more interpretable and generalized for choosing the most influence features to each cluster. Moreover, it can be used with any model, not just XGBoost.
References:
- Permutation feature importance - sklearn
- Permutation Importance vs Random Forest Feature Importance (MDI)
The most important features for each cluster can be shown in the plot below:
Finally we can classify the customers into 10 clusters and explain the criteria of each cluster to the business.
- Cluster 0 : Bought less, not often, and small ticket size, I think this is a cluster of casual customers or one-time purchase customers.
- Cluster 1 : Bought much in amount, but low quantity.
- Cluster 3 : Bought less in amount, but bought often, and small ticket size, I think this is a cluster of regular customers who are likely to active and easily attracted by marketing campaign.
- Cluster 4 : Big spenders, and bought a lot of items both quantity, and diverse items, I guess this is a cluster of wholesale customers.
- Cluster 5 : Also Big spenders, and bought a lot of items both quantity, and diverse items, I also think this is a cluster of wholesaler but bigger than cluster 4, because they bought in more amount and more quantity.
- Cluster 7 : Big spenders, but not much as cluster 5, spent 50 - 155ÂŁ per month and bought once a month approximately, and also bought in low quantity. I guess this is a cluster of valuable regular customers that we should keep. Moreover, they're also the largest group among all clusters.
... and so on.
Note: the more cluster and the more depth you have, the more complex and specific the criteria of each cluster will be, leading to harder to classify.
The Market Basket Analysis is a technique to find the association between items that customers purchase together. It can be used to find the relationship between items, and use the result to develop strategies such as cross-selling, and product bundling.
The important metrics of Market Basket Analysis are:
- Support
- how frequent the itemset appears in the dataset
- can be calculated by: support(itemset) = count_basket(itemset) / total_count_basket
- support score range from 0 to 1, "minimum support" can be used to filter out itemset that are not frequent (threshold depends on user and dataset)
- Confidence
- how likely item B is purchased when item A is purchased
- confidence(A→B) = count_basket(itemset (A and B)) / count_basket(A)
- confidence score range from 0 to 1, "minimum confidence threshold" can be used to filter out itemset that are not meet the requirement (threshold depends on user and dataset)
- Lift
- how likely item B is purchased when item A is purchased, how many times probability increase or decrease compared with normal B purchased
- lift(A→B) = confidence(A→B) / support(B)
- lift score range from 0 to infinity (in times unit)
- lift > 1 → B is likely to be purchased when A is purchased
- lift < 1 → B is unlikely to be purchased when A is purchased
- lift = 1 → B is independent from A
The Market Basket Analysis can be done by using Apriori Algorithm which is a popular and effective algorithm to find the association between items. one can use mlxtend library to implement Apriori Algorithm following these steps:
- Step 1: Find frequent itemset (itemset that meet the minimum support threshold)
- Step 2: Generate association rules from frequent itemset (itemset that meet the minimum confidence threshold)
- Step 3: Sort the rules by lift
- Step 4: Visualize the rules as a table and interpret the result
What challenges in market basket analysis are data preparation, specifying threshold of each metric, and interpret the result to the business. It's can be done easily and useful for the retail business.
for example of the result of market basket analysis can be shown in the table below:
Antecedents | Consequents | Support | Confidence | Lift |
---|---|---|---|---|
PINK REGENCY TEACUP AND SAUCER | GREEN REGENCY TEACUP AND SAUCER | 0.034121 | 0.828530 | 15.045681 |
GREEN REGENCY TEACUP AND SAUCER | PINK REGENCY TEACUP AND SAUCER | 0.034121 | 0.619612 | 15.045681 |
PINK REGENCY TEACUP AND SAUCER | ROSES REGENCY TEACUP AND SAUCER | 0.031806 | 0.772334 | 13.714834 |
... | ... | ... | ... | ... |
LUNCH BAG BLACK SKULL. | LUNCH BAG PINK POLKADOT | 0.030560 | 0.425620 | 6.896678 |
Note: The table is snippet from the result of market basket analysis
We can interpret the result as:
-
PINK REGENCY TEACUP AND SAUCER and GREEN REGENCY TEACUP AND SAUCER are frequently purchased together, the itemset appeared by 3.41% of total transactions (considered by support score)
Note: In our case, we filter out single item purchase transaction, hence the 3.14% is the percentage of transactions that contain both 2 or more items which is
basket_encoded_filter
variable in the notebook resulting in 15660 x 0.034121 = 534 transactions out of 16811 transactions. -
the probability of GREEN REGENCY TEACUP AND SAUCER purchased when PINK REGENCY TEACUP AND SAUCER purchased is 82.85% (high confidence)
-
the probability of GREEN REGENCY TEACUP AND SAUCER purchased when PINK REGENCY TEACUP AND SAUCER purchased is 15.04 times higher than alone GREEN REGENCY TEACUP AND SAUCER purchases (high lift)
-
the second row (or rule) is the same as the first row, but the consequent and antecedent are swapped, so we can watch the result from both sides
-
the third rule and so on, show the bundle of item that are frequently purchased together, these rules can be used to develop strategies such as cross-selling, product bundling.
References:
- How To Perform Market Basket Analysis in Python - Jihargifari - Medium
- Association Rule Mining using Market Basket Analysis - Sarit Maitra - Towards Data Science
In this section, we will use Time Series Forecasting technique to predict future values based on the past values of the data. we will use some input from the past as features to predict the future sales.
In general, we use current features or features that already happened fed into the model to predict the sales as a target. But, the problem is, if we want to predict the sales of the next month, we don't have the input of the next month yet. So, we have to use the records of the past to predict the sales of the next month.
Therefore, we have to perform feature engineering, transform data, create features to obtain the appropriate and effective features to predict the future sales. So, we will use Lag Features and Rolling Window Statistics.
But how can we know how much lag of the features and how many rolling window statistics should we use? first we can use Auto-correlation to find the optimal lag value or candidates to be used as lag features. Then, we can use Cross-Validation to find the optimal number of rolling window and the best candidate of lag features.
Additionally, I used Fast Fourier Transform to find seasonality of the data, which is the pattern that repeats itself at regular intervals of time. The seasonality can be used to create features to predict the future sales.
Personally, I thought using fast fourier transform to find seasonality is quite more quantitative than using autocorrelation. But, we can use both of them to find the seasonality of the data to ensure the result.
I think the most challenging part of timeseries forecasting is to find the appropriate features to predict the future sales. The features that we use to predict the future sales should be the features that already happened in the past, and we can't use the features that will happen in the future. So, checking how much lagged values of the features can be significant to predict the future sales.
Model | RMSE | MAPE |
---|---|---|
Baseline (Mean) | 3170.143 | 27.16% |
LightGBM | 4884.230 | 32.29% |
(Current Result)
Even we can see that the prediction result can a bit capture the trend of the data, but the result is not good enough compared with mean of the target.
I intended to decompose the data into trend, seasonality, and residual, then use them as features to predict the future sales to make it stationary and also add moving average types such as EMA (Exponential Moving Average) and LWMA (Linear Weighted Moving Average) to the model, to weight the recent data more than the old data. Moreover, I want to test traditional statistical model such as ARIMA, and SARIMA. But, I think it's enough for now, I will update the model later.
References:
- How to Calculate Autocorrelation in Python?
- How to detect seasonality, forecast and fill gaps in time series using Fast Fourier Transform
- How To Apply Machine Learning To Demand Forecasting (Concept)
- All Moving Averages (SMA, EMA, SMMA, and LWMA)
- Finding Seasonal Trends in Time-Series Data with Python
- Various Techniques to Detect and Isolate Time Series Components Using Python (Technical)
After we developed and evaluated the model, we can deploy the model to production to leverage the business, bringing the model out from the Python notebook or your lab, and not only making it available to the data scientist.
We can deploy the model to production in many approaches, please refer to this repo: MLOps - ML System for further details of principles and research on MLOps methodology. However, most approaches required docker container for consistent runtime even in different environments, so I will elaborate more on containerization for this topic.
Due to latest revise (Nov 2024), I decided to re-write this from the previous version of this chapter into a new one, emphasizing less on typical concepts and focusing more on specific use case for this project. However, considering how huge scale of this project was, I also intended to separate this chapter into MLOps - ML System project to clarify more on my most interested topic in detail.
Anyway, documentation for this topic will mostly focus more on overview concepts of packaging ML code to be ML pipeline with airflow dag integration and together with deployment on cloud through CI/CD Workflow. Interface service of inferencing such as an API with backend database and Model Monitoring, will be further implemented on, again MLOps - ML System.
Although this project introduced multiple ML use cases, I choose Customer Segmentation By RFM, KMeans, and Tree-based Model to be deployed with precompute serving pattern, so there's no model wrapped into a container or prediction API service, just ML Pipeline within docker container deployed with CI/CD Pipeline.
MLOps Code Structure and Logic
First, we will focus on extracting the model development processes from notebook and transforming it into python module with factory pattern using argparse
library and OOP concept. Some processes were improved to be more appropriate to be operated automatically through pipeline. Such as:
- How data read/written into ML code
- Reader and Writer classes are implemented to support feeding input and retrieving output from ml service both local and cloud environments.
- Selecting K Value
- Model Development Changes
- Changing feature engineering process imputing recency feature from
999
to the real recency - Replacing interpretation method from decision tree or xgboost feature importance with LightGBM and permutation feature importance for more interpretable and quantitative measurement.
- (please, check ML-Sandbox/dynamic_segmentation/revise_ecomm_logic.ipynb for final ML Logic)
- Changing feature engineering process imputing recency feature from
- Model Retaining Triggering is now implemented
- Final output will be 2 data models, 2 ML models and artifacts (metadata + control file) and scaler.
- please check folder
code/models/output/
for output example - df_cluster_importance.parquet
- df_cluster_importance.parquet
- please check folder
- code/models/main.py : entrypoint of the application using argparse
- code/models/abstract.py : abstract classes defining overview of implemented classes
- code/models/io_services.py : Input and Output related services (reader/wrtier)
InputLocalReader
: Reader for input (data and interpreter if exists) on localInputGCPReader
: Reader for input (data and interpreter if exists) GCP cloudInputProcessor
: Reader type selector (factory) based on argparse argumentOutputLocalWriter
: Writer for output on localOutputGCPWriter
: Writer for output on GCP CloudOutputProcessor
: Writer type selector (factory) based on argparse argument
- code/models/ml_services.py : ML Code Logic as a pipeline including,
CustomerProfilingService
: RFM - pre-processing, feature engineering to formulate RFM DataFrameCustomerSegmentationService
: Clustering service, related to using KMeans model, and cross-validation automated finding optimal K value.ClusterInterpretationService
: Interpretation service related to training/retraining LightGBM and utilizing permutation feature importance to find influenced features for each clusterMLProcessor
: Orchestrator of overall ML services
- code/models/requirements.txt : Python dependencies used in the app
- code/models/Dockerfile : Packaging the app and dependencies to a docker container, installing additional components as app required (e.g. lightgbm prerequisites)
Please refer to code/models/README.md for example of command usage.
After python module works properly on local machine, we will package the code into docker container for the next step by code/models/Dockerfile file.
Note: What worth to mention is that the path specifying in the Dockerfile is relative to the current working directory where we execute the docker build
command. So, we have to be aware of the path we specify in the Dockerfile and how we execute the file via Command Line Interface (CLI).
With a few research, I found that it's not possible to specify the path to copy the files from the parent directory.
And when we execute the command, we have to be in the ./code/models/ realative to root of the project and use this command pattern:
# workdir: ./code/models/
docker build -t <image-name>:<tag> -f <path-to-dockerfile> <path-to-working-directory>
# -t: tag the image with the name
# -f: (Optional) specify the path to Dockerfile that we want to execute. Required when multiple Dockerfiles exist in the same working directory
# which can lead to this command
docker build -t ecomm/interpretable-dynamic-rfm-service:v2 .
- Before packaging code into docker container, we have to make sure it works as expected within local environment.
Note: Please refer to initial_gcp.sh before testing against cloud environment
# test module local python main.py --env local --method filesystem --input_path '../../data/ecomm_invoice_transaction.parquet' --output_path output --exec_date 2024-11-03 # test module local force_train python main.py --env local --method filesystem --input_path '../../data/ecomm_invoice_transaction.parquet' --output_path output --exec_date 2024-10-29 --force_train # test module against gcp # TODO: Replace with your bucket name python main.py --env gcp --project_id <project_id> --method filesystem --input_path 'gs://<landing_bucket_name>/input/data/2024-11-03/ecomm_invoice_transaction.parquet' --output_path 'gs://<staging_bucket_name>/output' --exec_date 2024-11-03
- After packaging code into docker container, we have to make sure that its functionality both local and cloud method work perfectly within the container
# test module local via docker docker run \ ecomm/interpretable-dynamic-rfm-service:v1 \ --env local \ --method filesystem \ --input_path 'ecomm_invoice_transaction.parquet' \ --output_path output \ --exec_date 2024-10-29 # test module against gcp via docker # TODO: Replace with your bucket name docker run \ -v $HOME/.config/gcloud:/root/.config/gcloud \ # mount GCP default credential ecomm/interpretable-dynamic-rfm-service:v2 \ --env gcp \ --method filesystem \ --input_path 'gs://<landing_bucket_name>/input/data/2024-11-03/ecomm_invoice_transaction.parquet' \ --output_path 'gs://<staging_bucket_name>/output' \ --exec_date 2024-11-03
If all steps work perfectly, we can now go to the next step: deploying to cloud
After we have everything ready, we will deploy containerized ML Pipeline to cloud environment utilizing Google Artifact Registry (GAR), similar to Docker Hub but more secured within GCP.
So, what we need to do is:
- Prepare cloud environment and authentication
- Build the Docker image locally and push it to the Artifact Registry
Since we have already set SA permission in Topic 2.1.1 Setting up IAM and Service Account, so there will be just authentication confirmation to be explained
- ADC is a method to authenticate to GCP interactively through CLI via gcloud. It's a proper authentication method when there's human involved between the processes, but impossible with automated pipeline.
- ADC method let us login to GCP with email address through web browser interactively, and then it store a credential to gcloud default path to look for automatically when we try to authenticate with GCP with gcloud
Please, run these commands to authenticate with impersonated account
# For first time using gcloud
gcloud auth login
# Logic through impersonated account through ADC method
gcloud auth application-default login --impersonate-service-account <SERVICE_ACCT_EMAIL>
# Set project
gcloud config set project <project_id>
Then we execute these terraform configs
- /code/models/terraform/main.tf together with /code/models/terraform/variables.tf
- /code/models/terraform/auth/main.tf together with /code/models/terraform/auth/variables.tf by
# workdir: code/models/terraform
terraform init
terraform plan
terraform apply
terraform destroy
# workdir: code/models/terraform/auth
terraform init
terraform plan
terraform apply
# terraform destroy
Note: WIF shouldn't be created/deleted repeatedly, refer to Stack Overflow - Terraform on GCP - Error 409: Requested entity already exists
Error: Error creating WorkloadIdentityPool: googleapi: Error 409: Requested entity already exists
If below error occurred, please check if you have the 'Service Account Token Creator' role in both owner account and service account. if yes, please try to reset $GOOGLE_APPLICATION_DEFAULT
environment variable by unset $GOOGLE_APPLICATION_DEFAULT
, restarting terminal and try to authenticate with gcloud
with the above process again.
{
"error": {
"code": 403,
"message": "Permission 'iam.serviceAccounts.getAccessToken' denied on resource (or it may not exist).",
"status": "PERMISSION_DENIED",
"details": [
{
"@type": "type.googleapis.com/google.rpc.ErrorInfo",
"reason": "IAM_PERMISSION_DENIED",
"domain": "iam.googleapis.com",
"metadata": {
"permission": "iam.serviceAccounts.getAccessToken"
}
}
]
}
}
References:
- Terraform Code
- Debugging Authentication
To puslish created docker image to GAR, please run the following commands:
# workdir: code/models/
# gcloud auth application-default login --impersonate-service-account <SERVICE_ACCT_EMAIL>
# gcloud configure docker
gcloud auth configure-docker "<$REGION>-docker.pkg.dev"
# Build image locally
docker build -t <any-image-name> .
# Change image name locally to acceptable format for GAR
docker tag <previous-step-image-name> <$REGION>-docker.pkg.dev/<$PROJECT_ID>/<$REPOSITORY>/<$IMAGE_NAME>:<$TAG>
# Push image to GAR
docker push <$REGION>-docker.pkg.dev/<$PROJECT_ID>/<$REPOSITORY>/<$IMAGE_NAME>:<$TAG>
Check the result, pushed image, at artifact registry gcp service.
Note: You can export the environment variables to avoid typing the same variables over and over again, reducing the chance of making a mistake.
Reference:
Although Linting and Code Formatting is usually considered as supplement, adding these components make your code have more quality and is written in the best practice and convention. With pre-commit, you can prevent your development team to push low quality code to GitHub by regulating Linter which can be apply manually or through pre-commit hooks.
In this project I use Ruff for only python linter (no formatter) together with pre-commit ruff hook.
Related Files:
- ruff.toml : To configure python linting options and styles.
- .pre-commit-config.yaml : To enable pre-commit and set configuration
# pip install code/models/requirements.txt
pip install ruff==0.7.3 pre-commit==4.0.1
# ruff
ruff check
# ruff with glob pattern
ruff check code/models/*.py
# ruff format
ruff format
# pre-commit
pre-commit install
# try git commit, if your code is failed to commit, you will be unable to commit your code until it's fixed.
pre-commit uninstall
Note:
- Pre-commit will only apply on the files that are committed or staged changes, and if you specified additional conditions, like only specific path to be checked with through pre-commit, it will comply with it also.
- Please note that pre-commit in this project is configured to only check linting in
code/models/
path. - Pre-commit also supported other languages with various types of hook.
- Ruff is currently the fastest linter for python, since it's written in rust. However, there's also other python linters and formatter in market such as pylint, flake8, black that are different by coding-style and how to configure it.
References:
- Pre-commit Official Documentation
- Python Linter: The Ruff Linter - Ruff Official
- Ruff Configuration: Configuring Ruff - Ruff Official
What more important is unit testing. Personally I don't really favor this topic, but it's the best practice to specify test cases and test it with your code (function, and class) to ensure that every changes in your code during development are still and will be working as expected. In this project I use pylint that's quite basic and popular among the communities for python.
Related File:
- test_ml_services.py : to demonstate applying unit testing for "finding optimal K" function in ML Service.
# make test
PYTHONPATH="code/models" pytest tests/unit/test_ml_services.py --disable-pytest-warnings
Note:
- Without specifying
PYTHONPATH
, we have to specify relative path to where the testing script is forpytest
argument which could lead to more overhead of specifying path within your project. - Use flag
--disable-pytest-warnings
to avoid minor warnings.
References:
- Using of
PYTHONPATH
- Stack Overflow - Multiple test cases with pytest - parametrize: How to parametrize fixtures and test functions
- OOP pytest: Object oriented testing in Python - geeksforgeeks
- Suppressing pytest warnings: Stack Overflow
When we make changes on the app service, we certainly need to re-deploy it to the cloud environment with a newer version for application improvement. But, we don't want to do it manually every time, especially on production environment, which can cause error-prone and time-consuming processes. So, this come to the important of automating the deployment process or CI/CD Workflow. CI/CD Pipeline make sure your developed application is well-maintained, qualified with tests, and deploy securely in predictable way without error-prone issue.
Github Actions (GHA) is one of many CI/CD tools that we can use to automate the deployment process. It can automate tasks when we manually trigger it or with specific events we specified as its condition within the workflow. For example, we can automate the deployment process when we push on specific branch or create pull request with a certain branch to the repository.
GHA workflow and code logic is not different from manual deployment process, including installing gcloud, authentication, building docker image from source code in GitHub, and pushing to specified image registry, except we can do this all processes with a few click. However, GHA workflow has its own syntax and some specific feature such as using pre-defined action, and accessing variables and GitHub default variables which make it a bit more complicated than regular bash command such as:
# Using Environment Variable
echo $ENV_VAR
# This equal to above, but more useful in f-string scenario
echo ${ENV_VARIABLE}
# This call GitHub default variables
echo ${{ github.ref_name }}
echo ${{ inputs.xxx }}
echo ${{ env.xxx }}
# secret variable will be masked with '****' in GHA logs
echo ${{ secrets.xxx }}
echo ${{ steps.STEPS_ID.outputs.* }}
# Example of Other GHA Syntax
...
runs-on: ubuntu-latest
steps:
- name: step name 1
id: step_id_1
uses: repo/action@v1
with:
xxx: xxx
- name: step name 2
id: step_id_2
run: |
echo xxx
...
Related Files:
- .github/workflows/build-deploy-model.yml : CD Workflow
- .github/workflows/pull-request-ci.yml : CI Workflow
Reference:
- CI Workflow - Pull Request Event Trigger: Events that trigger workflows - GitHub Official
The most important aspect for Continuous Deployment (CD) Workflow is authentication, because we need to make some changes to cloud resources. Traditionally, using long-lived credentials just like password through GHA secrets variables is the most typical and straightforward solution.
- You have to get a credential file and copy its contents
- Go to Repo > Settings > Secrets and variables > Actions
- Create New repository secret, suppose secret name:
GCP_PASSWORD
and paste credential content as Secret - Now, you can use the secret for authentication
- uses: 'google-github-actions/auth@v2' with: credentials_json: '${{ secrets.GCP_PASSWORD }}'
However, this long-lived credential is considered as security risk. So, there's a new service from GCP called 'Workload Identity Federation' (WIF) which introduce authentication without credentials i.e. keyless authentication.
WIF is quite complex in architecture perspective, but for usage, we just need service account email (without credential), WIF Pools, WIF Provider, and some additional step for granting service account access.
- WIF in GHA
steps: - name: Authenticate GCP - Workload Identity Federation uses: 'google-github-actions/auth@v2' with: project_id: ${{ secrets.PROJECT_ID }} workload_identity_provider: ${{ secrets.WIF_PROVIDER_URL }} service_account: ${{ secrets.SERVICE_ACCOUNT }}
- we can see that it require no credential, but exposing security components or architecture may lead to security risk also.
- WIF in Terraform
- code/models/terraform/auth/main.tf
- Keys to enable this services are:
attribute_mapping
,attribute_condition
, andissuer_url
resource "google_iam_workload_identity_pool_provider" "main_provider" { project = var.project_id workload_identity_pool_id = google_iam_workload_identity_pool.main_pool.workload_identity_pool_id workload_identity_pool_provider_id = var.provider_id display_name = var.provider_display_name description = var.provider_description attribute_mapping = var.attribute_mapping attribute_condition = var.attribute_condition oidc { issuer_uri = var.issuer_uri } }
... variable "attribute_mapping" { type = map(any) default = { "google.subject" = "assertion.sub" "attribute.aud" = "assertion.aud" "attribute.repository_owner" = "assertion.repository_owner" } } variable "attribute_condition" { type = string default = <<EOT assertion.repository_owner == "patcha-ranat" EOT } variable "issuer_uri" { type = string default = "https://token.actions.githubusercontent.com" } ...
In the last step, we have to grant additional access to the service account which is quite hard to execute via web-based UI.
gcloud iam service-accounts add-iam-policy-binding "${SERVICE_ACC_NAME}@${PROJECT}.iam.gserviceaccount.com" \
--project=${PROJECT_ID} \
--role="roles/iam.workloadIdentityUser" \
--member="principalSet://${IAM_PRINCIPAL_ID}/attribute.repository_owner/patcha-ranat"
Note:
- For UI approach, you have to grant access via WIF pool page, on the right panel with 'CONNECTED SERVICE ACCOUNTS' tabs
IAM_PRICIPAL_ID
is retrieved from WIF pool page, also you have to replace 'pricipal' to 'principalSet'.
However, there's some constraints for WIF that's worth mentioning
- Assertion(s) in
attribute_condition
must be specified as subset ofattribute_mapping
. We don't have to specify condition for every mapped assertions, but recommendation from official GCP documentation is that we should specify 1 assertion atleast for security purpose. - Although there're example to specify multiple conditions as a string of
attribute_condition
like the below code snippet, there's a problem when we're trying to grant service account an IAM access.--member
argument don't accept multiple conditions pattern. Some of specific project example, instead useassertion.full = "assertion.A+assertion.B+..."
for fine grained controls to allow access to specific service accounts from specific events on specific GH repo.Instead forvariable "attribute_condition" { type = string default = <<EOT assertion.sub == "xxx" assertion.aud == "xxx" assertion.repository_owner == "patcha-ranat" ... EOT }
assertion.full
attribute_mapping = { "google.subject" = "assertion.sub" "attribute.aud" = "assertion.aud" "attibute.full" = "assertion.repository+assertion.+repository_owner+..." }
principalSet://${IAM_PRINCIPAL_ID}/attribute.full/${GH_REPO}${GH_REPO_OWENER}...
issuer_uri
is a static URL obtained from example and mentioned in documentation from GitHub
References:
- How to use
assertion.full
: Stack Overflow - Another example of
assertion.full
: Stack Overflow
# Error: 1
Error: google-github-actions/auth failed with: failed to generate Google Cloud federated token for //iam.googleapis.com/***: {"error":"invalid_request","error_description":"Invalid value for \"audience\". This value should be the full resource name of the Identity Provider
GCP_WORKLOAD_IDENTITY_PROVIDER
in CD workflow must be in format: "project/<project_id>/..." with uri prefix removed.
# Error: 2
denied: Unauthenticated request. Unauthenticated requests do not have permission "artifactregistry.repositories.uploadArtifacts" on resource
gcloud auth configure-docker
is required in CD workflow steps.
# Error: 3
ERROR: (gcloud.auth.docker-helper) There was a problem refreshing your current auth tokens: ('Unable to acquire impersonated credentials'...
- Granting additional iam access to service account specific for WIF is required
gcloud iam service-accounts add-iam-policy-binding ...
# Error: 4
OIDC error: 403 'Unable to acquire impersonated credentials' [principalSet mismatch with the Subject claim]
- Attribute conditions specified when first creating WIF provider are not matched with conditions that we grant to Service Account in the last step.
Finally, you can now manual trigger workflow for automated deployment (CD Workflow)
References:
- How OIDC work with cloud provider behind the scene, JWT attributes, and advance configuration: About security hardening with OpenID Connect
- Official GHA with GCP on Github Repo: Authenticate to Google Cloud from GitHub Actions - Direct Workload Identity Federation
- GHA steps for WIF, recommendation, and gcloud commands from official published article: Enabling keyless authentication from GitHub Actions - Google Cloud Platform
- WIF & Terraform Medium Example: Workload Identity Federation & GitHub Actions & Terraform - Medium
- GitHub default variables: Store information in variables
For more resource for deployment overview, I recommended this ML Deployment Chapter from Machine Learning Zoomcamp.
To test the deployed image or ML service, we can use airfow to schedule it as a job. But to use it in airflow, we can utilize GKEPodOperator
which is required K8s cluster or DockerOperator
for more lightweight approach.
Everything is explained in the image above, but I think setting up this at first-run is quite harder than it's seemed, especially when there's an operator that is not popular among the communities and not well-documented from official provider.
Keys are about:
- auto-authentication using GCP ADC requiring airflow entrypoint that's explained in 2.2.3 GCP ADC for Pipeline Code and 2.1.4 Airflow Entrypoint
- how to configure
DockerOperator
parameters, like mountingdocker.sock
or ADC and make the service to be able to communicate with cloud.
References:
docker.sock
is required to be mounted to airflow scheduler to activateDockerOperator
- docker-airflow GitHub- Simple DockerOperator parameters configuration - Stack Overflow
- Example 1 of how to use
Mount
parameter inDockerOperator
- Stack Overflow - Example 2 of how to use
Mount
parameter inDockerOperator
- Stack Overflow
From this project, we learned how to:
- Design Data Architecture, and select tools for each process to be used in the project.
- Set up Environment for Virtualization to simulate the local environment such as database, and webserver.
- Create ETL Pipelines to extract data from various sources, transform the data, and load the data to the target database.
- Utilize Cloud Services to enable Data Lake, Data Warehouse, Image Repository, and etc on cloud.
- Learn more on Advanced Cloud Authentication with low security-risk.
- Create a Dashboard to visualize the data and analyze the business.
- Develop Machine Learning Models to predict the future sales, customer segments, and inferencing customer buying behavior.
- Productionize Machine Learning Code/Pipeline to cloud environment as an image through both manual process and CI/CD Pipeline with keyless authentication.
This is the biggest personal project I've ever done so far, and I learned a lot from it. I hope it can be useful for anyone who shares the same learning path.
Thank you for your reading, happy learning.
There's relevant topic that I first intended to extend from this project, Web Scraping. However, the scope of this project is too huge to elaborate on the topic, but you can check the concept and the old written code in web-scraping folder.