Skip to content

Commit

Permalink
Enable cpp code path for Categorify ops (#389)
Browse files Browse the repository at this point in the history
* allow cpp code path to be used for Categorify ops

* Update merlin/systems/workflow/base.py

* formatting

* spelling

* require env-variable opt-in for cpp code path

* Update merlin/systems/workflow/base.py

---------

Co-authored-by: Oliver Holworthy <[email protected]>
Co-authored-by: Julio Perez <[email protected]>
  • Loading branch information
3 people authored Jun 11, 2024
1 parent ddb775b commit a19d311
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions merlin/systems/workflow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import functools
import json
import logging
import os

from merlin.dag import ColumnSelector, DataFormats, Supports
from merlin.dag.executors import LocalExecutor, _convert_format, _data_format
Expand Down Expand Up @@ -66,15 +67,24 @@ def __init__(self, workflow, output_dtypes, model_config, model_device):
)

# recurse over all column groups, initializing operators for inference pipeline.
# (disabled for now while we sort out whether and how we want to use C++ implementations
# of NVTabular operators for performance optimization)
# self._initialize_ops(self.workflow.output_node)
# (disabled for every operator except those listed in the comma-separated
# `NVT_CPP_OPS` environment variable, which defaults to "Categorify",
# while we sort out whether and how we want to use C++ implementations
# of NVTabular operators for performance optimization)
_nvt_cpp_ops = os.environ.get("NVT_CPP_OPS", "Categorify").split(",")
self._initialize_ops(self.workflow.output_node, restrict=_nvt_cpp_ops)

def _initialize_ops(self, workflow_node, visited=None, restrict=None):
restrict = restrict or []

def _initialize_ops(self, workflow_node, visited=None):
if visited is None:
visited = set()

if workflow_node.op and hasattr(workflow_node.op, "inference_initialize"):
if (
workflow_node.op
and hasattr(workflow_node.op, "inference_initialize")
and (not restrict or workflow_node.op.label in restrict)
):
inference_op = workflow_node.op.inference_initialize(
workflow_node.selector, self.model_config
)
Expand All @@ -96,7 +106,7 @@ def _initialize_ops(self, workflow_node, visited=None):
for parent in workflow_node.parents_with_dependencies:
if parent not in visited:
visited.add(parent)
self._initialize_ops(parent, visited)
self._initialize_ops(parent, visited=visited, restrict=restrict)

def run_workflow(self, input_tensors):
transformable = TensorTable(input_tensors).to_df()
Expand Down

0 comments on commit a19d311

Please sign in to comment.