diff --git a/examples/feature_engineering/README.md b/examples/feature_engineering/README.md
new file mode 100644
index 00000000..930b091d
--- /dev/null
+++ b/examples/feature_engineering/README.md
@@ -0,0 +1,124 @@
+# Feature Engineering
+
+What is feature engineering? It's the process of transforming data for input to a "model".
+
+To make models better, it's common to perform and try a lot of "transforms". This is where Hamilton comes in.
+Hamilton allows you to:
+* write different transformations in a straightforward and formulaic manner
+* keep them managed and versioned with computational lineage (if using something like git)
+* benefit from a great testing and documentation story
+
+which allows you to sanely iterate, maintain, and determine what works best for your modeling domain.
+
+In this example series, we'll skip the benefits of Hamilton and instead focus on how to use it
+for feature engineering. But first, some context on the challenges you're likely to face with feature engineering
+in general.
+
+# What is hard about feature engineering?
+There are certain dimensions that make feature engineering hard:
+
+1. Code: Organizing and maintaining code for reuse/collaboration/discoverability.
+2. Lineage: Keeping track of what data is being used for what purpose.
+3. Deployment: Offline vs online vs streaming needs.
+
+## Code: Organizing and maintaining code for reuse/collaboration/discoverability
+> Individuals build features, but teams own them.
+
+Have you ever dreaded taking over someone else's code? This is a common problem with feature engineering!
+
+Why? The code for feature engineering is often spread out across many different files and created by many individuals:
+e.g. scripts, notebooks, libraries, etc., written in many different styles. This makes it hard to reuse code,
+collaborate, and discover what code is available, and therefore hard to maintain a clear picture of what is actually
+being used in "production" and what is not.
+
+## Lineage: Keeping track of what data is being used for what purpose
+With the growth of data teams, along with data governance & privacy regulations, knowing and understanding what
+data is being used, and for what purpose, is something the business needs to be able to answer easily. A "modeler" often
+has no personal stake in this visibility -- they just want to build models -- but these concerns are frequently put on
+their plate to address, which slows down their ability to build and ship features, and thus models.
+
+Not having lineage or visibility into what data is being used for what purpose can lead to a lot of problems:
+ - teams break data assumptions without knowing it, e.g. an upstream team stops updating data used downstream.
+ - teams are not aware of what data is available to them, e.g. duplication of data & effort.
+ - teams have to spend time figuring out what data is being used for what purpose, e.g. to audit models.
+ - teams struggle to debug inherited feature workflows, e.g. to fix bugs or add new features.
+
+## Deployment: Offline vs online vs streaming needs
+This is a big topic. We won't do it justice here, but let's give a brief overview of the two main problems:
+
+(1) There are a lot of different deployment needs when you get something to production. For example, you might want to:
+ - run a batch job to generate features for a model
+ - hit a webservice to make predictions in real time, which needs features computed on the fly, or retrieved from a cache (e.g. a feature store).
+ - run a streaming job to generate features for a model in real time
+ - require all three, or a subset, of the above ways of deploying features.
+
+So the challenge is: how do you design your processes to take into account your deployment needs?
+
+(2) Do you implement features once, twice, or thrice? To enable (1), you need to ask yourself: can we share feature
+code, or do we need to reimplement it for every system that we want to use it in?
+
+With (1) and (2) in mind, you can see that there are a lot of different dimensions to consider when designing your
+feature engineering processes. They have to connect with each other, and be flexible enough to support your specific
+deployment needs.
+
+# Using Hamilton for Feature Engineering for Batch/Offline
+If you **only** need to deploy features for batch jobs, then stop right there. You don't need these examples,
+as they are focused on how to bridge the gap between "offline" and "online" feature engineering. You should instead
+browse the other examples like `data_quality`.
+
+# Using Hamilton for Feature Engineering for Batch/Offline and Online/Streaming
+These scenarios are for people who have to deal with both batch and online feature engineering.
+
+We provide two examples for two common scenarios that occur if you have this need. Note, the example code in these
+scenarios is meant to illustrate how to think about and frame feature engineering with Hamilton. It contains minimal
+features so as to not overwhelm you, and leaves out some implementation details that you would need to fill in for your
+specific use case, e.g. fitting a model using the features, or where to store aggregate feature values, etc.
+
+## Scenario Context
+A common task is needing to do feature engineering in an offline (e.g. batch via airflow)
+setting, as well as an online setting (e.g. synchronous request via FastAPI). What commonly
+happens is that the code for features is not shared, resulting in two implementations
+that drift apart, causing subtle bugs and hard-to-maintain code.
+
+With this example series, we show how you can use Hamilton to:
+
+1. write a feature once. (scenarios 1 and 2)
+2. leverage that feature code anywhere that Python runs, e.g. in batch and online. (scenarios 1 and 2)
+3. modularize components, so that if you have values cached in a feature store,
+you can inject those values into your feature computation. (scenario 2)
+
+The task that we're modeling here isn't that important, but if you must know, we're trying to predict the number of
+hours of absence that an employee will have, given some information about them. This is based on the `data_quality`
+example, which in turn is based on the [Metaflow+Hamilton example](https://outerbounds.com/blog/developing-scalable-feature-engineering-dags/),
+where Hamilton was used for the feature engineering process -- in that example, only offline feature engineering was modeled.
+
+Assumptions we're using:
+1. You have a fixed set of features that you have determined a priori to be useful for your model.
+2. We are agnostic of the actual model -- and skip any details of that in the examples.
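+
+To make "write a feature once, and use it anywhere Python runs" concrete before diving in, here is a minimal
+sketch of the idea (the feature function and input values below are illustrative, not part of the example code):
+
+```python
+# my_features.py -- written once, imported by both the offline ETL and the webservice.
+import pandas as pd
+
+
+def distance_per_service_year(
+    distance_from_residence_to_work: pd.Series, service_time: pd.Series
+) -> pd.Series:
+    """Commute distance normalized by years of service."""
+    return distance_from_residence_to_work / service_time
+```
+
+Both the batch job and the webservice would then build a driver from the same module; only the inputs differ:
+
+```python
+import pandas as pd
+
+from hamilton import driver
+
+import my_features
+
+dr = driver.Driver({}, my_features)
+# offline, the inputs would come from a loaded dataframe; online, from the request payload.
+df = dr.execute(
+    ["distance_per_service_year"],
+    inputs={
+        "distance_from_residence_to_work": pd.Series([10.0]),
+        "service_time": pd.Series([5.0]),
+    },
+)
+```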
+
+Let's explain the context of the two scenarios a bit more.
+
+## Scenario 1: the simple case - ETL + Online API
+In this scenario we assume we can get the same raw inputs at prediction time as would be provided at training time.
+
+This is a straightforward process if all your feature transforms are [map operations](https://en.wikipedia.org/wiki/Map_(higher-order_function)).
+If, however, you have some transforms that are aggregations, then you need to be careful about how you connect your offline
+ETL with online.
+
+In this example, there are two features, `age_mean` and `age_std_dev`, that we avoid recomputing in an online setting.
+Instead, we "store" their values when we compute features in the offline ETL, and then use those "stored" values
+at prediction time so that the right feature computation happens.
+
+## Scenario 2: the more complex case - request doesn't have all the raw data - ETL + Online API
+In this scenario we assume we are not passed in data, but need to fetch it ourselves as part of the online API request.
+
+We will pretend to hit a feature store, which will provide us with the required data to compute the features for
+input to the model. This example shows one way to modularize your Hamilton code so that you can swap out the "source"
+of the data. To simplify the example, we assume that we can get all the input data we need from a feature store, rather
+than it also coming in via the request.
+
+A good exercise would be to make note of the differences between this scenario (2) and scenario 1 in how they structure
+the code with Hamilton.
+
+# What's next?
+Jump into each directory and read the README; it'll explain how the example is set up and how things should work.
diff --git a/examples/feature_engineering/scenario_1/FeaturesExampleScenario1.svg b/examples/feature_engineering/scenario_1/FeaturesExampleScenario1.svg
new file mode 100644
index 00000000..8453294e
--- /dev/null
+++ b/examples/feature_engineering/scenario_1/FeaturesExampleScenario1.svg
@@ -0,0 +1 @@
+
diff --git a/examples/feature_engineering/scenario_1/README.md b/examples/feature_engineering/scenario_1/README.md
new file mode 100644
index 00000000..d340cfb0
--- /dev/null
+++ b/examples/feature_engineering/scenario_1/README.md
@@ -0,0 +1,75 @@
+# Scenario 1: the simple case - ETL + Online API
+
+Context required to understand this scenario:
+1. You have read the main README in the directory above.
+2. You understand what it means to have an extract, transform, load (ETL) process to create features: it ingests raw data,
+runs transformations, and then creates a training set for you to train a model on.
+3. You understand the need for an online API from which you want to serve the predictions. The assumption here is
+that you can provide it the same raw data that you would have access to in your ETL process.
+4. You understand why aggregation features, like `mean()` or `std_dev()`, make sense only in an
+offline setting where you have all the data. In an online setting, computing them likely does not make sense.
+
+# How it works
+
+1. You run `etl.py`, and in addition to storing the features/training a model, you store the aggregate global values
+for `age_mean` and `age_std_dev`. `etl.py` uses `offline_loader.py`, `features.py`, and `named_model_feature_sets.py`.
+2. You then run `fastapi_server.py`, which is the online webservice with the trained model (not shown here). `fastapi_server.py`
+uses `features.py` and `named_model_feature_sets.py`.
+
+Note: in real life you would need to figure out a process to inject the aggregate global values for `age_mean` and `age_std_dev`
+into the feature computation process. E.g. if you're getting started, these could be hardcoded values, or values stored in a file
+that is loaded much like the model, or queried from a database, etc. You'll want to ensure these values match whatever values
+the model was trained with. If you need help here, join our Slack and we're happy to help you figure this out!
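+
+For instance, a minimal file-based version of this could look like the following sketch (the filename and surrounding
+code are illustrative assumptions, not something the example code actually does):
+
+```python
+import json
+
+# In etl.py, after computing the features dataframe:
+with open("invariant_values.json", "w") as f:
+    json.dump(
+        {
+            "age_mean": float(_features_df["age_mean"].values[0]),
+            "age_std_dev": float(_features_df["age_std_dev"].values[0]),
+        },
+        f,
+    )
+
+# In fastapi_server.py, at startup, instead of hardcoding the values:
+with open("invariant_values.json") as f:
+    invariant_feature_values = json.load(f)
+```
+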
+Here's a mental image of how things work and how they relate to the files/modules & Hamilton:
+![offline-online-image](FeaturesExampleScenario1.svg?sanitize=True)
+
+
+Otherwise, here is a description of all the files and what they do:
+
+## offline_loader.py
+Contains logic to load raw data. Here it's a flat file, but it could be coming
+from a database, etc.
+
+## features.py
+The feature transform logic that takes raw data and transforms it into features. It contains some runtime
+data quality checks using Pandera.
+
+Important note: there are two aggregation features defined, `age_mean` and `age_std_dev`, that are computed on the
+`age` column. These make sense to compute in an offline setting, as you have all the data, but in an online setting where
+you'd be performing inference, that doesn't make sense. So for the online case, these computations are "overridden" in
+`fastapi_server.py` with the values that were computed in the offline setting and that you have stored (as mentioned above
+and below, it's up to you how to store/sync them). The nice thing in Hamilton is that we can also "tag" these two
+feature transforms with information to indicate to someone reading the code that they should be overridden in the
+online feature computation context.
+
+## etl.py
+This script mimics what one might do to fit a model: extract data, transform it into features,
+and then load the features somewhere or fit a model. It's pretty basic and is meant
+to be illustrative. It is not complete, i.e. it doesn't save or fit a model; it just extracts and transforms data
+into features to create a dataframe.
+
+As seen in this image of what is executed, data is pulled from a data source and transformed into features.
+![offline execution](offline_execution.dot.png)
+
+Note: you need to store `age_mean` and
+`age_std_dev` and then somehow get those values to plug into the code in `fastapi_server.py` -- we "hand-wave" this here.
+
+## named_model_feature_sets.py
+Rather than hardcoding what features the model should have in two places, we define
+them in a single place and import them where needed; this is simple if you can share the code easily.
+However, how best to do this is something you'll have to determine for your setup. There are many ways to do it;
+come ask in the Slack channel if you need help.
+
+## fastapi_server.py
+The FastAPI server that serves the predictions. It's pretty basic and is meant to
+illustrate the steps of what's required to serve a prediction from a model, where
+you want to use the same feature computation logic as in your ETL process.
+
+Note: the aggregation feature values are provided at run time and are the same
+for all predictions -- how you "link" or "sync" these values to the webservice & model
+is up to you; in this example we just hardcode them.
+
+Here is the DAG that is executed when a request is made. As you can see, no data is loaded, as we're assuming
+that data comes from the API request. Note: `age_mean` and `age_std_dev` are overridden with values and would
+not be executed (our visualization doesn't take into account overrides just yet).
+![online execution](online_execution.dot.png)
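+
+Since `age_mean` and `age_std_dev` are tagged with `inject_at_inference_time="True"` in `features.py`, you could
+also derive which nodes to override from the tags instead of hardcoding the list. Here's a sketch of that idea
+(assuming the driver exposes its nodes via `list_available_variables()`, and reusing names from `fastapi_server.py`):
+
+```python
+# Collect the names of all nodes tagged for injection at inference time,
+# then look up their stored values to build the overrides dict.
+to_inject = [
+    var.name
+    for var in dr.list_available_variables()
+    if var.tags.get("inject_at_inference_time") == "True"
+]
+overrides = {name: invariant_feature_values[name] for name in to_inject}
+features_df = await dr.execute(model_input_features, inputs=input_series, overrides=overrides)
+```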
diff --git a/examples/feature_engineering/scenario_1/etl.py b/examples/feature_engineering/scenario_1/etl.py
new file mode 100644
index 00000000..e9a0e077
--- /dev/null
+++ b/examples/feature_engineering/scenario_1/etl.py
@@ -0,0 +1,61 @@
+"""
+This is part of an offline ETL that you'd likely have.
+You pull data from a source, transform it into features, and then save the features and/or fit a model with them.
+
+Here we ONLY use Hamilton to create the features, with comment stubs for the rest of the ETL that would normally
+be here.
+"""
+import features
+import named_model_feature_sets
+import offline_loader
+import pandas as pd
+
+from hamilton import driver
+
+
+def create_features(source_location: str) -> pd.DataFrame:
+    """Extracts and transforms data to create the feature set.
+
+    Hamilton functions encode:
+     - pulling the data
+     - transforming the data into features
+
+    Hamilton then handles building a dataframe.
+
+    :param source_location: the location to load data from.
+    :return: a pandas dataframe.
+    """
+    model_features = named_model_feature_sets.model_x_features
+    config = {}
+    dr = driver.Driver(config, offline_loader, features)
+    # Visualize the DAG if you need to:
+    # dr.display_all_functions('./offline_my_full_dag.dot', {"format": "png"})
+    dr.visualize_execution(
+        model_features,
+        "./offline_execution.dot",
+        {"format": "png"},
+        inputs={"location": source_location},
+    )
+    df = dr.execute(
+        # add age_mean and age_std_dev to the features to compute
+        model_features + ["age_mean", "age_std_dev"],
+        inputs={"location": source_location},
+    )
+    return df
+
+
+if __name__ == "__main__":
+    # stick in command line args here
+    _source_location = "../../data_quality/pandera/Absenteeism_at_work.csv"
+    _features_df = create_features(_source_location)
+    # we need to store `age_mean` and `age_std_dev` somewhere for the online side.
+    # exercise for the reader: where would you store them for your context?
+    # ideas: with the model? in a database? in a file? in a feature store?
+    # (all reasonable answers; it just depends on your context).
+    _age_mean = _features_df["age_mean"].values[0]
+    _age_std_dev = _features_df["age_std_dev"].values[0]
+    print(_features_df)
+    # Then do something with the features_df, e.g. define functions to do the following:
+    # save_features(features_df[named_model_feature_sets.model_x_features], "my_model_features.csv")
+    # train_model(features_df[named_model_feature_sets.model_x_features])
+    # etc.
diff --git a/examples/feature_engineering/scenario_1/fastapi_server.py b/examples/feature_engineering/scenario_1/fastapi_server.py
new file mode 100644
index 00000000..d081a4e3
--- /dev/null
+++ b/examples/feature_engineering/scenario_1/fastapi_server.py
@@ -0,0 +1,131 @@
+"""
+This is a simple example of a FastAPI server that uses Hamilton on the request
+path to transform the data into features, and then uses a fake model to make
+a prediction.
+
+The assumption here is that you get all the raw data passed in via the request.
+
+Otherwise, for aggregation-type features, you need to pass in stored values,
+which we have mocked out with `load_invariant_feature_values`.
+"""
+
+import fastapi
+import features
+import named_model_feature_sets
+import pandas as pd
+import pydantic
+
+from hamilton import base
+from hamilton.experimental import h_async
+
+app = fastapi.FastAPI()
+
+# we need to know the model's input schema somehow.
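+# (e.g. in a real deployment this could come from a model registry or versioned
+# config; here we simply import the feature list from shared code.)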
+model_input_features = named_model_feature_sets.model_x_features
+
+
+def load_invariant_feature_values() -> dict:
+    """This function would load the invariant feature values from a database or file.
+
+    :return: a dictionary of invariant feature values.
+    """
+    return {
+        "age_mean": 33.0,
+        "age_std_dev": 12.0,
+    }
+
+
+def fake_model(df: pd.DataFrame) -> pd.Series:
+    """Function to simulate a model."""
+    # do some transformation.
+    return df.sum()  # this is nonsensical but provides a single number.
+
+
+# you would load the model from disk or a registry -- here it's a function.
+model = fake_model
+# need to load the invariant features somehow.
+invariant_feature_values = load_invariant_feature_values()
+
+# we instantiate an async driver once for the life of the app:
+dr = h_async.AsyncDriver({}, features, result_builder=base.SimplePythonDataFrameGraphAdapter())
+
+
+class PredictRequest(pydantic.BaseModel):
+    """Assumption here is that all this data is available via the request that comes in."""
+
+    id: int
+    reason_for_absence: int
+    month_of_absence: int
+    day_of_the_week: int
+    seasons: int
+    transportation_expense: int
+    distance_from_residence_to_work: int
+    service_time: int
+    age: int
+    work_load_average_per_day: float
+    hit_target: int
+    disciplinary_failure: int
+    education: int
+    son: int
+    social_drinker: int
+    social_smoker: int
+    pet: int
+    weight: int
+    height: int
+    body_mass_index: int
+
+
+@app.post("/predict")
+async def predict_model_version1(request: PredictRequest) -> dict:
+    """Illustrates how a prediction could be made that needs to compute some features first.
+    In this version we assume all the raw data comes in via the request, and we pass in the
+    stored invariant feature values as overrides to the execution.
+
+    If you wanted to visualize execution, you could do something like:
+        dr.visualize_execution(model_input_features,
+                               './online_execution.dot',
+                               {"format": "png"},
+                               inputs=input_series)
+
+    :param request: the request body.
+    :return: a dictionary with the prediction value.
+    """
+    # one-liner to quickly create some series from the request.
+    input_series = pd.DataFrame([request.dict()]).to_dict(orient="series")
+    # create the features -- the point here is that we're reusing the same code as in training!
+    # with the ability to provide static values for things like `age_mean` and `age_std_dev`.
+    # (named `features_df` so as not to shadow the `features` module.)
+    features_df = await dr.execute(
+        model_input_features, inputs=input_series, overrides=invariant_feature_values
+    )
+    prediction = model(features_df)
+    return {"prediction": prediction.values[0]}
+
+
+if __name__ == "__main__":
+    # If you run this as a script, then the app will be started on localhost:8000
+    import uvicorn
+
+    uvicorn.run(app, host="0.0.0.0", port=8000)
+
+    # here's a request you can cut and paste into http://localhost:8000/docs
+    example_request_input = {
+        "id": 11,
+        "reason_for_absence": 26,
+        "month_of_absence": 7,
+        "day_of_the_week": 3,
+        "seasons": 2,
+        "transportation_expense": 1,
+        "distance_from_residence_to_work": 1,
+        "service_time": 289,
+        "age": 36,
+        "work_load_average_per_day": 13,
+        "hit_target": 33,
+        "disciplinary_failure": 239.554,
+        "education": 97,
+        "son": 0,
+        "social_drinker": 1,
+        "social_smoker": 2,
+        "pet": 1,
+        "weight": 90,
+        "height": 172,
+        "body_mass_index": 30,  # remove this comma to make it valid json.
+    }
diff --git a/examples/feature_engineering/scenario_1/features.py b/examples/feature_engineering/scenario_1/features.py
new file mode 100644
index 00000000..9fc6f9a8
--- /dev/null
+++ b/examples/feature_engineering/scenario_1/features.py
@@ -0,0 +1,138 @@
+"""
+Here are some features that are based on the absenteeism data set.
+They are just supposed to be illustrative of the kind of features one might have.
+
+Note (1): we use check_output to warn us if the output is not what we expect; this is
+used in both the offline ETL and the online webservice. Use `check_output` to help
+encode your expectations about the output of your functions and catch bugs early!
+
+Note (2): we can tag the `aggregation` features with whatever key-value pair makes sense
+for us to discern/identify that we should not compute these features in an online setting.
+"""
+import pandas as pd
+import pandera as pa
+
+from hamilton.function_modifiers import check_output, extract_columns, tag
+
+
+@tag(inject_at_inference_time="True")
+@check_output(range=(20.0, 60.0), data_type=float)
+def age_mean(age: pd.Series) -> float:
+    """Average of age."""
+    return age.mean()
+
+
+age_zero_mean_schema = pa.SeriesSchema(
+    float,
+    checks=[
+        pa.Check.in_range(-120.0, 120.0),
+    ],
+    nullable=False,
+)
+
+
+@check_output(schema=age_zero_mean_schema)
+def age_zero_mean(age: pd.Series, age_mean: float) -> pd.Series:
+    """Zero mean of age."""
+    return age - age_mean
+
+
+# note: this schema is currently unused -- `age_std_dev` returns a scalar, so we use
+# `check_output`'s range check on it instead.
+age_std_dev_schema = pa.SeriesSchema(
+    float,
+    checks=[
+        pa.Check.in_range(0.0, 40.0),
+    ],
+)
+
+
+@tag(inject_at_inference_time="True")
+@check_output(range=(0.0, 40.0), data_type=float)
+def age_std_dev(age: pd.Series) -> float:
+    """Standard deviation of age."""
+    return age.std()
+
+
+age_zero_mean_unit_variance_schema = pa.SeriesSchema(
+    float,
+    checks=[
+        pa.Check.in_range(-4.0, 4.0),
+    ],
+    nullable=False,
+)
+
+
+@check_output(schema=age_zero_mean_unit_variance_schema)
+def age_zero_mean_unit_variance(age_zero_mean: pd.Series, age_std_dev: float) -> pd.Series:
+    """Zero mean, unit variance value of age."""
+    return age_zero_mean / age_std_dev
+
+
+@extract_columns("seasons_1", "seasons_2", "seasons_3", "seasons_4", fill_with=0)
+def seasons_encoded(seasons: pd.Series) -> pd.DataFrame:
+    """One-hot encodes seasons into 4 dimensions:
+    1 - first season
+    2 - second season
+    3 - third season
+    4 - fourth season
+    """
+    return pd.get_dummies(seasons, prefix="seasons")
+
+
+@extract_columns(
+    "day_of_the_week_2",
+    "day_of_the_week_3",
+    "day_of_the_week_4",
+    "day_of_the_week_5",
+    "day_of_the_week_6",
+    fill_with=0,
+)
+def day_of_week_encoded(day_of_the_week: pd.Series) -> pd.DataFrame:
+    """One-hot encodes day of week into five dimensions -- Saturday & Sunday weren't present.
+    1 - Sunday, 2 - Monday, 3 - Tuesday, 4 - Wednesday, 5 - Thursday, 6 - Friday, 7 - Saturday.
+    """
+    return pd.get_dummies(day_of_the_week, prefix="day_of_the_week")
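+
+# Note on `fill_with=0` above: a single-row online request produces only one dummy column
+# from pd.get_dummies, so the other requested columns would be missing from the output;
+# `fill_with=0` tells Hamilton to fill those in with 0 rather than error.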
+ """ + return pd.get_dummies(day_of_the_week, prefix="day_of_the_week") + + +has_children_schema = pa.SeriesSchema( + int, + checks=[ + pa.Check.isin([0, 1]), + ], + nullable=False, +) + + +@check_output(schema=has_children_schema) +def has_children(son: pd.Series) -> pd.Series: + """Single variable that says whether someone has any children or not.""" + return (son > 0).astype(int) + + +has_pet_schema = pa.SeriesSchema( + int, + checks=[ + pa.Check.isin([0, 1]), + ], + nullable=False, +) + + +@check_output(schema=has_pet_schema) +def has_pet(pet: pd.Series) -> pd.Series: + """Single variable that says whether someone has any pets or not.""" + return (pet > 0).astype(int) + + +is_summer_schema = pa.SeriesSchema( + int, + checks=[ + pa.Check.isin([0, 1]), + ], + nullable=False, +) + + +@check_output(schema=is_summer_schema) +def is_summer(month_of_absence: pd.Series) -> pd.Series: + """Is it summer in Brazil? i.e. months of December, January, February.""" + return month_of_absence.isin([1, 2, 12]).astype(int) diff --git a/examples/feature_engineering/scenario_1/named_model_feature_sets.py b/examples/feature_engineering/scenario_1/named_model_feature_sets.py new file mode 100644 index 00000000..bb1be2b4 --- /dev/null +++ b/examples/feature_engineering/scenario_1/named_model_feature_sets.py @@ -0,0 +1,17 @@ +""" +This module is a light weight way to "record" feature sets that are used in a model; just define it in code! + +No need for a database! Just use code and `git` to version what is or isn't used/available. +""" + +model_x_features = [ + "is_summer", + "has_pet", + "has_children", + "day_of_the_week_2", + "day_of_the_week_3", + "day_of_the_week_4", + "day_of_the_week_5", + "day_of_the_week_6", + "age_zero_mean_unit_variance", +] diff --git a/examples/feature_engineering/scenario_1/offline_execution.dot.png b/examples/feature_engineering/scenario_1/offline_execution.dot.png new file mode 100644 index 00000000..1bea6723 Binary files /dev/null and b/examples/feature_engineering/scenario_1/offline_execution.dot.png differ diff --git a/examples/feature_engineering/scenario_1/offline_loader.py b/examples/feature_engineering/scenario_1/offline_loader.py new file mode 100644 index 00000000..c3e11c4b --- /dev/null +++ b/examples/feature_engineering/scenario_1/offline_loader.py @@ -0,0 +1,66 @@ +""" +Module that contains logic to load data for the offline ETL process. + +We use this to build our offline ETL featurization process. +""" + +from typing import List + +import pandas as pd + +from hamilton.function_modifiers import extract_columns + +# full set of available columns from the data source +data_columns = [ + "id", + "reason_for_absence", + "month_of_absence", + "day_of_the_week", + "seasons", + "transportation_expense", + "distance_from_residence_to_work", + "service_time", + "age", + "work_load_average_per_day", + "hit_target", + "disciplinary_failure", + "education", + "son", + "social_drinker", + "social_smoker", + "pet", + "weight", + "height", + "body_mass_index", + "absenteeism_time_in_hours", +] + + +def _sanitize_columns(df_columns: List[str]) -> List[str]: + """Renames columns to be valid hamilton names -- and lower cases them. + + :param df_columns: the current column names. 
+
+
+@extract_columns(*data_columns)
+def raw_data(location: str) -> pd.DataFrame:
+    """Extracts the raw data, renames the columns to be valid Python variable names, and assigns an index.
+
+    :param location: the location to load from.
+    :return: the raw dataframe with sanitized column names and a proper index.
+    """
+    df = pd.read_csv(location, sep=";")
+    # rename columns to be valid Hamilton names -- and lower-case them
+    df.columns = _sanitize_columns(df.columns)
+    # create a proper index -- id-month-day_of_the_week -- to be able to join features appropriately.
+    index = (
+        df["id"].astype(str)
+        + "-"
+        + df["month_of_absence"].astype(str)
+        + "-"
+        + df["day_of_the_week"].astype(str)
+    )
+    df.index = index
+    return df
diff --git a/examples/feature_engineering/scenario_1/online_execution.dot.png b/examples/feature_engineering/scenario_1/online_execution.dot.png
new file mode 100644
index 00000000..103c248d
Binary files /dev/null and b/examples/feature_engineering/scenario_1/online_execution.dot.png differ
diff --git a/examples/feature_engineering/scenario_1/requirements.txt b/examples/feature_engineering/scenario_1/requirements.txt
new file mode 100644
index 00000000..e5ad16c1
--- /dev/null
+++ b/examples/feature_engineering/scenario_1/requirements.txt
@@ -0,0 +1,5 @@
+fastapi
+pandas
+pandera
+sf-hamilton
+uvicorn
diff --git a/examples/feature_engineering/scenario_2/README.md b/examples/feature_engineering/scenario_2/README.md
new file mode 100644
index 00000000..fb19644d
--- /dev/null
+++ b/examples/feature_engineering/scenario_2/README.md
@@ -0,0 +1 @@
+STAY TUNED...!