
CHIP-9: Support Model-based Transformations in Join & Chaining #757

Open
hzding621 opened this issue May 1, 2024 · 1 comment
Labels
CHIP Chronon Improvements Proposals - use template: https://github.com/airbnb/chronon/issues/439

Comments

@hzding621 (Collaborator)

hzding621 commented May 1, 2024

CHIP-9: Support Model-based Transformations in Join & Chaining

Problem Statement

Model inference is an important primitive transformation that ML practitioners use when creating feature pipelines. The most popular example is embeddings as ML features, where the output of an embedding model (usually a DNN) is used as input features for a downstream model. Once created (trained), the embedding model can be treated as a special case of a general row-to-row transformation function, and can be plugged in anywhere in a feature pipeline.

This CHIP adds support for model-based transformations in Chronon via an extension to the Join API. Combined with the existing chaining support for joins, this allows Chronon users to build complex feature pipelines that include model-based transformations. It covers both offline backfills and online serving, just like regular joins.

Requirements

  • Backfills of model-based features
  • Online fetching of model-based features
  • Streaming updates of model-based features via join chaining.

Non-Requirements

  • Training of the model itself. This is assumed to happen outside of Chronon for now; however, discussion of how to stay compatible with training is welcome.
  • Vector ANN index deployment or hydration. This could also be a future CHIP.

Join API Changes

We call this the Model Transform API, which extends the current Chronon join with a new model_transform section that handles model inference after the raw feature data is generated. The model transform runs after the join logic completes and acts as another round of transformation, calling into an external model inference engine (either batch or online) to retrieve the model inference output.

*(screenshot attachment: Screenshot 2024-05-01 at 4 04 48 PM)*

Model

Core to the Model Transform API is the Model definition. A Model contains all parameters required to invoke a model backend to run model inference. Note that the model backend implementation will not live in Chronon open source but in an impl-specific code base. The responsibility of Chronon here is to handle the end-to-end orchestration from raw data into final features. It also serves as the central repository for feature metadata.

  • model_backend: the name of a specific model backend registered in the Impl. This is similar to the name of an ExternalSourceHandler in the ExternalPart API.
  • model_backend_params: any params the model backend needs to locate the model instance and to know how to run inference against it.
listing_model = Model(
    inference_spec=InferenceSpec(
        model_backend="<model-service>",
        model_backend_params={
            "model_name": "search_listing_tower",
            "model_version": "v1",
        }
    ),
    input_schema=[
        ("numeric_features", DataType.LIST(DataType.DOUBLE)),
        ("categorical_features", DataType.LIST(DataType.LONG)),
    ],
    output_schema=[
        ("listing_embeddings", DataType.LIST(DataType.DOUBLE))
    ]
)

query_model = Model(
    inference_spec=InferenceSpec(
        model_backend="<model-service>",
        model_backend_params={
            "model_name": "search_query_tower",
            "model_version": "v1",
        }
    ),
    input_schema=[
        ("numeric_features", DataType.LIST(DataType.DOUBLE)),
        ("categorical_features", DataType.LIST(DataType.LONG)),
    ],
    output_schema=[
        ("query_embeddings", DataType.LIST(DataType.DOUBLE))
    ]
)

Model Transform

We will introduce ModelTransform as another section in the Join definition. During orchestration, this step runs after derivations and its output becomes the new join output.

ModelTransform contains the core Model definition, as well as some additional join-level parameters for mappings and formatting:

  • Output Mappings: an optional field that renames model output columns to the desired column names in the join. Only renaming is supported here.
  • Passthrough Fields: an optional field listing pre-model-transform fields to be concatenated with the model output in the final join output. If not specified, the default behavior is to include every field from the previous phase.
from joins.search.embeddings import utils

listing_tower_join = Join(
    left=utils.listing_driver,
    right_parts=utils.listing_features_v1,
    model_transform=ModelTransform(
        model=listing_model,
        output_mappings={
            "embeddings": "listing_embeddings"
        },
        passthrough_fields=["gb1_feat1", "gb2_feat2"]
    ),
    online=False,
    offline_schedule='@daily'
)

query_tower_join = Join(
    left=utils.query_driver,
    right_parts=utils.query_features,
    model_transform=ModelTransform(
        model=query_model,
        output_mappings={
            "embeddings": "query_embeddings"
        },
        passthrough_fields=["gb1_feat1", "gb2_feat2"]
    ),
    online=True,
    offline_schedule='@never'
)
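To make the renaming and passthrough semantics concrete, here is an illustrative row-level sketch. It is not proposal code: the function name, dict-shaped rows, and the mapping direction (model output column name to desired join column name) are all assumptions.

```python
def apply_model_transform_output(raw_row, model_output,
                                 output_mappings, passthrough_fields=None):
    # Illustrative sketch; dict-shaped rows and the mapping direction
    # (model output column -> desired join column) are assumptions.
    # Rename model output columns per output_mappings (rename only).
    renamed = {output_mappings.get(col, col): val
               for col, val in model_output.items()}
    # Keep the requested pre-model-transform fields; the default is all of them.
    kept = (dict(raw_row) if passthrough_fields is None
            else {f: raw_row[f] for f in passthrough_fields})
    return {**kept, **renamed}
```

With `passthrough_fields=["gb1_feat1", "gb2_feat2"]`, only those two raw columns survive alongside the (renamed) model output; with no passthrough list, every upstream column is carried through.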

Model Backend APIs

The model backend will need to implement the following APIs, which Chronon invokes during orchestration.

// Send any required metadata to the model backend and prepare it for model inference.
def registerModel(model: Model): Future[RegistrationResponse]

// Send any required metadata to the model backend and prepare it for (batch) model inference.
def registerModelTransform(join: Join): Future[RegistrationResponse]

// Run a batch model inference job for a given join.
def runModelBatchJob(join: Join, start_ds: String, end_ds: String): Future[JobId]

// Get the status of a batch model inference job.
def getModelBatchJobStatus(jobId: JobId, start_ds: String, end_ds: String): Future[JobStatus]

// Run online model inference, which returns the model output directly.
// Will be used in fetcher.fetchJoin.
def runModelInference(join: Join, inputs: Seq[Map[String, AnyRef]]): Future[Seq[Map[String, AnyRef]]]
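For illustration, the Scala signatures above could map onto an interface like the following Python sketch. The class name and snake_case method spellings are assumptions; only the shape of the API mirrors the proposal.

```python
import abc
from typing import Any, Dict, List, Sequence


class ModelBackend(abc.ABC):
    # Illustrative Python mirror of the proposed backend API; the real
    # implementation lives in an impl-specific code base, not Chronon OSS.

    @abc.abstractmethod
    def register_model(self, model) -> Dict[str, Any]:
        """Send model metadata to the backend and prepare it for inference."""

    @abc.abstractmethod
    def register_model_transform(self, join) -> Dict[str, Any]:
        """Send join-level metadata and prepare for (batch) inference."""

    @abc.abstractmethod
    def run_model_batch_job(self, join, start_ds: str, end_ds: str) -> str:
        """Schedule a batch inference job; returns a job id."""

    @abc.abstractmethod
    def get_model_batch_job_status(self, job_id: str,
                                   start_ds: str, end_ds: str) -> str:
        """Return the status of a batch inference job."""

    @abc.abstractmethod
    def run_model_inference(
        self, join, inputs: Sequence[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Run online inference over a batch of rows; used in fetchJoin."""
```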

Orchestration Topology

Metadata Registration and Validation

*(screenshot attachment: Screenshot 2024-05-01 at 4 09 05 PM)*

Join Backfill

*(screenshot attachment: Screenshot 2024-05-01 at 4 10 17 PM)*

Join Fetching

*(screenshot attachment: Screenshot 2024-06-01 at 11 57 45 PM)*

Orchestration Details

Join Level Operations

Analyzer

  • Development Steps
    • Users author Chronon joins with model transform info in a Chronon config.
    • During dev iteration, users use run.py to submit an analyzer run, which registers and validates the model transform info by calling /modelBackend/registerModel + /modelBackend/registerModelTransform.
      • These validations provide early feedback to users during local iteration.

Join Backfill

  • Development Steps
    • Run through the join steps up to derivations, producing the raw features
    • Registration
      • Call /modelBackend/registerModel + /modelBackend/registerModelTransform to register/validate metadata
    • Batch model inference
      • Call /modelBackend/runModelBatchInferenceJob to schedule a batch inference job
      • Call /modelBackend/getModelBatchInferenceJobStatus periodically to check on the batch inference job
  • Production Steps
    • For each join with model transform, a new DAG will be created in addition to the current join DAG:
      • Sensor to wait for join DAG to complete
      • Registration
        • Call /modelBackend/registerModel + /modelBackend/registerModelTransform
      • Batch model inference
        • Call /modelBackend/runModelBatchInferenceJob to schedule a batch inference job
        • Call /modelBackend/getModelBatchInferenceJobStatus periodically to check on the batch inference job
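The backfill steps above amount to a register / schedule / poll loop. A minimal sketch, assuming a hypothetical Python client whose methods wrap the backend API, and a join object that exposes its model via model_transform.model:

```python
import time

TERMINAL_STATES = {"SUCCEEDED", "FAILED"}


def backfill_model_transform(backend, join, start_ds, end_ds, poll_secs=60):
    # Register/validate metadata first (registerModel + registerModelTransform).
    backend.register_model(join.model_transform.model)
    backend.register_model_transform(join)
    # Schedule the batch inference job, then poll until it reaches a
    # terminal state (the status values here are assumptions).
    job_id = backend.run_model_batch_job(join, start_ds, end_ds)
    while True:
        status = backend.get_model_batch_job_status(job_id, start_ds, end_ds)
        if status in TERMINAL_STATES:
            return status
        time.sleep(poll_secs)
```

In the production DAG this loop would be split across scheduler tasks (a sensor plus a polling operator) rather than a blocking while-loop, but the call sequence is the same.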

Fetcher

  • Development steps
    • During dev iteration, users can also test model online inference capability using fetcher cli
      • Call /modelBackend/registerModel + /modelBackend/registerModelTransform to register/validate metadata
      • Call /modelBackend/runModelInference to retrieve model outputs
  • Production steps
    • In backend services, users import the AFP library, which invokes fetchJoin to:
      • First fetch raw features, and then
      • Call /modelBackend/runModelInference to retrieve model outputs
    • Note that registration does not happen here because this is the hot path, so we need a separate metadata upload path.
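On the hot path, fetchJoin composes two steps: a raw feature fetch, then one batched online inference call. A hedged sketch with hypothetical fetcher/backend helpers (only the runModelInference call mirrors the proposal):

```python
def fetch_join_with_model_transform(fetcher, backend, join, key_maps):
    # 1) Fetch raw join features for the requested keys
    #    (fetcher.fetch_raw is a hypothetical helper).
    raw_rows = fetcher.fetch_raw(join, key_maps)
    # 2) Run batched online inference over all rows in one call,
    #    mirroring runModelInference(join, Seq[Map[String, AnyRef]]).
    # Note: no registration here; metadata is uploaded off the hot path.
    return backend.run_model_inference(join, raw_rows)
```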

Model Metadata Upload

  • Development Steps
    • Users shouldn't need to run this; it is only needed by Chronon engineers for testing.
  • Production Steps
    • We will implement a new DAG (one per team) for Model Transform metadata upload.
      • It runs regularly, every 20 minutes.
      • It calls /modelBackend/registerModel + /modelBackend/registerModelTransform for all joins with a model transform.
      • This DAG is similar to today’s chronon_metadata_upload but has a different task for each join.
      • Registration can trigger a validation failure, which will email/notify users.
    • This DAG is primarily needed for the online fetcher to work, because we don’t want metadata upload to run on the hot path.
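The per-team upload task described above can be sketched as a loop that re-registers every model-transform join and notifies owners on validation failure. Everything here (helper names, the ValidationError type, the notify callback) is assumed for illustration:

```python
class ValidationError(Exception):
    """Hypothetical error raised when the backend rejects registration."""


def upload_model_transform_metadata(backend, joins, notify):
    # Re-register every join that has a model transform; on validation
    # failure, notify the owning team and keep going with the rest.
    failures = []
    for join in joins:
        if getattr(join, "model_transform", None) is None:
            continue  # plain joins need no model registration
        try:
            backend.register_model(join.model_transform.model)
            backend.register_model_transform(join)
        except ValidationError as err:
            notify(join, err)  # email/notify users, per the DAG design
            failures.append(join)
    return failures
```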

Group By Level Operations (for Chaining)

The following sections relate to chaining, where the output of a Join with a Model Transform is used as a JoinSource in a downstream GroupBy, which can be either a batch GroupBy or a streaming GroupBy.

Group By Upload

  • Development Steps
    • Users can run GroupBy Upload in run.py to test it locally, but usually this is not necessary.
  • Production Steps
    • In group by upload DAG:
      • The GroupBy upload job reads the model output table and performs regular GB Upload operations. This is unchanged from the current chaining flow.

Group By Streaming

  • Development Steps
    • Users can use Local Streaming mode to test it locally.
  • Production Steps
    • In group by streaming DAG and the scheduled streaming job:
      • Consumes the Kafka topic defined on the left side of the model join
      • Performs feature lookup
      • Calls /modelBackend/runModelInference to retrieve model outputs
      • Performs group-by aggregation
      • Writes output to the KV store
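The streaming steps above can be sketched as a single event-handling function. All helper objects (enricher, group_by, kv_store) are hypothetical; only the runModelInference call mirrors the proposal's API.

```python
def handle_stream_event(event, enricher, backend, join, group_by, kv_store):
    # 1) Feature lookup: enrich the raw event from the model join's left topic.
    raw_row = enricher.lookup(join, event)
    # 2) Online model inference (single-row call through the batched API).
    model_row = backend.run_model_inference(join, [raw_row])[0]
    # 3) GroupBy aggregation over the model output.
    key, aggregate = group_by.update(model_row)
    # 4) Write the updated aggregate to the KV store.
    kv_store.put(key, aggregate)
    return key, aggregate
```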
@nikhilsimha (Contributor) commented:

def runModelInference(join: Join, inputs: Map[String, AnyRef]): Future[Map[String, AnyRef]]

This should instead be a batch / multi method:

def runModelInference(join: Join, inputs: Seq[Map[String, AnyRef]]): Future[Seq[Map[String, AnyRef]]]

@nikhilsimha nikhilsimha added the CHIP Chronon Improvements Proposals - use template: https://github.com/airbnb/chronon/issues/439 label May 18, 2024