From dafab0f23200f52cd12199a6fa5a1b0b04f778eb Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 10 Apr 2024 10:59:47 -0700 Subject: [PATCH 1/6] allow cpp code path to be used for Categorify ops --- merlin/systems/workflow/base.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/merlin/systems/workflow/base.py b/merlin/systems/workflow/base.py index 81090f60d..39fc1c3a7 100644 --- a/merlin/systems/workflow/base.py +++ b/merlin/systems/workflow/base.py @@ -66,15 +66,18 @@ def __init__(self, workflow, output_dtypes, model_config, model_device): ) # recurse over all column groups, initializing operators for inference pipeline. - # (disabled for now while we sort out whether and how we want to use C++ implementations - # of NVTabular operators for performance optimization) - # self._initialize_ops(self.workflow.output_node) + # (disabled everyting other than Categorify for now while we sort out whether + # and how we want to use C++ implementations of NVTabular operators for + # performance optimization) + self._initialize_ops(self.workflow.output_node, restrict=["Categorify"]) - def _initialize_ops(self, workflow_node, visited=None): + def _initialize_ops(self, workflow_node, visited=None, restrict=False): if visited is None: visited = set() - if workflow_node.op and hasattr(workflow_node.op, "inference_initialize"): + if workflow_node.op and hasattr(workflow_node.op, "inference_initialize") and ( + not restrict or workflow_node.op.label in restrict + ): inference_op = workflow_node.op.inference_initialize( workflow_node.selector, self.model_config ) @@ -96,7 +99,7 @@ def _initialize_ops(self, workflow_node, visited=None): for parent in workflow_node.parents_with_dependencies: if parent not in visited: visited.add(parent) - self._initialize_ops(parent, visited) + self._initialize_ops(parent, visited=visited, restrict=restrict) def run_workflow(self, input_tensors): transformable = TensorTable(input_tensors).to_df() From 88136454ed43bdb3b8f9fbd2810a4dbbd383f5da Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Tue, 23 Apr 2024 21:23:01 -0500 Subject: [PATCH 2/6] Update merlin/systems/workflow/base.py --- merlin/systems/workflow/base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/merlin/systems/workflow/base.py b/merlin/systems/workflow/base.py index 39fc1c3a7..dae3064e7 100644 --- a/merlin/systems/workflow/base.py +++ b/merlin/systems/workflow/base.py @@ -71,7 +71,9 @@ def __init__(self, workflow, output_dtypes, model_config, model_device): # performance optimization) self._initialize_ops(self.workflow.output_node, restrict=["Categorify"]) - def _initialize_ops(self, workflow_node, visited=None, restrict=False): + def _initialize_ops(self, workflow_node, visited=None, restrict=None): + restrict = restrict or [] + if visited is None: visited = set() From 061310003711bd4fbb0e732672b6e0beb4d48226 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 26 Apr 2024 15:28:42 -0700 Subject: [PATCH 3/6] formatting --- merlin/systems/workflow/base.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/merlin/systems/workflow/base.py b/merlin/systems/workflow/base.py index dae3064e7..54a63ef8d 100644 --- a/merlin/systems/workflow/base.py +++ b/merlin/systems/workflow/base.py @@ -77,8 +77,10 @@ def _initialize_ops(self, workflow_node, visited=None, restrict=None): if visited is None: visited = set() - if workflow_node.op and hasattr(workflow_node.op, "inference_initialize") and ( - not restrict or workflow_node.op.label in restrict + if ( + workflow_node.op + and hasattr(workflow_node.op, "inference_initialize") + and (not restrict or workflow_node.op.label in restrict) ): inference_op = workflow_node.op.inference_initialize( workflow_node.selector, self.model_config From 25b3839675ef71f37f587af09a66dc6a5c8f1e6d Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 26 Apr 2024 15:30:53 -0700 Subject: [PATCH 4/6] spelling --- merlin/systems/workflow/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merlin/systems/workflow/base.py b/merlin/systems/workflow/base.py index 54a63ef8d..4551e6685 100644 --- a/merlin/systems/workflow/base.py +++ b/merlin/systems/workflow/base.py @@ -66,7 +66,7 @@ def __init__(self, workflow, output_dtypes, model_config, model_device): ) # recurse over all column groups, initializing operators for inference pipeline. - # (disabled everyting other than Categorify for now while we sort out whether + # (disabled everything other than Categorify for now while we sort out whether # and how we want to use C++ implementations of NVTabular operators for # performance optimization) self._initialize_ops(self.workflow.output_node, restrict=["Categorify"]) From 6701251419935b96ef781e69f62b91a8ba6eed05 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 31 May 2024 15:58:06 -0700 Subject: [PATCH 5/6] require env-variable opt-in tfor cpp code path --- merlin/systems/workflow/base.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/merlin/systems/workflow/base.py b/merlin/systems/workflow/base.py index 4551e6685..80c124995 100644 --- a/merlin/systems/workflow/base.py +++ b/merlin/systems/workflow/base.py @@ -27,6 +27,7 @@ import functools import json import logging +import os from merlin.dag import ColumnSelector, DataFormats, Supports from merlin.dag.executors import LocalExecutor, _convert_format, _data_format @@ -66,10 +67,12 @@ def __init__(self, workflow, output_dtypes, model_config, model_device): ) # recurse over all column groups, initializing operators for inference pipeline. - # (disabled everything other than Categorify for now while we sort out whether + # (disabled everything other than operators that are specifically listed + # by the `NVT_CPP_OPS` environment variable while we sort out whether # and how we want to use C++ implementations of NVTabular operators for # performance optimization) - self._initialize_ops(self.workflow.output_node, restrict=["Categorify"]) + _nvt_cpp_ops = os.environ.get("NVT_CPP_OPS", "").split(",") + self._initialize_ops(self.workflow.output_node, restrict=_nvt_cpp_ops) def _initialize_ops(self, workflow_node, visited=None, restrict=None): restrict = restrict or [] From b4eecd42fa838be0a7e30e2f67cf85ed091f54b3 Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Tue, 11 Jun 2024 11:18:42 -0500 Subject: [PATCH 6/6] Update merlin/systems/workflow/base.py --- merlin/systems/workflow/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merlin/systems/workflow/base.py b/merlin/systems/workflow/base.py index 80c124995..61c8a3c3d 100644 --- a/merlin/systems/workflow/base.py +++ b/merlin/systems/workflow/base.py @@ -71,7 +71,7 @@ def __init__(self, workflow, output_dtypes, model_config, model_device): # by the `NVT_CPP_OPS` environment variable while we sort out whether # and how we want to use C++ implementations of NVTabular operators for # performance optimization) - _nvt_cpp_ops = os.environ.get("NVT_CPP_OPS", "").split(",") + _nvt_cpp_ops = os.environ.get("NVT_CPP_OPS", "Categorify").split(",") self._initialize_ops(self.workflow.output_node, restrict=_nvt_cpp_ops) def _initialize_ops(self, workflow_node, visited=None, restrict=None):