week2 updates #2

Merged — 3 commits, Nov 4, 2024
9 changes: 3 additions & 6 deletions README.md
@@ -25,7 +25,9 @@
uv pip install -r pyproject.toml --all-extras
uv lock
```

Install src package with `uv pip install -e .`
Install the src package locally with `uv pip install -e .`

Install the src package into the notebook environment on the cluster with `pip install dbfs:/Volumes/main/default/file_exchange/nico/power_consumption-0.0.1-py3-none-any.whl`
Review comment (resolved):

Reviewer: It does not install it on the cluster that way, just in your notebook environment.

Author (NiGroUni): @mvechtomova I know. But when I run it with Databricks Connect, I need it locally and not on my cluster, right? The code gets executed locally except for the Spark code.
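For context, a minimal sketch of the split described in the reply, assuming the Databricks Connect v2 `DatabricksSession` API and the config/table names used elsewhere in this repo: plain Python (imports of `power_consumption`, config parsing) resolves against the local environment, which is why the editable install is needed locally, while only the Spark plan is executed on the cluster.

```python
# Sketch (assumes Databricks Connect v2): this import is resolved from the
# local environment, hence `uv pip install -e .` locally.
from databricks.connect import DatabricksSession

from power_consumption.config import ProjectConfig

spark = DatabricksSession.builder.getOrCreate()
config = ProjectConfig.from_yaml(config_path="project_config.yml")

# Only this DataFrame plan is shipped to the remote cluster for execution;
# everything else in the script runs on the local machine.
df = spark.table(f"{config.catalog_name}.{config.schema_name}.train_set_nico")
print(df.count())
```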


Example of uploading package to the volume:

@@ -34,8 +36,3 @@
databricks auth login --host HOST
uv build
databricks fs cp dist\power_consumption-0.0.1-py3-none-any.whl dbfs:/Volumes/main/default/file_exchange/nico
```

`uv pip install -e .`

`pip install dbfs:/Volumes/main/default/file_exchange/nico/power_consumption-0.0.1-py3-none-any.whl`

6 changes: 6 additions & 0 deletions mlruns/0/meta.yaml

Review comment:

Reviewer: Maybe a good idea to put this in `.gitignore` (e.g. add an `mlruns/` entry); the folder is created by a bug in the feature engineering package.

@@ -0,0 +1,6 @@
artifact_location: file:///C:/Users/grs/PycharmProjects/marvelous-databricks-course-NiGroUni/mlruns/0
creation_time: 1729865378885
experiment_id: '0'
last_update_time: 1729865378885
lifecycle_stage: active
name: Default
18 changes: 18 additions & 0 deletions model_version.json
@@ -0,0 +1,18 @@
{
"_name": "sandbox.sb_adan.power_consumption_model_pyfunc",
"_version": "2",
"_creation_time": 1729865386204,
"_last_updated_timestamp": 1729865387313,
"_description": "",
"_user_id": "[email protected]",
"_current_stage": null,
"_source": "dbfs:/databricks/mlflow-tracking/3164043204193380/5db722f4413c4561a613ac03f8d92f59/artifacts/pyfunc-power-consumption-model",
"_run_id": "5db722f4413c4561a613ac03f8d92f59",
"_run_link": null,
"_status": "READY",
"_status_message": "",
"_tags": {
"git_sha": "bla"
},
"_aliases": []
}
22 changes: 22 additions & 0 deletions notebooks/week2/01.prepare_dataset_n.py
@@ -0,0 +1,22 @@
# Databricks notebook source
from power_consumption.data_processor import DataProcessor
from power_consumption.config import ProjectConfig
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# COMMAND ----------

# Path handling is awkward: in VS Code everything seems to be executed from the
# repo root, so fall back to the notebook-relative path if the first load fails.
try:
config = ProjectConfig.from_yaml(config_path="project_config.yml")
except Exception:
config = ProjectConfig.from_yaml(config_path="../../project_config.yml")

# COMMAND ----------
# Preprocess data
data_processor = DataProcessor(config=config)
data_processor.preprocess()
train_set, test_set = data_processor.split_data()
data_processor.save_to_catalog(train_set=train_set, test_set=test_set, spark=spark)
print("DONE")
171 changes: 171 additions & 0 deletions notebooks/week2/02_04_train_log_custom_model.py
@@ -0,0 +1,171 @@
# Databricks notebook source
import json

import mlflow
import numpy as np
import pandas as pd
from lightgbm import LGBMRegressor
from mlflow import MlflowClient
from mlflow.models import infer_signature
from pyspark.sql import SparkSession
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.pipeline import Pipeline

from power_consumption.config import ProjectConfig
from power_consumption.utils import adjust_predictions

mlflow.set_registry_uri("databricks-uc")
mlflow.set_tracking_uri("databricks")
client = MlflowClient()

# COMMAND ----------

try:
config = ProjectConfig.from_yaml(config_path="project_config.yml")
except Exception:
config = ProjectConfig.from_yaml(config_path="../../project_config.yml")

Review comment (resolved):

Reviewer: Why do you need this logic? It does not matter that much when we move to DABs, just curious.

Author (NiGroUni): I had issues with the path depending on how I run the script. With this, it is executable from the repo root as well as from the notebook path.
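If the try/except ever gets unwieldy, one hypothetical alternative is to search upward from the working directory for the config file; `find_config` below is an illustrative helper, not part of the repo.

```python
from pathlib import Path

from power_consumption.config import ProjectConfig


def find_config(name: str = "project_config.yml") -> Path:
    """Walk up from the current working directory until the config file is found."""
    for folder in [Path.cwd(), *Path.cwd().parents]:
        candidate = folder / name
        if candidate.exists():
            return candidate
    raise FileNotFoundError(f"{name} not found above {Path.cwd()}")


config = ProjectConfig.from_yaml(config_path=str(find_config()))
```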


# Extract configuration details
num_features = config.num_features
target = config.target
parameters = config.parameters
catalog_name = config.catalog_name
schema_name = config.schema_name
mlflow_experiment_name = config.mlflow_experiment_name

spark = SparkSession.builder.getOrCreate()

# COMMAND ----------
# Load training and testing sets from Databricks tables
train_set_spark = spark.table(f"{catalog_name}.{schema_name}.train_set_nico")
train_set = spark.table(f"{catalog_name}.{schema_name}.train_set_nico").toPandas()
test_set = spark.table(f"{catalog_name}.{schema_name}.test_set_nico").toPandas()

X_train = train_set[num_features]
y_train = train_set[target]

X_test = test_set[num_features]
y_test = test_set[target]

# COMMAND ----------

# Create the pipeline with preprocessing and the LightGBM regressor
pipeline = Pipeline(steps=[
('regressor', LGBMRegressor(**parameters))
])


# COMMAND ----------
mlflow.set_experiment(experiment_name=mlflow_experiment_name)
git_sha = "bla"

# Start an MLflow run to track the training process
with mlflow.start_run(
tags={"git_sha": f"{git_sha}",
"branch": "week2"},
) as run:
run_id = run.info.run_id

pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)

# Evaluate the model performance
mse = mean_squared_error(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)

print(f"Mean Squared Error: {mse}")
print(f"Mean Absolute Error: {mae}")
print(f"R2 Score: {r2}")

# Log parameters, metrics, and the model to MLflow
mlflow.log_param("model_type", "LightGBM with preprocessing")
mlflow.log_params(parameters)
mlflow.log_metric("mse", mse)
mlflow.log_metric("mae", mae)
mlflow.log_metric("r2_score", r2)
signature = infer_signature(model_input=X_train, model_output=y_pred)

dataset = mlflow.data.from_spark(
train_set_spark, table_name=f"{catalog_name}.{schema_name}.train_set_nico",
version="0")
mlflow.log_input(dataset, context="training")

# mlflow.sklearn.log_model(
# sk_model=pipeline,
# artifact_path="lightgbm-pipeline-model",
# signature=signature
# )

# COMMAND ----------

class PowerConsumptionModelWrapper(mlflow.pyfunc.PythonModel):

def __init__(self, model):
self.model = model

def predict(self, context, model_input):
if isinstance(model_input, pd.DataFrame):
predictions = self.model.predict(model_input)
predictions = {"Prediction": adjust_predictions(
predictions[0])}
return predictions
else:
raise ValueError("Input must be a pandas DataFrame.")



# COMMAND ----------

wrapped_model = PowerConsumptionModelWrapper(pipeline)  # pass the fitted pipeline to the wrapper



# COMMAND ----------


with mlflow.start_run(tags={"branch": "week2",
                            "git_sha": f"{git_sha}"}) as run:
    run_id = run.info.run_id
    mlflow.pyfunc.log_model(
        python_model=wrapped_model,
        artifact_path="pyfunc-power-consumption-model",
        signature=infer_signature(
            model_input=X_train,
            model_output={"Prediction": np.array([float(y_pred[0])])},
        ),
    )

# COMMAND ----------
loaded_model = mlflow.pyfunc.load_model(f'runs:/{run_id}/pyfunc-power-consumption-model')
loaded_model.unwrap_python_model()
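# COMMAND ----------
# Minimal usage sketch: call the loaded pyfunc model on a few rows of the test
# features; given the wrapper above, it returns a dict like {"Prediction": ...}.
example_input = X_test.head(5)
print(loaded_model.predict(example_input))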

# COMMAND ----------
model_name = f"{catalog_name}.{schema_name}.power_consumption_model_pyfunc"

model_version = mlflow.register_model(
model_uri=f'runs:/{run_id}/pyfunc-power-consumption-model',
name=model_name,
tags={"git_sha": f"{git_sha}"})
# COMMAND ----------

with open("model_version.json", "w") as json_file:
json.dump(model_version.__dict__, json_file, indent=4)

# COMMAND ----------
model_version_alias = "the_best_model"
client.set_registered_model_alias(model_name, model_version_alias, model_version.version)

model_uri = f"models:/{model_name}@{model_version_alias}"
model = mlflow.pyfunc.load_model(model_uri)

# COMMAND ----------
client.get_model_version_by_alias(model_name, model_version_alias)
# COMMAND ----------
model

# COMMAND ----------

print("DONE")
