CHIP-9: Support Model-based Transformations in Join & Chaining
Problem Statement
Model Inference is an important primitive form of transform function that ML practitioners use in creating feature pipelines. The most popular example is embeddings as ML features, where the output of an Embedding Model (usually a DNN model) is used as input features for a downstream model. Once created (trained), the (Embedding) Model can be treated as a special case of the general row-to-row transformation function, and can be plugged anywhere into a feature pipeline.
This CHIP will add support for model-based transformations in Chronon via an extension to the Join API. Combined with the existing chaining support for joins, this will allow Chronon users to build complex feature pipelines that include model-based transformations. This will cover both offline backfills and online serving, just like regular joins.
Requirements
Backfills of model-based features
Online fetching of model-based features
Streaming updates of model-based features via join chaining.
Non-Requirements
Training of the model itself. This is assumed to happen outside of Chronon for now; however, discussion of how to stay compatible with training workflows is welcome.
Vector ANN index deployment or hydration. This could also be a future CHIP.
Join API Changes
We call this the Model Transform API, which extends the current Chronon join to introduce a new model_transform section that handles model inference after the generation of raw feature data. Model transform takes place after the execution of join logic completes, and acts as another round of transformation which calls into an external model inference (either batch or online) engine to retrieve the model inference output.
Model
Core to the Model Transform API is the Model definition. A Model contains all parameters required to invoke a model backend to run model inference. Note that the model backend implementation will not live in Chronon open source but in an impl-specific code base. The responsibility of Chronon here is to handle the end-to-end orchestration from raw data into final features. It also serves as the central repository for feature metadata.
model_backend: the name of a specific model backend implementation that lives in the Impl. This is similar to the name of an ExternalSourceHandler in the ExternalPart API.
model_backend_params: any params the model backend needs to locate the model instance and know how to run inference against it.
Model Transform
We will introduce ModelTransform as another section in the Join definition. During orchestration, this step runs after derivations and its output becomes the new join output.
ModelTransform contains the core Model definition, as well as some additional join-level parameters for mappings and formatting:
Output Mappings: this is an optional field that allows an additional conversion from model output to the desired column names in join. Only renaming is supported here.
Passthrough Fields: this is an optional field that allows the join to specify pre-model-transform fields that will be concatenated with model output in the final output of the join. If not specified, the default behavior would be to include every field from the previous phase.
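As a sketch, a join's model transform section might look like the following. The class and field names mirror the fields described above (`Model`, `ModelTransform`, `model_backend`, `output_mappings`, `passthrough_fields`), but the exact Python config API is illustrative, not final; the backend name and params are made-up examples.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

# Hypothetical Python config classes mirroring the fields described above.
@dataclass
class Model:
    model_backend: str                    # backend name, like an ExternalSourceHandler name
    model_backend_params: Dict[str, str]  # opaque params the backend uses to locate the model

@dataclass
class ModelTransform:
    model: Model
    output_mappings: Optional[Dict[str, str]] = None  # rename model outputs to join column names
    passthrough_fields: Optional[List[str]] = None    # pre-transform fields to keep; None = keep all

# Example: an embedding model applied on top of the raw join output.
model_transform = ModelTransform(
    model=Model(
        model_backend="my_inference_service",  # hypothetical backend name
        model_backend_params={"model_name": "listing_embedding", "version": "v3"},
    ),
    output_mappings={"embedding": "listing_embedding"},
    passthrough_fields=["listing_id", "ts"],
)
```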
Model Backend APIs
The model backend will need to implement the following APIs, which Chronon will invoke during orchestration.
```scala
// Send any required metadata to the model backend and prepare it for model inference.
def registerModel(model: Model): Future[RegistrationResponse]

// Send any required metadata to the model backend and prepare it for (batch) model inference.
def registerModelTransform(join: Join): Future[RegistrationResponse]

// Run a batch model inference job for a given join.
def runModelBatchJob(join: Join, start_ds: String, end_ds: String): Future[JobId]

// Get the status of a batch model inference job.
def getModelBatchJobStatus(jobId: JobId, start_ds: String, end_ds: String): Future[JobStatus]

// Run online model inference which returns the model output directly.
// Will be used in fetcher.fetchJoin.
def runModelInference(join: Join, inputs: Seq[Map[String, AnyRef]]): Future[Seq[Map[String, AnyRef]]]
```
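To make the contract concrete, here is a hedged in-memory mock of the same five operations in Python. The Scala trait above is the actual interface; this mock only illustrates the expected call sequence and return shapes, and its status strings and validation behavior are assumptions, not part of the proposal.

```python
from typing import Any, Dict, List, Sequence

# Illustrative in-memory mock of the model backend contract above.
# Method names mirror the Scala API; everything here is a sketch, not the real impl.
class MockModelBackend:
    def __init__(self) -> None:
        self.registered_models: Dict[str, Dict[str, Any]] = {}
        self.jobs: Dict[str, str] = {}  # job_id -> status
        self._next_job = 0

    def register_model(self, model: Dict[str, Any]) -> str:
        self.registered_models[model["name"]] = model
        return "OK"

    def register_model_transform(self, join: Dict[str, Any]) -> str:
        # Validate that the join references a registered model.
        model_name = join["model_transform"]["model"]["name"]
        if model_name not in self.registered_models:
            return "VALIDATION_FAILED"
        return "OK"

    def run_model_batch_job(self, join: Dict[str, Any], start_ds: str, end_ds: str) -> str:
        self._next_job += 1
        job_id = f"job-{self._next_job}"
        self.jobs[job_id] = "RUNNING"
        return job_id

    def get_model_batch_job_status(self, job_id: str, start_ds: str, end_ds: str) -> str:
        # A real backend would query the inference engine; the mock succeeds immediately.
        self.jobs[job_id] = "SUCCEEDED"
        return self.jobs[job_id]

    def run_model_inference(self, join: Dict[str, Any],
                            inputs: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Return a fake embedding per input row.
        return [{"embedding": [0.0, 1.0]} for _ in inputs]
```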
Orchestration Topology
Metadata Registration and Validation
Join Backfill
Join Fetching
Orchestration Details
Join Level Operations
Analyzer
Development Steps
Users author Chronon joins with model transform info in a Chronon config.
During dev iteration, users use run.py to submit an analyzer run, which registers and validates the model transform info by calling /modelBackend/registerModel + /modelBackend/registerModelTransform.
The validations provide early feedback to users during local iteration.
Join Backfill
Development Steps
Run through the join steps until derivations (raw features)
Registration
Call /modelBackend/registerModel + /modelBackend/registerModelTransform to register/validate metadata
Batch model inference
Call /modelBackend/runModelBatchJob to schedule a batch inference job
Call /modelBackend/getModelBatchJobStatus periodically to check on the batch inference job
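The register-run-poll sequence above can be sketched as a driver loop. The backend object, the status strings ("RUNNING", "SUCCEEDED", "FAILED"), and the polling cadence are assumptions for illustration; the real orchestration lives in the scheduler.

```python
import time

# Sketch of the backfill-side orchestration described above: register metadata,
# launch the batch inference job, then poll until a terminal status.
# Status names ("RUNNING", "SUCCEEDED", "FAILED") are assumed, not final.
def run_model_backfill(backend, join, start_ds: str, end_ds: str,
                       poll_seconds: float = 0.0, max_polls: int = 100) -> str:
    # Step 1: register/validate metadata.
    if backend.register_model(join["model_transform"]["model"]) != "OK":
        raise RuntimeError("model registration failed")
    if backend.register_model_transform(join) != "OK":
        raise RuntimeError("model transform validation failed")
    # Step 2: schedule the batch inference job.
    job_id = backend.run_model_batch_job(join, start_ds, end_ds)
    # Step 3: poll until the job reaches a terminal state.
    for _ in range(max_polls):
        status = backend.get_model_batch_job_status(job_id, start_ds, end_ds)
        if status in ("SUCCEEDED", "FAILED"):
            return status
        time.sleep(poll_seconds)
    raise TimeoutError(f"batch job {job_id} did not finish")
```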
Production Steps
For each join with model transform, a new DAG will be created in addition to the current join DAG:
Call /modelBackend/runModelBatchJob to schedule a batch inference job
Call /modelBackend/getModelBatchJobStatus periodically to check on the batch inference job
Fetcher
Development steps
During dev iteration, users can also test the online model inference capability using the fetcher CLI:
Call /modelBackend/registerModel + /modelBackend/registerModelTransform to register/validate metadata
Call /modelBackend/runModelInference to retrieve model outputs
Production steps
In backend services, users import the AFP library that invokes fetchJoin, which:
First fetches raw features, and then
Calls /modelBackend/runModelInference to retrieve model outputs
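A sketch of this hot path follows: fetch raw features, call online inference, then apply the output mappings and passthrough fields as defined earlier. The function name and the dict-based join representation are illustrative assumptions, not the actual fetcher implementation.

```python
from typing import Any, Dict, List, Sequence

# Sketch of the online path: fetch raw features, call the model backend,
# then apply output mappings and passthrough fields per the earlier definitions.
def fetch_join_with_model_transform(fetch_raw, backend, join,
                                    keys: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]:
    raw_rows = fetch_raw(keys)                                # raw join features
    model_rows = backend.run_model_inference(join, raw_rows)  # online model inference
    mt = join["model_transform"]
    mappings = mt.get("output_mappings") or {}
    passthrough = mt.get("passthrough_fields")  # None means keep all raw fields
    out = []
    for raw, model_out in zip(raw_rows, model_rows):
        row = {mappings.get(k, k): v for k, v in model_out.items()}  # rename model outputs
        kept = raw if passthrough is None else {f: raw[f] for f in passthrough}
        row.update(kept)                                             # concatenate passthrough fields
        out.append(row)
    return out
```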
Note that registration does not happen on this hot path, so we need a separate metadata upload path.
Model Metadata Upload
Development Steps
Users shouldn't need to run this. Only needed by Chronon engineers for testing.
Production Steps
We will implement a new DAG (per team) for Model Transform metadata upload:
Runs regularly, every 20 minutes.
Calls /modelBackend/registerModel + /modelBackend/registerModelTransform for all joins with a model transform.
This DAG is similar to today’s chronon_metadata_upload, but has a separate task per join.
Registration can surface validation failures, which will email/notify users.
This DAG is primarily needed for the online fetcher to work, because we don’t want uploads to run on the hot path.
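The per-team upload task above might look like the following sketch. The scheduler handles the 20-minute cadence; the `notify` callback (email/alert) and the dict-based join representation are placeholder assumptions.

```python
from typing import Any, Callable, Dict, List

# Sketch of the per-team metadata upload task described above: re-register
# every join that has a model transform and surface validation failures.
# The notify callback (e.g. email) is a placeholder assumption.
def upload_model_metadata(backend, joins: List[Dict[str, Any]],
                          notify: Callable[[str, str], None]) -> int:
    failures = 0
    for join in joins:
        mt = join.get("model_transform")
        if mt is None:
            continue  # only joins with a model transform are registered
        if (backend.register_model(mt["model"]) != "OK"
                or backend.register_model_transform(join) != "OK"):
            failures += 1
            notify(join["name"], "model transform registration/validation failed")
    return failures
```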
Group By Level Operations (for Chaining)
The operations below relate to Chaining, where the output of a Join with a Model Transform is used as a JoinSource in a downstream GroupBy, which can be either a batch GroupBy or a streaming GroupBy.
Group By Upload
Development Steps
Users can run GroupBy Upload in run.py to test it locally, but usually this is not necessary.
Production Steps
In group by upload DAG:
GroupBy upload job reads the model output table and performs regular GB Upload operations. This is unchanged from current chaining flow.
Group By Streaming
Development Steps
Users can use Local Streaming mode to test it locally.
Production Steps
In group by streaming DAG and the scheduled streaming job:
Consumes the Kafka topic defined on the left side of the model join
Performs feature lookup
Calls /modelBackend/runModelInference to retrieve model outputs
Performs group by aggregation
Writes output to KV store
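The per-event streaming steps above can be sketched as follows. All collaborators (feature lookup, aggregation, KV store) are stubbed; this only shows the order of operations, and none of the names here are real Chronon APIs.

```python
from typing import Any, Callable, Dict, Iterable, Tuple

# Sketch of the streaming steps above: for each event from the left-side topic,
# look up upstream features, run online inference, aggregate, and write to KV.
# All collaborators are stubbed assumptions; only the order of operations matters.
def process_stream_events(events: Iterable[Dict[str, Any]],
                          lookup_features: Callable[[Dict[str, Any]], Dict[str, Any]],
                          backend, join,
                          aggregate: Callable[[Dict[str, Any]], Tuple[str, Any]],
                          kv_store: Dict[str, Any]) -> None:
    for event in events:
        enriched = lookup_features(event)                             # feature lookup
        model_out = backend.run_model_inference(join, [enriched])[0]  # online inference
        enriched.update(model_out)
        key, value = aggregate(enriched)                              # group-by aggregation
        kv_store[key] = value                                         # write output to KV store
```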