diff --git a/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/auto-ml-forecasting-function-gap-batch-inference.ipynb b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/auto-ml-forecasting-function-gap-batch-inference.ipynb new file mode 100644 index 0000000000..e40fea6025 --- /dev/null +++ b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/auto-ml-forecasting-function-gap-batch-inference.ipynb @@ -0,0 +1,716 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Introduction\n", + "This notebook demonstrates the full interface of the `forecast()` function. \n", + "\n", + "The best known and most frequent usage of `forecast` enables forecasting on test sets that immediately follows training data. \n", + "\n", + "However, in many use cases it is necessary to continue using the model for some time before retraining it. This happens especially in **high frequency forecasting** when forecasts need to be made more frequently than the model can be retrained. Examples are in Internet of Things and predictive cloud resource scaling.\n", + "\n", + "Here we show how to use the `forecast()` function when a time gap exists between training data and prediction period.\n", + "\n", + "Terminology:\n", + "* forecast origin: the last period when the target value is known\n", + "* forecast periods(s): the period(s) for which the value of the target is desired.\n", + "* lookback: how many past periods (before forecast origin) the model function depends on. The larger of number of lags and length of rolling window.\n", + "* prediction context: `lookback` periods immediately preceding the forecast origin" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "TIME_COLUMN_NAME = \"date\"\n", + "TIME_SERIES_ID_COLUMN_NAME = \"time_series_id\"\n", + "TARGET_COLUMN_NAME = \"y\"\n", + "lags = [1, 2, 3]\n", + "forecast_horizon = 6" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Batch Deployment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import mlflow\n", + "import mlflow.sklearn\n", + "import pandas as pd\n", + "\n", + "from azure.identity import DefaultAzureCredential\n", + "from azure.ai.ml import MLClient\n", + "\n", + "credential = DefaultAzureCredential()\n", + "ml_client = None\n", + "\n", + "subscription_id = \"\"\n", + "resource_group = \"\"\n", + "workspace = \"\"\n", + "\n", + "ml_client = MLClient(credential, subscription_id, resource_group, workspace)\n", + "\n", + "# Obtain the tracking URL from MLClient\n", + "MLFLOW_TRACKING_URI = ml_client.workspaces.get(\n", + " name=ml_client.workspace_name\n", + ").mlflow_tracking_uri\n", + "\n", + "print(MLFLOW_TRACKING_URI)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from mlflow.tracking.client import MlflowClient\n", + "\n", + "# Set the MLFLOW TRACKING URI\n", + "mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)\n", + "print(\"\\nCurrent tracking uri: {}\".format(mlflow.get_tracking_uri()))\n", + "\n", + "# Initialize MLFlow client\n", + "mlflow_client = MlflowClient()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "# job_name = returned_job.name # If training job is in the same notebook\n", + "job_name = \"yellow_camera_1n84g0vcwp\" ## Example 
of providing an specific Job name/ID\n", + "\n", + "# Get the parent run\n", + "mlflow_parent_run = mlflow_client.get_run(job_name)\n", + "\n", + "# print(\"Parent Run: \")\n", + "# print(mlflow_parent_run)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get the best model's child run\n", + "best_child_run_id = mlflow_parent_run.data.tags[\"automl_best_child_run_id\"]\n", + "print(\"Found best child run id: \", best_child_run_id)\n", + "\n", + "best_run = mlflow_client.get_run(best_child_run_id)\n", + "\n", + "print(\"Best child run: \")\n", + "print(best_run)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "import datetime\n", + "from azure.ai.ml.entities import (\n", + " Environment,\n", + " BatchEndpoint,\n", + " BatchDeployment,\n", + " BatchRetrySettings,\n", + " Model,\n", + ")\n", + "from azure.ai.ml.constants import BatchDeploymentOutputAction\n", + "\n", + "model_name = \"test-batch-endpoint\"\n", + "batch_endpoint_name = \"gap-batch-\" + datetime.datetime.now().strftime(\"%m%d%H%M%f\")\n", + "\n", + "model = Model(\n", + " path=f\"azureml://jobs/{best_run.info.run_id}/outputs/artifacts/outputs/model.pkl\",\n", + " name=model_name,\n", + " description=\"Gap prediction sample best model\",\n", + ")\n", + "registered_model = ml_client.models.create_or_update(model)\n", + "\n", + "env = Environment(\n", + " name=\"automl-tabular-env\",\n", + " description=\"environment for automl inference\",\n", + " image=\"mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest\",\n", + " conda_file=\"artifact_downloads/outputs/conda_env_v_1_0_0.yml\",\n", + ")\n", + "\n", + "endpoint = BatchEndpoint(\n", + " name=batch_endpoint_name,\n", + " description=\"this is a sample batch endpoint\",\n", + ")\n", + "ml_client.begin_create_or_update(endpoint).wait()" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "from azure.core.exceptions import ResourceNotFoundError\n", + "from azure.ai.ml.entities import AmlCompute\n", + "\n", + "cluster_name = \"gap-cluster\"\n", + "\n", + "try:\n", + " # Retrieve an already attached Azure Machine Learning Compute.\n", + " compute = ml_client.compute.get(cluster_name)\n", + "except ResourceNotFoundError as e:\n", + " compute = AmlCompute(\n", + " name=cluster_name,\n", + " size=\"STANDARD_DS12_V2\",\n", + " type=\"amlcompute\",\n", + " min_instances=0,\n", + " max_instances=4,\n", + " idle_time_before_scale_down=120,\n", + " )\n", + " poller = ml_client.begin_create_or_update(compute)\n", + " poller.wait()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "batch_endpoint_name" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "output_file = \"forecast.csv\" # Where the predictions would be stored\n", + "batch_deployment = BatchDeployment(\n", + " name=\"non-mlflow-deployment\",\n", + " description=\"this is a sample non-mlflow deployment\",\n", + " endpoint_name=batch_endpoint_name,\n", + " model=registered_model,\n", + " code_path=\"./forecasting_script\",\n", + " scoring_script=\"forecasting_script.py\",\n", + " environment=env,\n", + " environment_variables={\n", + " \"TARGET_COLUMN_NAME\": TARGET_COLUMN_NAME,\n", + " },\n", + " compute=cluster_name,\n", + " instance_count=1, # 2\n", + " max_concurrency_per_instance=1, # 2\n", + " 
mini_batch_size=1, # 10\n", + " output_action=BatchDeploymentOutputAction.APPEND_ROW,\n", + " output_file_name=output_file,\n", + " retry_settings=BatchRetrySettings(max_retries=3, timeout=30),\n", + " logging_level=\"info\",\n", + " properties={\"include_output_header\": \"true\"},\n", + " tags={\"include_output_header\": \"true\"},\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "ml_client.begin_create_or_update(batch_deployment).wait()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "batch_endpoint_name" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Data visualization of the train and test data" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "# We stored the training and test data during training\n", + "df_train = pd.read_parquet(\"./data/training-mltable-folder/df_train.parquet\")\n", + "df_test = pd.read_parquet(\"./data/testing-mltable-folder/df_test.parquet\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_train.tail(2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_test.head(2)" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [], + "source": [ + "# The above test data follows the training data\n", + "# Store in folder for the batch endpoint to use as parquet file\n", + "df_test.to_parquet(\"./data/test_data_scenarios/df_test_scenario1.parquet\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import matplotlib.pyplot as plt\n", + "\n", + "# Concatenate the training and testing DataFrames\n", + "df_plot = pd.concat([df_train, df_test])\n", + "\n", + "# Create a figure and axis\n", + "plt.figure(figsize=(10, 6))\n", + "ax = plt.gca() # Get current axis\n", + "\n", + "# Group by both 'data_type' and 'time_series_id'\n", + "for (data_type, time_series_id), df in df_plot.groupby(\n", + " [\"data_type\", TIME_SERIES_ID_COLUMN_NAME]\n", + "):\n", + " df.plot(\n", + " x=\"date\",\n", + " y=TARGET_COLUMN_NAME,\n", + " label=f\"{data_type} - {time_series_id}\",\n", + " ax=ax,\n", + " legend=False,\n", + " )\n", + "\n", + "# Customize the plot\n", + "plt.xlabel(\"Date\")\n", + "plt.ylabel(\"Value\")\n", + "plt.title(\"Train and Test Data\")\n", + "\n", + "# Manually create the legend after plotting\n", + "plt.legend(title=\"Data Type and Time Series ID\")\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Forecasting from the trained model\n", + "\n", + "In this section we will review the forecast interface for two main scenarios: forecasting right after the training data, and the more complex interface for forecasting when there is a gap (in the time sense) between training and testing data.\n", + "\n", + "## X_train is directly followed by the X_test\n", + "Let's first consider the case when the prediction period immediately follows the training data. This is typical in scenarios where we have the time to retrain the model every time we wish to forecast. Forecasts that are made on daily and slower cadence typically fall into this category. 
Retraining the model every time benefits the accuracy because the most recent data is often the most informative.\n", + "\n", + "\n", + "\"Description\"\n", + "\n", + "We use X_test as a forecast request to generate the predictions." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Get the test data for which we need the prediction" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [], + "source": [ + "from azure.ai.ml import Input\n", + "from azure.ai.ml.constants import AssetTypes\n", + "\n", + "my_test_data_input = Input(\n", + " type=AssetTypes.URI_FOLDER,\n", + " path=\"./data/test_data_scenarios\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "my_test_data_input" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Invoke the endpoint with the test data" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [], + "source": [ + "job = ml_client.batch_endpoints.invoke(\n", + " endpoint_name=batch_endpoint_name,\n", + " input=my_test_data_input, # Test data input\n", + " deployment_name=\"non-mlflow-deployment\", # name is required as default deployment is not set\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "job_name = job.name\n", + "batch_job = ml_client.jobs.get(name=job_name)\n", + "print(batch_job.status)\n", + "# stream the job logs\n", + "ml_client.jobs.stream(name=job_name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "job_name, \" \", output_file" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get the predictions\n", + "download_path = \"./outputs/\"\n", + "ml_client.jobs.download(job_name, download_path=download_path)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "fcst_df = pd.read_csv(download_path + output_file, parse_dates=[TIME_COLUMN_NAME])\n", + "fcst_df.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Forecasting away from training data\n", + "Suppose we trained a model, some time passed, and now we want to apply the model without re-training. If the model \"looks back\" -- uses previous values of the target -- then we somehow need to provide those values to the model.\n", + "\n", + "\"Description\"\n", + "\n", + "The notion of forecast origin comes into play: **the forecast origin is the last period for which we have seen the target value.** This applies per time-series, so each time-series can have a different forecast origin.\n", + "\n", + "The part of data before the forecast origin is the **prediction context**. To provide the context values the model needs when it looks back, we pass definite values in y_test (aligned with corresponding times in X_test)." 
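+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "To make the shape of such a request concrete, here is a minimal, purely illustrative sketch (all values are made up): context rows carry definite target values, while the rows we want forecast carry `NaN`. The helper function used later in this notebook assembles a request of essentially this shape from real data."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Illustrative only: a tiny forecast request for one hourly series.\n",
+    "# The first three rows are prediction context (known target values);\n",
+    "# the last three are the periods we ask the model to forecast.\n",
+    "import numpy as np\n",
+    "\n",
+    "toy_request = pd.DataFrame(\n",
+    "    {\n",
+    "        TIME_COLUMN_NAME: pd.date_range(\"2000-01-01\", periods=6, freq=\"H\"),\n",
+    "        TIME_SERIES_ID_COLUMN_NAME: \"ts0\",\n",
+    "        TARGET_COLUMN_NAME: [1.0, 2.0, 3.0, np.nan, np.nan, np.nan],\n",
+    "    }\n",
+    ")\n",
+    "toy_request"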
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Generate the same kind of test data we trained on, but now make the train set much longer, so that the test set will be in the future\n", + "from helper import get_timeseries, make_forecasting_query\n", + "\n", + "X_context, y_context, X_away, y_away = get_timeseries(\n", + " train_len=42, # train data was 30 steps long\n", + " test_len=4,\n", + " time_column_name=TIME_COLUMN_NAME,\n", + " target_column_name=TARGET_COLUMN_NAME,\n", + " time_series_id_column_name=TIME_SERIES_ID_COLUMN_NAME,\n", + " time_series_number=2,\n", + ")\n", + "\n", + "print(\"End of the data we trained on:\")\n", + "print(df_train.groupby(TIME_SERIES_ID_COLUMN_NAME)[TIME_COLUMN_NAME].max())\n", + "\n", + "print(\"\\nStart of the data we want to predict on:\")\n", + "print(X_away.groupby(TIME_SERIES_ID_COLUMN_NAME)[TIME_COLUMN_NAME].min())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "There is a gap of 12 hours between end of training and beginning of X_away. (It looks like 13 because all timestamps point to the start of the one hour periods.) Using only X_away will fail without adding context data for the model to consume" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "metadata": {}, + "outputs": [], + "source": [ + "x_gap_test = X_away.copy()\n", + "x_gap_test[\"y\"] = y_away\n", + "x_gap_test[\"data_type\"] = \"test\" # Dummy data\n", + "\n", + "x_gap_test.to_csv(\"./data/test_gap_scenario/gap_test_data.csv\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Since the length of the lookback is 3, we need to add 3 periods from the context to the request so that the model has the data it needs\n", + "\n", + "# Put the X and y back together for a while. They like each other and it makes them happy.\n", + "X_context[TARGET_COLUMN_NAME] = y_context\n", + "X_away[TARGET_COLUMN_NAME] = y_away\n", + "fulldata = pd.concat([X_context, X_away])\n", + "\n", + "# Forecast origin is the last point of data, which is one 1-hr period before test\n", + "forecast_origin = X_away[TIME_COLUMN_NAME].min() - pd.DateOffset(hours=1)\n", + "# it is indeed the last point of the context\n", + "assert forecast_origin == X_context[TIME_COLUMN_NAME].max()\n", + "print(\"Forecast origin: \" + str(forecast_origin))\n", + "\n", + "# The model uses lags and rolling windows to look back in time\n", + "n_lookback_periods = max(\n", + " lags\n", + ") # n_lookback_periods = max(max(lags), forecast_horizon) # If target_rolling_window_size is used\n", + "lookback = pd.DateOffset(hours=n_lookback_periods)\n", + "horizon = pd.DateOffset(hours=forecast_horizon)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Now make the forecast query from context. 
This is the main thing for predicting gap data\n", + "from helper import make_forecasting_query\n", + "\n", + "X_pred, y_pred = make_forecasting_query(\n", + " fulldata, TIME_COLUMN_NAME, TARGET_COLUMN_NAME, forecast_origin, horizon, lookback\n", + ")\n", + "\n", + "# show the forecast request aligned\n", + "X_show = X_pred.copy()\n", + "X_show[TARGET_COLUMN_NAME] = y_pred\n", + "X_show[X_show[\"time_series_id\"] == \"ts0\"]" + ] + }, + { + "cell_type": "code", + "execution_count": 51, + "metadata": {}, + "outputs": [], + "source": [ + "X_pred[\n", + " \"data_type\"\n", + "] = \"unknown\" # Our trining had an additional column called data_type, hence, adding it" + ] + }, + { + "cell_type": "code", + "execution_count": 54, + "metadata": {}, + "outputs": [], + "source": [ + "gap_data = X_pred.copy()\n", + "gap_data[TARGET_COLUMN_NAME] = y_pred\n", + "gap_data.to_csv(\"./data/test_gap_scenario/gap_data_with_context.csv\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "y_pred" + ] + }, + { + "cell_type": "code", + "execution_count": 55, + "metadata": {}, + "outputs": [], + "source": [ + "my_test_data_gap_input = Input(\n", + " type=AssetTypes.URI_FOLDER,\n", + " path=\"./data/test_gap_scenario/\", # Path to the data folder that has the test data with gap\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 61, + "metadata": {}, + "outputs": [], + "source": [ + "gap_job = ml_client.batch_endpoints.invoke(\n", + " endpoint_name=batch_endpoint_name,\n", + " input=my_test_data_gap_input, # Test data input\n", + " deployment_name=\"non-mlflow-deployment\", # name is required as default deployment is not set\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "job_name = gap_job.name\n", + "batch_job = ml_client.jobs.get(name=job_name)\n", + "print(batch_job.status)\n", + "# stream the job logs\n", + "ml_client.jobs.stream(name=job_name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get the predictions\n", + "gap_download_path = \"./outputs/gap_scenario/\"\n", + "ml_client.jobs.download(job_name, download_path=gap_download_path)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "gap_fcst_df = pd.read_csv(\n", + " gap_download_path + output_file, parse_dates=[TIME_COLUMN_NAME]\n", + ")\n", + "gap_fcst_df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "gap_data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# show the forecast aligned without the generated features\n", + "X_show = gap_fcst_df[gap_fcst_df[\"data_type\"] == \"test\"]\n", + "X_show" + ] + } + ], + "metadata": { + "authors": [ + { + "name": "jialiu" + } + ], + "category": "tutorial", + "compute": [ + "Remote" + ], + "datasets": [ + "None" + ], + "deployment": [ + "None" + ], + "exclude_from_index": false, + "framework": [ + "Azure ML AutoML" + ], + "friendly_name": "Forecasting away from training data", + "index_order": 3, + "kernelspec": { + "display_name": "sdkv2-test1", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + 
"pygments_lexer": "ipython3", + "version": "3.9.20" + }, + "microsoft": { + "ms_spell_check": { + "ms_spell_check_language": "en" + } + }, + "nteract": { + "version": "nteract-front-end@1.0.0" + }, + "tags": [ + "Forecasting", + "Confidence Intervals" + ], + "task": "Forecasting" + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/auto-ml-forecasting-function-gap-local-inference.ipynb b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/auto-ml-forecasting-function-gap-local-inference.ipynb new file mode 100644 index 0000000000..e0d50b2737 --- /dev/null +++ b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/auto-ml-forecasting-function-gap-local-inference.ipynb @@ -0,0 +1,502 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Introduction\n", + "This notebook demonstrates the full interface of the `forecast()` function. \n", + "\n", + "The best known and most frequent usage of `forecast` enables forecasting on test sets that immediately follows training data. \n", + "\n", + "However, in many use cases it is necessary to continue using the model for some time before retraining it. This happens especially in **high frequency forecasting** when forecasts need to be made more frequently than the model can be retrained. Examples are in Internet of Things and predictive cloud resource scaling.\n", + "\n", + "Here we show how to use the `forecast()` function when a time gap exists between training data and prediction period.\n", + "\n", + "Terminology:\n", + "* forecast origin: the last period when the target value is known\n", + "* forecast periods(s): the period(s) for which the value of the target is desired.\n", + "* lookback: how many past periods (before forecast origin) the model function depends on. The larger of number of lags and length of rolling window.\n", + "* prediction context: `lookback` periods immediately preceding the forecast origin" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "1. **Model** \n", + " We will need the MLflow model, which is downloaded at the end of the training notebook. Follow any training notebook to get the model. The MLflow model is usually downloaded to the folder: `./artifact_downloads/outputs/mlflow-model`.\n", + "\n", + "2. **Environment** \n", + " We will need the environment to load the model. Please run the following commands to create the environment (the conda file is usually downloaded to: `./artifact_downloads/outputs/mlflow-model/conda.yaml`):\n", + " - `conda env create --file `\n", + " - `conda activate project_environment`\n", + "\n", + "3. 
**Register environment as kernel** \n", + " - Please run the following command to register the environment as a kernel: \n", + " ```bash\n", + " python -m ipykernel install --user --name project_environment --display-name \"model-inference\"\n", + " ```\n", + " - Refresh the kernel and then select the newly created kernel named `model-inference` from the kernel dropdown.\n", + " \n", + " Now we are good to run this notebook in the newly created kernel.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "TIME_COLUMN_NAME = \"date\"\n", + "TIME_SERIES_ID_COLUMN_NAME = \"time_series_id\"\n", + "TARGET_COLUMN_NAME = \"y\"\n", + "lags = [1, 2, 3]\n", + "forecast_horizon = 6" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Local inferencing from model pickle\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Please ensure that the training artifacts are downloaded. For more details refer to the training notebook\n", + "mlflow_dir = \"./artifact_downloads/outputs/mlflow-model\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import mlflow.pyfunc\n", + "import mlflow.sklearn\n", + "import pandas as pd\n", + "\n", + "fitted_model = mlflow.sklearn.load_model(mlflow_dir)\n", + "df_train = pd.read_parquet(\n", + " \"./data/training-mltable-folder/df_train.parquet\"\n", + ") # We stored the training and test data during training\n", + "df_test = pd.read_parquet(\"./data/testing-mltable-folder/df_test.parquet\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "df_train[df_train[\"time_series_id\"] == \"ts1\"].tail(2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "df_test[df_test[\"time_series_id\"] == \"ts1\"].head(2)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Forecasting from the trained model\n", + "\n", + "In this section we will review the forecast interface for two main scenarios: forecasting right after the training data, and the more complex interface for forecasting when there is a gap (in the time sense) between training and testing data.\n", + "\n", + "## X_train is directly followed by the X_test\n", + "Let's first consider the case when the prediction period immediately follows the training data. This is typical in scenarios where we have the time to retrain the model every time we wish to forecast. Forecasts that are made on daily and slower cadence typically fall into this category. Retraining the model every time benefits the accuracy because the most recent data is often the most informative.\n", + "\n", + "\n", + "\"Description\"\n", + "\n", + "We use X_test as a forecast request to generate the predictions." 
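+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Before forecasting, a quick optional check that the test set really does start right where the training set ends (per series):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Optional sanity check: per series, the first test timestamp should be\n",
+    "# one period after the last training timestamp (no gap in this scenario).\n",
+    "last_train = df_train.groupby(TIME_SERIES_ID_COLUMN_NAME)[TIME_COLUMN_NAME].max()\n",
+    "first_test = df_test.groupby(TIME_SERIES_ID_COLUMN_NAME)[TIME_COLUMN_NAME].min()\n",
+    "pd.concat({\"last_train\": last_train, \"first_test\": first_test}, axis=1)"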
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "X_test = df_test.copy()\n",
+    "y_test = X_test.pop(TARGET_COLUMN_NAME).values.astype(float)\n",
+    "\n",
+    "y_pred_no_gap, xy_nogap = fitted_model.forecast(X_test)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "tags": []
+   },
+   "source": [
+    "### Confidence Intervals\n",
+    "The forecasting model may also be used to produce prediction intervals by running `forecast_quantiles()`. This method accepts the same parameters as `forecast()`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "quantiles = fitted_model.forecast_quantiles(X_test)\n",
+    "quantiles"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "tags": []
+   },
+   "source": [
+    "### Distribution forecasts\n",
+    "Often the figure of interest is not just the point prediction, but the prediction at some quantile of the distribution. This arises when the forecast is used to control some kind of inventory, for example of grocery items or virtual machines for a cloud service. In such cases, the control point is usually something like \"we want the item to be in stock and not run out 99% of the time\". This is called a \"service level\". Here is how you get quantile forecasts."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "# Specify which quantiles you would like\n",
+    "fitted_model.quantiles = [0.01, 0.5, 0.95]\n",
+    "\n",
+    "# Use the forecast_quantiles() function, not forecast()\n",
+    "y_pred_quantiles = fitted_model.forecast_quantiles(X_test)\n",
+    "\n",
+    "# Quantile forecasts are returned in a DataFrame along with the time and time series id columns\n",
+    "y_pred_quantiles"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Forecasting away from training data\n",
+    "Suppose we trained a model, some time passed, and now we want to apply the model without re-training. If the model \"looks back\" -- uses previous values of the target -- then we somehow need to provide those values to the model.\n",
+    "\n",
+    "<img src=\"./images/forecast_function_away_from_train.png\" alt=\"Description\">\n",
+    "\n",
+    "The notion of forecast origin comes into play: **the forecast origin is the last period for which we have seen the target value.** This applies per time-series, so each time-series can have a different forecast origin.\n",
+    "\n",
+    "The part of data before the forecast origin is the **prediction context**. To provide the context values the model needs when it looks back, we pass definite values in y_test (aligned with corresponding times in X_test)."
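+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Since the origin is tracked per series, it can be computed directly from any frame of observed data. A small sketch using the training frame loaded above:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# The forecast origin per series: the last timestamp with a known target value.\n",
+    "origins = (\n",
+    "    df_train.dropna(subset=[TARGET_COLUMN_NAME])\n",
+    "    .groupby(TIME_SERIES_ID_COLUMN_NAME)[TIME_COLUMN_NAME]\n",
+    "    .max()\n",
+    ")\n",
+    "origins"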
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "# Generate the same kind of test data we trained on, but now make the train set much longer, so that the test set will be in the future\n",
+    "from helper import get_timeseries, make_forecasting_query\n",
+    "\n",
+    "X_context, y_context, X_away, y_away = get_timeseries(\n",
+    "    train_len=42,  # train data was 30 steps long\n",
+    "    test_len=4,\n",
+    "    time_column_name=TIME_COLUMN_NAME,\n",
+    "    target_column_name=TARGET_COLUMN_NAME,\n",
+    "    time_series_id_column_name=TIME_SERIES_ID_COLUMN_NAME,\n",
+    "    time_series_number=2,\n",
+    ")\n",
+    "\n",
+    "print(\"End of the data we trained on:\")\n",
+    "print(df_train.groupby(TIME_SERIES_ID_COLUMN_NAME)[TIME_COLUMN_NAME].max())\n",
+    "\n",
+    "print(\"\\nStart of the data we want to predict on:\")\n",
+    "print(X_away.groupby(TIME_SERIES_ID_COLUMN_NAME)[TIME_COLUMN_NAME].min())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "There is a gap of 12 hours between the end of training and the beginning of X_away. (It looks like 13 because all timestamps point to the start of the one-hour periods.) Using only X_away will fail without adding context data for the model to consume."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "try:\n",
+    "    y_pred_away, xy_away = fitted_model.forecast(X_away)\n",
+    "    xy_away\n",
+    "except Exception as e:\n",
+    "    print(e)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "tags": []
+   },
+   "source": [
+    "How should we read that error message? The forecast origin is at the last time the model saw an actual value of y (the target). That was at the end of the training data! The model is attempting to forecast from the end of the training data, but the requested forecast periods are past the forecast horizon. We need to provide definite y values to establish the forecast origin.\n",
+    "\n",
+    "We will use a helper function to take the required amount of context from the data preceding the testing data. Its definition is intentionally simplified to keep the idea clear."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Let's see where the context data ends - it ends, by construction, just before the testing data starts."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "print(\n",
+    "    X_context.groupby(TIME_SERIES_ID_COLUMN_NAME)[TIME_COLUMN_NAME].agg(\n",
+    "        [\"min\", \"max\", \"count\"]\n",
+    "    )\n",
+    ")\n",
+    "print(\n",
+    "    X_away.groupby(TIME_SERIES_ID_COLUMN_NAME)[TIME_COLUMN_NAME].agg(\n",
+    "        [\"min\", \"max\", \"count\"]\n",
+    "    )\n",
+    ")\n",
+    "X_context.tail(5)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "tags": []
+   },
+   "source": [
+    "Now we use the helper function `make_forecasting_query` to take the required amount of context from the data preceding the testing data. First, we compute the forecast origin, the lookback offset, and the horizon offset that the query needs."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "# Since the length of the lookback is 3, we need to add 3 periods from the context to the request so that the model has the data it needs\n",
+    "\n",
+    "# Put the X and y back together for a while. They like each other and it makes them happy.\n",
+    "X_context[TARGET_COLUMN_NAME] = y_context\n",
+    "X_away[TARGET_COLUMN_NAME] = y_away\n",
+    "fulldata = pd.concat([X_context, X_away])\n",
+    "\n",
+    "# The forecast origin is the last point of data, which is one 1-hr period before test\n",
+    "forecast_origin = X_away[TIME_COLUMN_NAME].min() - pd.DateOffset(hours=1)\n",
+    "# It is indeed the last point of the context\n",
+    "assert forecast_origin == X_context[TIME_COLUMN_NAME].max()\n",
+    "print(\"Forecast origin: \" + str(forecast_origin))\n",
+    "\n",
+    "# The model uses lags and rolling windows to look back in time\n",
+    "n_lookback_periods = max(\n",
+    "    lags\n",
+    ")  # n_lookback_periods = max(max(lags), forecast_horizon) # If target_rolling_window_size is used\n",
+    "lookback = pd.DateOffset(hours=n_lookback_periods)\n",
+    "horizon = pd.DateOffset(hours=forecast_horizon)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "# Now make the forecast query from context (refer to the figure above)\n",
+    "X_pred, y_pred = make_forecasting_query(\n",
+    "    fulldata, TIME_COLUMN_NAME, TARGET_COLUMN_NAME, forecast_origin, horizon, lookback\n",
+    ")\n",
+    "\n",
+    "# Show the forecast request aligned\n",
+    "X_show = X_pred.copy()\n",
+    "X_show[TARGET_COLUMN_NAME] = y_pred\n",
+    "X_show[X_show[\"time_series_id\"] == \"ts0\"]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "X_pred[\n",
+    "    \"data_type\"\n",
+    "] = \"unknown\"  # Our training data had an additional column called data_type, so we add it here"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "# Now everything should work\n",
+    "y_pred_away, xy_away = fitted_model.forecast(X_pred, y_pred)\n",
+    "\n",
+    "# Show the forecast aligned without the generated features\n",
+    "X_show = xy_away.reset_index()\n",
+    "X_show[\n",
+    "    [\"date\", \"time_series_id\", \"ext_predictor\", \"_automl_target_col\"]\n",
+    "]  # prediction is in _automl_target_col"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Let us look at the tail of the training data and the head of the test data for one grain"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "df_train[df_train[\"time_series_id\"] == \"ts1\"].tail(2)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "If there is a gap between the train and the test data, and the test data uses lags or rolling windows, we need to append the context data so that the test rows have access to the lagged target values.\n",
+    "In the above case, the training data ends at 2000-01-02 05:00:00."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "X_show[X_show[\"time_series_id\"] == \"ts1\"][\n",
+    "    [\"date\", \"time_series_id\", \"ext_predictor\", \"_automl_target_col\"]\n",
+    "]"
+   ]
+  }
+ ],
+ "metadata": {
+  "authors": [
+   {
+    "name": "jialiu"
+   }
+  ],
+  "category": "tutorial",
+  "compute": [
+   "Remote"
+  ],
+  "datasets": [
+   "None"
+  ],
+  "deployment": [
+   "None"
+  ],
+  "exclude_from_index": false,
+  "framework": [
+   "Azure ML AutoML"
+  ],
+  "friendly_name": "Forecasting away from training data",
+  "index_order": 3,
+  "kernelspec": {
+   "display_name": "model-inference",
+   "language": "python",
+   "name": "project_environment"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.9.19"
+  },
+  "microsoft": {
+   "ms_spell_check": {
+    "ms_spell_check_language": "en"
+   }
+  },
+  "nteract": {
+   "version": "nteract-front-end@1.0.0"
+  },
+  "tags": [
+   "Forecasting",
+   "Confidence Intervals"
+  ],
+  "task": "Forecasting",
+  "vscode": {
+   "interpreter": {
+    "hash": "6bd77c88278e012ef31757c15997a7bea8c943977c43d6909403c00ae11d43ca"
+   }
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 4
+}
diff --git a/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/auto-ml-forecasting-function-gap-training.ipynb b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/auto-ml-forecasting-function-gap-training.ipynb
new file mode 100644
index 0000000000..ab970e0479
--- /dev/null
+++ b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/auto-ml-forecasting-function-gap-training.ipynb
@@ -0,0 +1,631 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Introduction\n",
+    "This notebook demonstrates the full interface of the `forecast()` function. \n",
+    "\n",
+    "The best known and most frequent usage of `forecast` enables forecasting on test sets that immediately follow training data. \n",
+    "\n",
+    "However, in many use cases it is necessary to continue using the model for some time before retraining it. This happens especially in **high frequency forecasting** when forecasts need to be made more frequently than the model can be retrained. Examples are in Internet of Things and predictive cloud resource scaling.\n",
+    "\n",
+    "Here we show how to use the `forecast()` function when a time gap exists between training data and prediction period.\n",
+    "\n",
+    "Terminology:\n",
+    "* forecast origin: the last period when the target value is known\n",
+    "* forecast period(s): the period(s) for which the value of the target is desired.\n",
+    "* lookback: how many past periods (before forecast origin) the model function depends on. This is the larger of the number of lags and the rolling window length.\n",
+    "* prediction context: `lookback` periods immediately preceding the forecast origin"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Setup"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Please make sure you have followed the [configuration notebook](https://github.com/Azure/MachineLearningNotebooks/blob/master/configuration.ipynb) so that your ML workspace information is saved in the config file."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "import pandas as pd\n",
+    "import numpy as np\n",
+    "import logging\n",
+    "import warnings\n",
+    "\n",
+    "# Import required libraries\n",
+    "from azure.identity import DefaultAzureCredential\n",
+    "from azure.ai.ml import MLClient\n",
+    "\n",
+    "from azure.ai.ml.constants import AssetTypes, InputOutputModes\n",
+    "from azure.ai.ml import automl\n",
+    "from azure.ai.ml import Input\n",
+    "\n",
+    "# Squash warning messages for cleaner output in the notebook\n",
+    "warnings.showwarning = lambda *args, **kwargs: None\n",
+    "\n",
+    "np.set_printoptions(precision=4, suppress=True, linewidth=120)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "credential = DefaultAzureCredential()\n",
+    "ml_client = None\n",
+    "try:\n",
+    "    ml_client = MLClient.from_config(credential)\n",
+    "except Exception as ex:\n",
+    "    print(ex)\n",
+    "    # Fall back to explicit workspace details if no config file was found\n",
+    "    subscription_id = \"\"\n",
+    "    resource_group = \"\"\n",
+    "    workspace = \"\"\n",
+    "    ml_client = MLClient(credential, subscription_id, resource_group, workspace)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "workspace = ml_client.workspaces.get(name=ml_client.workspace_name)\n",
+    "\n",
+    "output = {}\n",
+    "output[\"Workspace\"] = ml_client.workspace_name\n",
+    "output[\"Subscription ID\"] = ml_client.subscription_id\n",
+    "output[\"Resource Group\"] = workspace.resource_group\n",
+    "output[\"Location\"] = workspace.location\n",
+    "pd.set_option(\"display.max_colwidth\", None)\n",
+    "outputDf = pd.DataFrame(data=output, index=[\"\"])\n",
+    "outputDf.T"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Data\n",
+    "For demonstration purposes, we will generate the data artificially and use it for forecasting."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "TIME_COLUMN_NAME = \"date\"\n",
+    "TIME_SERIES_ID_COLUMN_NAME = \"time_series_id\"\n",
+    "TARGET_COLUMN_NAME = \"y\"\n",
+    "lags = [1, 2, 3]\n",
+    "forecast_horizon = 6"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "# Synthetically generate the data to train the model\n",
+    "n_train_periods = 30\n",
+    "n_test_periods = forecast_horizon\n",
+    "\n",
+    "from helper import get_timeseries\n",
+    "\n",
+    "X_train, y_train, X_test, y_test = get_timeseries(\n",
+    "    train_len=n_train_periods,\n",
+    "    test_len=n_test_periods,\n",
+    "    time_column_name=TIME_COLUMN_NAME,\n",
+    "    target_column_name=TARGET_COLUMN_NAME,\n",
+    "    time_series_id_column_name=TIME_SERIES_ID_COLUMN_NAME,\n",
+    "    time_series_number=2,\n",
+    ")\n",
+    "print(X_train.shape, \" \", X_test.shape)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Let's see what the training data looks like."
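+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "A quick per-series summary (start, end, and length of each series) complements the plot below:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Per-series summary of the synthetic training data.\n",
+    "X_train.groupby(TIME_SERIES_ID_COLUMN_NAME)[TIME_COLUMN_NAME].agg(\n",
+    "    [\"min\", \"max\", \"count\"]\n",
+    ")"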
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Plot the example time series\n", + "import matplotlib.pyplot as plt\n", + "\n", + "whole_data = X_train.copy()\n", + "target_label = \"y\"\n", + "whole_data[target_label] = y_train\n", + "plt.figure(figsize=(10, 6))\n", + "for g in whole_data.groupby(\"time_series_id\"):\n", + " plt.plot(g[1][\"date\"].values, g[1][\"y\"].values, label=g[0])\n", + "plt.legend()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let us look at the train and test data of the synthetic data" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "# Create a copy of the X_train and X_test DataFrames and add the corresponding target values\n", + "df_train = X_train.copy()\n", + "df_train[TARGET_COLUMN_NAME] = y_train\n", + "df_test = X_test.copy()\n", + "df_test[TARGET_COLUMN_NAME] = y_test" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# For vizualisation of the time series\n", + "df_train[\"data_type\"] = \"Training\" # Add a column to label training data\n", + "df_test[\"data_type\"] = \"Testing\" # Add a column to label testing data\n", + "\n", + "# Concatenate the training and testing DataFrames\n", + "df_plot = pd.concat([df_train, df_test])\n", + "\n", + "# Create a figure and axis\n", + "plt.figure(figsize=(10, 6))\n", + "ax = plt.gca() # Get current axis\n", + "\n", + "# Group by both 'data_type' and 'time_series_id'\n", + "for (data_type, time_series_id), df in df_plot.groupby([\"data_type\", \"time_series_id\"]):\n", + " df.plot(\n", + " x=\"date\",\n", + " y=TARGET_COLUMN_NAME,\n", + " label=f\"{data_type} - {time_series_id}\",\n", + " ax=ax,\n", + " legend=False,\n", + " )\n", + "\n", + "# Customize the plot\n", + "plt.xlabel(\"Date\")\n", + "plt.ylabel(\"Value\")\n", + "plt.title(\"Train and Test Data\")\n", + "\n", + "# Manually create the legend after plotting\n", + "plt.legend(title=\"Data Type and Time Series ID\")\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import mltable\n", + "import os\n", + "\n", + "\n", + "def create_ml_table(data_frame, file_name, output_folder):\n", + " os.makedirs(output_folder, exist_ok=True)\n", + " data_path = os.path.join(output_folder, file_name)\n", + " data_frame.to_parquet(data_path, index=False)\n", + " paths = [{\"file\": data_path}]\n", + " ml_table = mltable.from_parquet_files(paths)\n", + " ml_table.save(output_folder)" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "os.makedirs(\"data\", exist_ok=True)\n", + "create_ml_table(\n", + " df_train,\n", + " \"df_train.parquet\",\n", + " \"./data/training-mltable-folder\",\n", + ")\n", + "\n", + "# Training MLTable defined locally, with local data to be uploaded\n", + "my_training_data_input = Input(\n", + " type=AssetTypes.MLTABLE, path=\"./data/training-mltable-folder\"\n", + ")\n", + "\n", + "my_training_data_input.__dict__\n", + "\n", + "# Test data\n", + "os.makedirs(\"data\", exist_ok=True)\n", + "create_ml_table(\n", + " X_test, # df_test,\n", + " \"X_test.parquet\",\n", + " \"./data/testing-mltable-folder\",\n", + ")\n", + "\n", + "create_ml_table(\n", + " df_test,\n", + " \"df_test.parquet\",\n", + " 
\"./data/testing-mltable-folder\",\n", + ")\n", + "\n", + "my_test_data_input = Input(\n", + " type=AssetTypes.URI_FOLDER,\n", + " path=\"./data/testing-mltable-folder\",\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compute" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from azure.core.exceptions import ResourceNotFoundError\n", + "from azure.ai.ml.entities import AmlCompute\n", + "\n", + "cluster_name = \"forecast-function\"\n", + "\n", + "try:\n", + " # Retrieve an already attached Azure Machine Learning Compute.\n", + " compute = ml_client.compute.get(cluster_name)\n", + " print(\"Found existing cluster, use it.\")\n", + "except ResourceNotFoundError as e:\n", + " compute = AmlCompute(\n", + " name=cluster_name,\n", + " size=\"STANDARD_DS12_V2\",\n", + " type=\"amlcompute\",\n", + " min_instances=0,\n", + " max_instances=4,\n", + " idle_time_before_scale_down=120,\n", + " )\n", + " poller = ml_client.begin_create_or_update(compute)\n", + " poller.wait()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "TARGET_COLUMN_NAME, TIME_COLUMN_NAME, TIME_SERIES_ID_COLUMN_NAME" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# target_column_name = \"demand\"\n", + "# time_column_name = \"timeStamp\"\n", + "# general job parameters\n", + "timeout_minutes = 15\n", + "trial_timeout_minutes = 5\n", + "exp_name = \"forecast-function-exp-no-target-rolling\"\n", + "# Create the AutoML forecasting job with the related factory-function.\n", + "\n", + "forecasting_job = automl.forecasting(\n", + " compute=cluster_name,\n", + " experiment_name=exp_name,\n", + " training_data=my_training_data_input,\n", + " target_column_name=TARGET_COLUMN_NAME,\n", + " primary_metric=\"NormalizedRootMeanSquaredError\",\n", + " n_cross_validations=3,\n", + ")\n", + "\n", + "# Limits are all optional\n", + "forecasting_job.set_limits(\n", + " timeout_minutes=timeout_minutes,\n", + " trial_timeout_minutes=trial_timeout_minutes,\n", + " enable_early_termination=True,\n", + ")\n", + "\n", + "# Specialized properties for Time Series Forecasting training\n", + "forecasting_job.set_forecast_settings(\n", + " time_column_name=TIME_COLUMN_NAME,\n", + " forecast_horizon=forecast_horizon,\n", + " # target_rolling_window_size=forecast_horizon,\n", + " time_series_id_column_names=[TIME_SERIES_ID_COLUMN_NAME],\n", + " target_lags=lags,\n", + " frequency=\"H\",\n", + " cv_step_size=3,\n", + ")\n", + "\n", + "# Training properties are optional\n", + "forecasting_job.set_training(blocked_training_algorithms=[\"ExtremeRandomTrees\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Submit training job\n", + "returned_job = ml_client.jobs.create_or_update(\n", + " forecasting_job\n", + ") # submit the job to the backend\n", + "\n", + "print(f\"Created job: {returned_job}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Wait until AutoML training runs are finished\n", + "ml_client.jobs.stream(returned_job.name)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Retrieve the Best Trial" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": 
{
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "import mlflow\n",
+    "\n",
+    "MLFLOW_TRACKING_URI = ml_client.workspaces.get(\n",
+    "    name=ml_client.workspace_name\n",
+    ").mlflow_tracking_uri\n",
+    "print(MLFLOW_TRACKING_URI)\n",
+    "\n",
+    "# Set the MLFLOW TRACKING URI\n",
+    "mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)\n",
+    "print(\"\\nCurrent tracking uri: {}\".format(mlflow.get_tracking_uri()))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 17,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "from mlflow.tracking.client import MlflowClient\n",
+    "\n",
+    "# Initialize MLFlow client\n",
+    "mlflow_client = MlflowClient()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "# job_name = returned_job.name\n",
+    "\n",
+    "# Example of providing a specific Job name/ID\n",
+    "job_name = \"yellow_camera_1n84g0vcwp\"\n",
+    "\n",
+    "# Get the parent run\n",
+    "mlflow_parent_run = mlflow_client.get_run(job_name)\n",
+    "\n",
+    "print(\"Parent Run: \")\n",
+    "print(mlflow_parent_run)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "# Get the best model's child run\n",
+    "best_child_run_id = mlflow_parent_run.data.tags[\"automl_best_child_run_id\"]\n",
+    "print(\"Found best child run id: \", best_child_run_id)\n",
+    "\n",
+    "best_run = mlflow_client.get_run(best_child_run_id)\n",
+    "\n",
+    "print(\"Best child run: \")\n",
+    "print(best_run)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "# Print parent run tags. 'automl_best_child_run_id' tag should be there.\n",
+    "print(mlflow_parent_run.data.tags)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "pd.DataFrame(best_run.data.metrics, index=[0]).T"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Every AutoML trial is recorded as a child run of the parent job; the tags and metrics shown above come from the best one."
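+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As an optional check, the other child runs can be listed and ranked by the primary metric. This sketch assumes the children carry the standard `mlflow.parentRunId` tag and that the primary metric is logged as `normalized_root_mean_squared_error`; adjust the names if your workspace logs them differently."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Optional: list this job's child runs and rank them by the primary metric.\n",
+    "# The tag and metric names below are assumptions; adjust them if needed.\n",
+    "child_runs = mlflow_client.search_runs(\n",
+    "    experiment_ids=[mlflow_parent_run.info.experiment_id],\n",
+    "    filter_string=\"tags.mlflow.parentRunId = '{}'\".format(mlflow_parent_run.info.run_id),\n",
+    ")\n",
+    "metric = \"normalized_root_mean_squared_error\"\n",
+    "for run in sorted(child_runs, key=lambda r: r.data.metrics.get(metric, float(\"inf\"))):\n",
+    "    print(run.info.run_id, run.data.metrics.get(metric))"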
+ ] + }, + { + "cell_type": "markdown", + "metadata": { + "tags": [] + }, + "source": [ + "## Artifact Download" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Create local folder\n", + "import os\n", + "\n", + "local_dir = \"./artifact_downloads\"\n", + "if not os.path.exists(local_dir):\n", + " os.mkdir(local_dir)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Download run's artifacts/outputs\n", + "local_path = mlflow_client.download_artifacts(\n", + " best_run.info.run_id, \"outputs\", local_dir\n", + ")\n", + "print(\"Artifacts downloaded in: {}\".format(local_path))\n", + "print(\"Artifacts: {}\".format(os.listdir(local_path)))" + ] + } + ], + "metadata": { + "authors": [ + { + "name": "jialiu" + } + ], + "category": "tutorial", + "compute": [ + "Remote" + ], + "datasets": [ + "None" + ], + "deployment": [ + "None" + ], + "exclude_from_index": false, + "framework": [ + "Azure ML AutoML" + ], + "friendly_name": "Forecasting away from training data", + "index_order": 3, + "kernelspec": { + "display_name": "sdkv2-test1", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.20" + }, + "microsoft": { + "ms_spell_check": { + "ms_spell_check_language": "en" + } + }, + "nteract": { + "version": "nteract-front-end@1.0.0" + }, + "tags": [ + "Forecasting", + "Confidence Intervals" + ], + "task": "Forecasting" + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/forecasting_script/forecasting_script.py b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/forecasting_script/forecasting_script.py new file mode 100644 index 0000000000..513be30b7d --- /dev/null +++ b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/forecasting_script/forecasting_script.py @@ -0,0 +1,67 @@ +""" +This is the script that is executed on the compute instance. It relies +on the model.pkl file which is uploaded along with this script to the +compute instance. 
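+
+The batch deployment framework calls init() once per worker to load the
+model, and then calls run() once for each mini-batch of input files; the
+rows returned by run() are appended to the single output file configured
+on the deployment.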
+""" + +import os + +import pandas as pd + +from azureml.core import Dataset, Run +import joblib +from pandas.tseries.frequencies import to_offset + + +def init(): + global target_column_name + global fitted_model + + target_column_name = os.environ["TARGET_COLUMN_NAME"] + # AZUREML_MODEL_DIR is an environment variable created during deployment + # It is the path to the model folder (./azureml-models) + # Please provide your model's folder name if there's one + model_path = os.path.join(os.environ["AZUREML_MODEL_DIR"], "model.pkl") + fitted_model = joblib.load(model_path) + + +def run(mini_batch): + print(f"run method start: {__file__}, run({mini_batch})") + resultList = [] + for test in mini_batch: + if os.path.splitext(test)[-1] == ".parquet": + X_test = pd.read_parquet(test) + elif os.path.splitext(test)[-1] == ".csv": + X_test = pd.read_csv(test, parse_dates=[fitted_model.time_column_name]) + else: + continue # Skip if it's neither a Parquet nor CSV file + + y_test = X_test.pop(target_column_name).values + + # We have default quantiles values set as below(95th percentile) + quantiles = [0.025, 0.5, 0.975] + predicted_column_name = "predicted" + PI = "prediction_interval" + fitted_model.quantiles = quantiles + pred_quantiles = fitted_model.forecast_quantiles( + X_test, ignore_data_errors=True + ) + pred_quantiles[PI] = pred_quantiles[[min(quantiles), max(quantiles)]].apply( + lambda x: "[{}, {}]".format(x[0], x[1]), axis=1 + ) + X_test[target_column_name] = y_test + X_test[PI] = pred_quantiles[PI].values + X_test[predicted_column_name] = pred_quantiles[0.5].values + # drop rows where prediction or actuals are nan + # happens because of missing actuals + # or at edges of time due to lags/rolling windows + clean = X_test[ + X_test[[target_column_name, predicted_column_name]].notnull().all(axis=1) + ] + print( + f"The predictions have {clean.shape[0]} rows and {clean.shape[1]} columns." + ) + + resultList.append(clean) + + return pd.concat(resultList, sort=False, ignore_index=True) diff --git a/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/forecasting_script/parallel_run_step.settings.json b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/forecasting_script/parallel_run_step.settings.json new file mode 100644 index 0000000000..8be91e5cb2 --- /dev/null +++ b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/forecasting_script/parallel_run_step.settings.json @@ -0,0 +1 @@ +{"append_row": {"pandas.DataFrame.to_csv": {"sep": ","}}} \ No newline at end of file diff --git a/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/helper.py b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/helper.py new file mode 100644 index 0000000000..5420088a50 --- /dev/null +++ b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/helper.py @@ -0,0 +1,119 @@ +# Generate synthetic data + +import pandas as pd +import numpy as np + + +def get_timeseries( + train_len: int, + test_len: int, + time_column_name: str, + target_column_name: str, + time_series_id_column_name: str, + time_series_number: int = 1, + freq: str = "H", +): + """ + Return the time series of designed length. + + :param train_len: The length of training data (one series). + :type train_len: int + :param test_len: The length of testing data (one series). + :type test_len: int + :param time_column_name: The desired name of a time column. 
+    :type time_column_name: str
+    :param target_column_name: The desired name of the target column.
+    :type target_column_name: str
+    :param time_series_id_column_name: The desired name of the series ID column.
+    :type time_series_id_column_name: str
+    :param time_series_number: The number of time series in the data set.
+    :type time_series_number: int
+    :param freq: The frequency string representing pandas offset.
+                 see https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html
+    :type freq: str
+    :returns: the tuple of train and test data sets.
+    :rtype: tuple
+    """
+    data_train = []  # type: List[pd.DataFrame]
+    data_test = []  # type: List[pd.DataFrame]
+    data_length = train_len + test_len
+    for i in range(time_series_number):
+        X = pd.DataFrame(
+            {
+                time_column_name: pd.date_range(
+                    start="2000-01-01", periods=data_length, freq=freq
+                ),
+                target_column_name: np.arange(data_length).astype(float)
+                + np.random.rand(data_length)
+                + i * 5,
+                "ext_predictor": np.asarray(range(42, 42 + data_length)),
+                time_series_id_column_name: np.repeat("ts{}".format(i), data_length),
+            }
+        )
+        data_train.append(X[:train_len])
+        data_test.append(X[train_len:])
+    X_train = pd.concat(data_train)
+    y_train = X_train.pop(target_column_name).values
+    X_test = pd.concat(data_test)
+    y_test = X_test.pop(target_column_name).values
+    return X_train, y_train, X_test, y_test
+
+
+def make_forecasting_query(
+    fulldata, time_column_name, target_column_name, forecast_origin, horizon, lookback
+):
+    """
+    Take the full dataset and create the query to predict all values of the
+    time series from the `forecast_origin` forward for the next `horizon`
+    periods. Context from the previous `lookback` periods will be included.
+
+    fulldata: pandas.DataFrame      a time series dataset; needs to contain X and y
+    time_column_name: string        which column (must be in fulldata) is the time axis
+    target_column_name: string      which column (must be in fulldata) is to be forecast
+    forecast_origin: datetime type  the last time we (pretend to) have target values
+    horizon: timedelta              how far forward, in time units (not periods)
+    lookback: timedelta             how far back the model looks
+
+    Example:
+
+    ```
+    # Forecast 5 days after the end of training
+    forecast_origin = pd.to_datetime("2012-09-01") + pd.DateOffset(days=5)
+    print(forecast_origin)
+
+    X_query, y_query = make_forecasting_query(
+        data,
+        time_column_name="date",
+        target_column_name="y",
+        forecast_origin=forecast_origin,
+        horizon=pd.DateOffset(days=7),  # 7 days into the future
+        lookback=pd.DateOffset(days=1),  # model has lag 1 period (day)
+    )
+    ```
+    """
+
+    X_past = fulldata[
+        (fulldata[time_column_name] > forecast_origin - lookback)
+        & (fulldata[time_column_name] <= forecast_origin)
+    ]
+
+    X_future = fulldata[
+        (fulldata[time_column_name] > forecast_origin)
+        & (fulldata[time_column_name] <= forecast_origin + horizon)
+    ]
+
+    y_past = X_past.pop(target_column_name).values.astype(float)
+    y_future = X_future.pop(target_column_name).values.astype(float)
+
+    # Now take y_future and turn it into question marks
+    y_query = y_future.copy().astype(float)  # because sometimes life hands you an int
+    y_query.fill(np.nan)
+
+    print("X_past is " + str(X_past.shape) + " - shaped")
+    print("X_future is " + str(X_future.shape) + " - shaped")
+    print("y_past is " + str(y_past.shape) + " - shaped")
+    print("y_query is " + str(y_query.shape) + " - shaped")
+
+    X_pred = pd.concat([X_past, X_future])
+    y_pred = np.concatenate([y_past, y_query])
+    return X_pred, y_pred
diff --git a/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/images/forecast_function_at_train.png b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/images/forecast_function_at_train.png
new file mode 100644
index 0000000000..37d8ffddca
Binary files /dev/null and b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/images/forecast_function_at_train.png differ
diff --git a/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/images/forecast_function_away_from_train.png b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/images/forecast_function_away_from_train.png
new file mode 100644
index 0000000000..7b7e203746
Binary files /dev/null and b/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/images/forecast_function_away_from_train.png differ