diff --git a/sdk/python/featurestore_sample/notebooks/sdk_only/Synapse example - Develop feature set and register.ipynb b/sdk/python/featurestore_sample/notebooks/sdk_only/Synapse example - Develop feature set and register.ipynb new file mode 100644 index 00000000000..3afbba92263 --- /dev/null +++ b/sdk/python/featurestore_sample/notebooks/sdk_only/Synapse example - Develop feature set and register.ipynb @@ -0,0 +1,1564 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "d17fea1e-8dd7-45a9-b2d3-68bfed239388", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "# Synapse Example: Develop a feature set and register with managed feature store" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "e01e5ba7-5cdd-4ee2-b5b9-9a1957e98504", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "This notebook shows how to run feature store sample notebooks in an Azure Synapse Analytics (Synapse) workspace. Most of the cells work the same way as in AzureML notebook. The only differences are:\n", + "\n", + "1. Credential setting\n", + "\n", + " In AzureML notebook, you use the `AzureMLOnBehalfOfCredential` to authenticate with AzureML service. In Synapse notebook, you first run `az login` then use the `DefaultAzureCredential` to initialize the `MLClient` and `FeatureStoreClient`.\n", + "\n", + "1. `root_dir` setting\n", + "\n", + " `root_dir` points to the mounted `featurestore_sample` folder path." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Prerequisites" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "7443ff3b-08ae-4870-98ed-5460cf34dcf5", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "### Set up\n", + "\n", + "1. Create a new Synapse workspace or use an existing one ([synapse-analytics/get-started-create-workspace](https://learn.microsoft.com/en-us/azure/synapse-analytics/get-started-create-workspace)).\n", + "1. Create a spark pool in Synapse workspace ([quickstart-create-apache-spark-pool-portal](https://learn.microsoft.com/en-us/azure/synapse-analytics/quickstart-create-apache-spark-pool-portal)).\n", + "1. Clone the [azureml-examples](https://github.com/Azure/azureml-examples) github repository and upload the `featurestore_sample` folder to the linked primary container path(`\\user\\trusted-service-user`) of linked ADLS gen2 storage in Synapse workspace. (Use Azure Storage Explorer to upload folder). \n", + "1. Apply python packages listed under `featurestore_sample\\project\\env\\conda.yml`:\n", + "\n", + " Option a. Manage the python packages as spark pool scoped ([manage-packages-from-synapse-studio-or-azure-portal](https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-manage-pool-packages#manage-packages-from-synapse-studio-or-azure-portal)).\n", + "\n", + " Option b. 
Manage the python packages as session scoped ([manage-session-scoped-python-packages](https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-manage-session-packages#manage-session-scoped-python-packages-through-environmentyml-file))\n", + "\n", + " \n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Start Spark session\n", + "\n", + "Execute the following code cell to start the Spark session. It will take approximately 10 minutes to install all dependencies and start the Spark session." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"start spark session\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Mount the primary container" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from notebookutils import mssparkutils\n", + "\n", + "mssparkutils.fs.mount(\n", + " \"abfss://@.dfs.core.windows.net\",\n", + " \"/\",\n", + " {\n", + " \"linkedService\": \"\"\n", + " }, ## usually, linked_gen2_account_name is \"-WorkspaceDefaultStorage\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Set up the root directory for the samples" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "root_dir = mssparkutils.fs.getMountPath(\n", + " \"/user/trusted-service-user/featurestore_sample\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "5b5eb4ee-e5f5-4e2e-b0ff-06ef315954d5", + "showTitle": false, + "title": "" + } + }, + "source": [ + "#### Set up credential\n", + "\n", + "Run `az login`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "f09cd984-fea2-4fbe-95bb-0c2858f43a49", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "!az login" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "6d70593c-ee5c-48f3-bb38-4e4606f7ed81", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "## Note\n", + "Feature store vs. project workspace: You will use a feature store to reuse features across projects. You will use a project workspace to train models and run inference by leveraging features from feature stores. Many project workspaces can share and reuse the same feature store.\n", + "\n", + "## Note\n", + "In this tutorial, you will use two SDKs:\n", + "\n", + "1. Feature store CRUD SDK: You will use the same SDK, `MLClient` (package name `azure-ai-ml`), that you use with an Azure ML workspace. It is used for CRUD operations (create, read, update, and delete) on the feature store, feature sets, and feature store entities. This works because a feature store is implemented as a type of workspace. \n", + "2. 
Feature store core SDK: This SDK (`azureml-featurestore`) is meant to be used for feature set development and consumption (you will learn more about these operations later):\n", + "- List/Get registered feature set\n", + "- Generate/resolve feature retrieval specification\n", + "- Execute feature set definition to generate Spark dataframe\n", + "- Generate training data using a point-in-time join\n", + "\n", + "For this tutorial, you do not need to install any of these explicitly, since the instructions already cover them." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "ab7c11ee-83a2-415c-a97e-b54b1c6935a0", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "## Step 1: Create a minimal feature store" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "c2a0175e-5420-4798-9d19-4724337e7ee2", + "showTitle": false, + "title": "" + } + }, + "source": [ + "#### Step 1a: Set feature store parameters\n", + "Set name, location and other values for the feature store." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "ba2df894-a98c-42d4-ab36-a83e6bb20834", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "featurestore_name = \"\"\n", + "featurestore_location = \"\"\n", + "featurestore_subscription_id = \"\"\n", + "featurestore_resource_group_name = \"\"" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "dbb177b9-2824-4999-93ab-8049bc73d3b4", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "#### Step 1b: Create the feature store" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "6695ee2e-241b-4692-869e-d748a205cf43", + "showTitle": false, + "title": "" + }, + "gather": { + "logged": 1696545478258 + }, + "name": "create-fs", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azure.ai.ml import MLClient\n", + "from azure.ai.ml.entities import FeatureStore\n", + "from azure.identity import DefaultAzureCredential\n", + "\n", + "ml_client = MLClient(\n", + " credential=DefaultAzureCredential(),\n", + " subscription_id=featurestore_subscription_id,\n", + " resource_group_name=featurestore_resource_group_name,\n", + ")\n", + "\n", + "fs = FeatureStore(name=featurestore_name, location=featurestore_location)\n", + "# wait for feature store creation\n", + "fs_poller = ml_client.feature_stores.begin_create(fs)\n", + "print(fs_poller.result())" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "dade5a57-83f3-4a6a-8700-93a9d7357fb4", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "#### Step 1c: Initialize AzureML feature store core SDK client\n", + "As explained above, this 
is used to develop and consume features." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "427bbf82-5906-4459-83e3-5b1fbb3992dc", + "showTitle": false, + "title": "" + }, + "gather": { + "logged": 1696545558268 + }, + "name": "init-fs-core-sdk", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "# feature store client\n", + "from azureml.featurestore import FeatureStoreClient\n", + "\n", + "featurestore = FeatureStoreClient(\n", + " credential=DefaultAzureCredential(),\n", + " subscription_id=featurestore_subscription_id,\n", + " resource_group_name=featurestore_resource_group_name,\n", + " name=featurestore_name,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "b35dda53-c1e0-4fc7-8628-d5d94a83d5c3", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "## Step 2: Prototype and develop a transaction rolling aggregation feature set in this notebook" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "86b37905-39cb-45af-93ba-509b6e9704da", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "#### Step 2a: Explore the transactions source data\n", + "\n", + "#### Note\n", + "The sample data used in this notebook is hosted in a publicly accessible blob container. It can only be read in Spark via the `wasbs` driver. When you create feature sets using your own source data, please host them in an ADLS Gen2 account and use the `abfss` driver in the data path. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "256bba16-bb18-41fa-b7cd-9fa4cc9f7db9", + "showTitle": false, + "title": "" + }, + "gather": { + "logged": 1696545597487 + }, + "name": "explore-txn-src-data", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "transactions_source_data_path = \"wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet\"\n", + "transactions_src_df = spark.read.parquet(transactions_source_data_path)\n", + "\n", + "transactions_src_df.head(5)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "74594cf9-8f12-4895-ae0e-94470fe15e19", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "#### Step 2b: Develop a transactions feature set locally\n", + "\n", + "A feature set specification is a self-contained definition of a feature set that can be developed and tested locally.\n", + "\n", + "Let's create the following rolling window aggregate features:\n", + "- transactions 3-day count\n", + "- transactions amount 3-day sum\n", + "- transactions amount 3-day avg\n", + "- transactions 7-day count\n", + "- transactions amount 7-day sum\n", + "- transactions amount 7-day avg\n", + "\n", + "__Action__:\n", + "- Inspect the feature transformation code file: `featurestore/featuresets/transactions/transformation_code/transaction_transform.py`. You will see how the rolling aggregation is defined for the features. It is a Spark transformer.\n", + "\n", + "To understand the feature set and transformations in more detail, see [feature store concepts](https://learn.microsoft.com/azure/machine-learning/concept-what-is-managed-feature-store).\n"
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "d450fdaa-575e-47fa-94e3-7fd5f168ce6c", + "showTitle": false, + "title": "" + }, + "gather": { + "logged": 1696545617083 + }, + "name": "develop-txn-fset-locally", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azureml.featurestore import create_feature_set_spec\n", + "from azureml.featurestore.feature_source import ParquetFeatureSource\n", + "from azureml.featurestore.contracts import (\n", + " DateTimeOffset,\n", + " TransformationCode,\n", + " Column,\n", + " ColumnType,\n", + " TimestampColumn,\n", + ")\n", + "\n", + "\n", + "transactions_featureset_code_path = (\n", + " root_dir + \"/featurestore/featuresets/transactions/transformation_code\"\n", + ")\n", + "\n", + "transactions_featureset_spec = create_feature_set_spec(\n", + " source=ParquetFeatureSource(\n", + " path=\"wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet\",\n", + " timestamp_column=TimestampColumn(name=\"timestamp\"),\n", + " source_delay=DateTimeOffset(days=0, hours=0, minutes=20),\n", + " ),\n", + " transformation_code=TransformationCode(\n", + " path=transactions_featureset_code_path,\n", + " transformer_class=\"transaction_transform.TransactionFeatureTransformer\",\n", + " ),\n", + " index_columns=[Column(name=\"accountID\", type=ColumnType.string)],\n", + " source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),\n", + " temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),\n", + " infer_schema=True,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "d168a060-49c1-47df-b130-e460519253b0", + "showTitle": false, + "title": "" + }, + "tags": [ + "active-ipynb" + ] + }, + "outputs": [], + "source": [ + "# Generate a Spark dataframe from the feature set specification\n", + "transactions_fset_df = transactions_featureset_spec.to_spark_dataframe()\n", + "# Display a few records\n", + "display(transactions_fset_df.head(5))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "7e536c38-eece-438d-ba2d-fba2c7c228a6", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "#### Step 2c: Export as feature set specification\n", + "To register the feature set specification with the feature store, it needs to be saved in a specific format.\n", + " \n", + "Action: Inspect the generated `transactions` feature set specification by opening this file from the file tree: `featurestore/featuresets/transactions/spec/FeaturesetSpec.yaml`\n", + "\n", + "The specification contains these important elements:\n", + "\n", + "1. `source`: Reference to a storage source. In this case, a parquet file in blob storage.\n", + "2. `features`: List of features and their data types. If you provide transformation code, the code has to return a dataframe that maps to the features and data types.\n", + "3. 
`index_columns`: The join keys required to access values from the feature set.\n", + "\n", + "Learn more in the [top-level feature store entities document](https://learn.microsoft.com/azure/machine-learning/concept-top-level-entities-in-managed-feature-store) and the [feature set specification YAML reference](https://learn.microsoft.com/azure/machine-learning/reference-yaml-featureset-spec).\n", + "\n", + "An additional benefit of persisting the feature set specification is that it can be source controlled." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "76dc4367-3dc3-4a8d-af1f-c33cb1be750e", + "showTitle": false, + "title": "" + }, + "gather": { + "logged": 1696545629168 + }, + "name": "dump-transactions-fs-spec", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "import os\n", + "\n", + "# Create a new folder to dump the feature set specification.\n", + "transactions_featureset_spec_folder = (\n", + " root_dir + \"/featurestore/featuresets/transactions/spec\"\n", + ")\n", + "\n", + "# Creates the given directory if it does not exist, also creating any necessary parent directories\n", + "mssparkutils.fs.mkdirs(transactions_featureset_spec_folder)\n", + "\n", + "transactions_featureset_spec.dump(transactions_featureset_spec_folder, overwrite=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "d18096d9-c015-4875-9b7e-4ccf4826de9f", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "## Step 3: Register a feature store entity\n", + "An entity helps enforce the best practice that the same join key definitions are used across feature sets that share the same logical entities. Examples of entities are an account entity, a customer entity, and so on. Entities are typically created once and reused across feature sets. For information on the basic concepts of feature store, see [feature store concepts](https://learn.microsoft.com/azure/machine-learning/concept-what-is-managed-feature-store)." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "382f9005-01fa-48cf-8840-9797be9363af", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "#### Step 3a: Initialize the feature store CRUD client\n", + "\n", + "As explained at the beginning of this tutorial, `MLClient` is used for CRUD operations on feature store assets. The code below looks up the feature store we created in an earlier step. We cannot reuse the `ml_client` from above because it is scoped at the resource group level, which is a prerequisite for creating the feature store. 
The client below is scoped at the feature store level.\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "cc7e7e5c-7f54-41ec-8b02-2de03eb07608", + "showTitle": false, + "title": "" + }, + "gather": { + "logged": 1696545635654 + }, + "name": "init-fset-crud-client", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "# MLClient scoped to the feature store.\n", + "fs_client = MLClient(\n", + " DefaultAzureCredential(),\n", + " featurestore_subscription_id,\n", + " featurestore_resource_group_name,\n", + " featurestore_name,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "6cb7bb87-e66e-4c9d-bd89-6db8ae51b945", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "#### Step 3b: Register the `account` entity with the feature store\n", + "Create an account entity that has the join key `accountID` of type `string`. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "50858985-8f36-4757-94a7-822976aa95f0", + "showTitle": false, + "title": "" + }, + "gather": { + "logged": 1696545646811 + }, + "name": "register-acct-entity", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azure.ai.ml.entities import DataColumn, DataColumnType, FeatureStoreEntity\n", + "\n", + "account_entity_config = FeatureStoreEntity(\n", + " name=\"account\",\n", + " version=\"1\",\n", + " index_columns=[DataColumn(name=\"accountID\", type=DataColumnType.STRING)],\n", + " stage=\"Development\",\n", + " description=\"This entity represents user account index key accountID.\",\n", + " tags={\"data_type\": \"nonPII\"},\n", + ")\n", + "\n", + "poller = fs_client.feature_store_entities.begin_create_or_update(account_entity_config)\n", + "print(poller.result())" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "d1f02729-c07c-496a-91df-8eb1c4c28f9e", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "## Step 4: Register the transaction feature set with the feature store\n", + "You register a feature set asset with the feature store so that you can share and reuse it with others. You also get managed capabilities like versioning and materialization (covered later in this tutorial series).\n", + "\n", + "The feature set asset references the feature set spec that you created earlier, along with additional properties such as the version and materialization settings."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "0fa579c9-45ca-4a23-99df-6c09a01dddba", + "showTitle": false, + "title": "" + }, + "gather": { + "logged": 1696545663104 + }, + "name": "register-txn-fset", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azure.ai.ml.entities import FeatureSetSpecification, FeatureSet\n", + "\n", + "transaction_fset_config = FeatureSet(\n", + " name=\"transactions\",\n", + " version=\"1\",\n", + " description=\"7-day and 3-day rolling aggregation of transactions featureset\",\n", + " entities=[f\"azureml:account:1\"],\n", + " stage=\"Development\",\n", + " specification=FeatureSetSpecification(path=transactions_featureset_spec_folder),\n", + " tags={\"data_type\": \"nonPII\"},\n", + ")\n", + "\n", + "poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)\n", + "print(poller.result())" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "5e8e27b6-7edb-4992-bcab-f4c1ce1c5301", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "#### Explore the feature store UI\n", + "* Go to the [Azure ML global landing page](https://ml.azure.com/home).\n", + "* Click on **Feature stores** in the left navigation.\n", + "* You will see the list of feature stores that you have access to. Click on the feature store that you created above.\n", + "\n", + "You can see the feature sets and entities that you have created.\n", + "\n", + "Note: Creating and updating feature store assets (feature sets and entities) is possible only through the SDK and CLI. You can use the UI to search/browse the feature store." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "07c62bf5-f6ab-428e-b38a-9579e269267b", + "showTitle": false, + "title": "" + } + }, + "source": [ + "#### Grant \"Storage Blob Data Reader\" role on the offline store to your user identity\n", + "If feature data is materialized, then you need this role to read feature data from the offline materialization store.\n", + "- Get your AAD object ID from the Azure portal by following these instructions: https://learn.microsoft.com/en-us/partner-center/find-ids-and-domain-names#find-the-user-object-id\n", + "- Get information about the offline materialization store from the Feature Store **Overview** page in the Feature Store UI. The storage account subscription ID, storage account resource group name, and storage account name for the offline materialization store can be found on the **Offline materialization store** card. \n", + "![OFFLINE_STORE_INFO](./images/offline-store-information.png) \n", + "\n", + "To learn more about access control, see the access control document in the docs.\n", + "\n", + "Execute the following code cell for role assignment. Please note that it may take some time for permissions to propagate. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "a8fa5b25-11a4-4ec8-b958-5b71766a4192", + "showTitle": false, + "title": "" + }, + "name": "grant-rbac-to-user-identity", + "tags": [ + "active-ipynb" + ] + }, + "outputs": [], + "source": [ + "# This utility function is created for ease of use in the docs tutorials. It uses standard Azure APIs.\n", + "# You can optionally inspect it at `featurestore/setup/setup_storage_uai.py`.\n", + "import sys\n", + "\n", + "sys.path.insert(0, root_dir + \"/featurestore/setup\")\n", + "from setup_storage_uai import grant_user_aad_storage_data_reader_role\n", + "\n", + "your_aad_objectid = \"\"\n", + "storage_subscription_id = \"\"\n", + "storage_resource_group_name = \"\"\n", + "storage_account_name = \"\"\n", + "\n", + "grant_user_aad_storage_data_reader_role(\n", + " DefaultAzureCredential(),\n", + " your_aad_objectid,\n", + " storage_subscription_id,\n", + " storage_resource_group_name,\n", + " storage_account_name,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "161ac04c-77da-438f-b929-9470714d2a60", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "## Step 5: Generate a training data dataframe using the registered features" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "0412ce82-0058-4529-9689-a62a67c5a179", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "#### Step 5a: Load observation data\n", + "\n", + "We start by exploring the observation data. Observation data is typically the core data used in training and inference. This is then joined with feature data to create the full training data. Observation data is the data captured during the time of the event: in this case, it has core transaction data including transaction ID, account ID, and transaction amount. In this case, since it is for training, it also has the target variable appended (`is_fraud`).\n", + "\n", + "To learn more about core concepts, including observation data, refer to the feature store documentation." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "3cdd9605-51a7-48a6-84d3-a6bbd5494fd6", + "showTitle": false, + "title": "" + }, + "gather": { + "logged": 1683417449378 + }, + "name": "load-obs-data", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "observation_data_path = \"wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/observation_data/train/*.parquet\"\n", + "observation_data_df = spark.read.parquet(observation_data_path)\n", + "obs_data_timestamp_column = \"timestamp\"\n", + "\n", + "display(observation_data_df)\n", + "# Note: the timestamp column is displayed in a different format. 
Optionally, you can call observation_data_df.show() to see correctly formatted values" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "5b8db741-86cd-444b-821c-eaab3a0a659a", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "#### Step 5b: Get the registered feature set and list its features" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "1335a3aa-9ba7-4b5f-8c6a-f5477488b4c3", + "showTitle": false, + "title": "" + }, + "gather": { + "logged": 1683416499081 + }, + "name": "get-txn-fset", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "# Look up the feature set by providing a name and a version.\n", + "transactions_featureset = featurestore.feature_sets.get(\"transactions\", \"1\")\n", + "# List its features.\n", + "transactions_featureset.features" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "9e7f4672-1030-4ccd-83de-6478b05f080f", + "showTitle": false, + "title": "" + }, + "gather": { + "logged": 1683416508419 + }, + "name": "print-txn-fset-sample-values", + "nteract": { + "transient": { + "deleting": false + } + }, + "tags": [ + "active-ipynb" + ] + }, + "outputs": [], + "source": [ + "# Print sample values.\n", + "display(transactions_featureset.to_spark_dataframe().head(5))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "5521d164-b03c-4912-9061-78a30e8ecdc0", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "#### Step 5c: Select features and generate training data\n", + "In this step, we select the features that we want in the training data and use the feature store SDK to generate the training data."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "62060093-5b8d-4c4f-ab88-f8e8385e93e2", + "showTitle": false, + "title": "" + }, + "gather": { + "logged": 1683417499373 + }, + "name": "select-features-and-gen-training-data", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azureml.featurestore import get_offline_features\n", + "\n", + "# You can select features in a pythonic way.\n", + "features = [\n", + " transactions_featureset.get_feature(\"transaction_amount_7d_sum\"),\n", + " transactions_featureset.get_feature(\"transaction_amount_7d_avg\"),\n", + "]\n", + "\n", + "# You can also specify features in string form: featureset:version:feature.\n", + "more_features = [\n", + " f\"transactions:1:transaction_3d_count\",\n", + " f\"transactions:1:transaction_amount_3d_avg\",\n", + "]\n", + "\n", + "more_features = featurestore.resolve_feature_uri(more_features)\n", + "features.extend(more_features)\n", + "\n", + "# Generate training dataframe by using feature data and observation data.\n", + "training_df = get_offline_features(\n", + " features=features,\n", + " observation_data=observation_data_df,\n", + " timestamp_column=obs_data_timestamp_column,\n", + ")\n", + "\n", + "# Ignore the message that says feature set is not materialized (materialization is optional). We will enable materialization in the subsequent part of the tutorial.\n", + "display(training_df)\n", + "# Note: the timestamp column is displayed in a different format. Optionally, you can call training_df.show() to see correctly formatted values" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "e238c06c-dd53-452b-b240-6e4d0e9fb7e8", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "You can see how the features are appended to the training data using a point-in-time join." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "ad226e73-5716-4b17-b135-dd85dc22fc59", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "## Step 6: Enable offline materialization on the transactions feature set\n", + "Once materialization is enabled on a feature set, you can perform backfill (this tutorial) or schedule recurrent materialization jobs (shown in a later tutorial)."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "a7d1d3ab-2727-4ec1-b756-09c971250972", + "showTitle": false, + "title": "" + }, + "name": "enable-offline-mat-txns-fset", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azure.ai.ml.entities import (\n", + " MaterializationSettings,\n", + " MaterializationComputeResource,\n", + ")\n", + "\n", + "transactions_fset_config = fs_client._featuresets.get(name=\"transactions\", version=\"1\")\n", + "\n", + "transactions_fset_config.materialization_settings = MaterializationSettings(\n", + " offline_enabled=True,\n", + " resource=MaterializationComputeResource(instance_type=\"standard_e8s_v3\"),\n", + " spark_configuration={\n", + " \"spark.driver.cores\": 4,\n", + " \"spark.driver.memory\": \"36g\",\n", + " \"spark.executor.cores\": 4,\n", + " \"spark.executor.memory\": \"36g\",\n", + " \"spark.executor.instances\": 2,\n", + " },\n", + " schedule=None,\n", + ")\n", + "\n", + "fs_poller = fs_client.feature_sets.begin_create_or_update(transactions_fset_config)\n", + "print(fs_poller.result())" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "b497b908-3e1f-4e62-a71a-b836e0cfa113", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "Optionally, you can save the above feature set asset as YAML." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "ba57d130-ceaa-4e44-9838-e0ac928410e6", + "showTitle": false, + "title": "" + }, + "name": "dump-txn-fset-yaml", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "## run this cell to save the feature set asset as YAML\n", + "transactions_fset_config.dump(\n", + " root_dir\n", + " + \"/featurestore/featuresets/transactions/featureset_asset_offline_enabled.yaml\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "b2de7469-4a73-4d58-bb17-37c4635b058a", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "## Step 7: Backfill data for the transactions feature set\n", + "Materialization is the process of computing the feature values for a given feature window and storing them in a materialization store. Materializing the features increases their reliability and availability. All feature queries will use the materialized values from the materialization store. In this step you perform a one-time backfill for a feature window of __18 months__.\n", + "\n", + "#### Note\n", + "How do you determine the backfill window needed? It has to match the window of your training data. For example, if you want to train with two years of data, then you will want to be able to retrieve features for the same window, so you backfill for a two-year window.\n"
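, + "\n", + "As an optional sanity check (a hedged sketch, not part of the original tutorial steps), you can inspect the time span of the observation data loaded in Step 5a and confirm that the backfill window below covers it:\n", + "\n", + "```python\n", + "from pyspark.sql import functions as F\n", + "\n", + "# Time span of the observation (training) data; the backfill window should cover it.\n", + "bounds = observation_data_df.agg(\n", + "    F.min(\"timestamp\").alias(\"start\"), F.max(\"timestamp\").alias(\"end\")\n", + ").collect()[0]\n", + "print(bounds[\"start\"], bounds[\"end\"])\n", + "```\n"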
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "262e682e-7816-4e98-8461-3c7407e943ee", + "showTitle": false, + "title": "" + }, + "name": "backfill-txns-fset", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from datetime import datetime\n", + "\n", + "st = datetime(2022, 1, 1, 0, 0, 0, 0)\n", + "ed = datetime(2023, 6, 30, 0, 0, 0, 0)\n", + "\n", + "poller = fs_client.feature_sets.begin_backfill(\n", + " name=\"transactions\",\n", + " version=\"1\",\n", + " feature_window_start_time=st,\n", + " feature_window_end_time=ed,\n", + " data_status=[\"None\", \"Incomplete\"],\n", + ")\n", + "print(poller.result().job_ids)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "b0db8a07-d065-4225-a51e-6fbaedeb3b4a", + "showTitle": false, + "title": "" + }, + "name": "stream-mat-job-logs", + "nteract": { + "transient": { + "deleting": false + } + }, + "tags": [ + "active-ipynb" + ] + }, + "outputs": [], + "source": [ + "# Get the job URL, and stream the job logs.\n", + "fs_client.jobs.stream(poller.result().job_ids[0])" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "e51deeca-1f6d-460f-9fc2-9153739d0a57", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "Let's print sample data from the feature set. You can see from the output information that the data was retrieved from the materialization store. The `get_offline_features()` method that is used to retrieve training/inference data also uses the materialization store by default." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "af3a1d51-32c2-47ee-af4c-afedbd475254", + "showTitle": false, + "title": "" + }, + "name": "sample-txns-fset-data", + "nteract": { + "transient": { + "deleting": false + } + }, + "tags": [ + "active-ipynb" + ] + }, + "outputs": [], + "source": [ + "# Look up the feature set by providing a name and a version, and display a few records.\n", + "transactions_featureset = featurestore.feature_sets.get(\"transactions\", \"1\")\n", + "display(transactions_featureset.to_spark_dataframe().head(5))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "bc33839c-cfc8-425a-9131-240a920c5375", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "## Cleanup\n", + "\n", + "Tutorial `5. Develop a feature set with custom source` has instructions for deleting the resources." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "435a000f-9772-4036-ad0b-1f80d2dd63d9", + "showTitle": false, + "title": "" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "## Next steps\n", + "* Experiment and train models using features."
+ ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "dashboards": [], + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 2 + }, + "notebookName": "1. Develop a feature set and register with managed feature store", + "widgets": {} + }, + "celltoolbar": "Edit Metadata", + "kernel_info": { + "name": "synapse_pyspark" + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.13" + }, + "microsoft": { + "host": { + "AzureML": { + "notebookHasBeenCompleted": true + } + }, + "ms_spell_check": { + "ms_spell_check_language": "en" + } + }, + "nteract": { + "version": "nteract-front-end@1.0.0" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +}