Skip to content

Commit

Permalink
Merge branch 'branch-24.10' of https://github.com/nv-morpheus/Morpheus
Browse files Browse the repository at this point in the history
…into david-doca-batched-tables-p2
  • Loading branch information
dagardner-nv committed Aug 1, 2024
2 parents 8372267 + 5e0d920 commit 3915494
Show file tree
Hide file tree
Showing 32 changed files with 80 additions and 2,348 deletions.
1 change: 0 additions & 1 deletion ci/conda/recipes/morpheus/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ outputs:
- mrc
- networkx=2.8.8
- numpydoc =1.5.*
- nvtabular =23.08.00
- pydantic
- pluggy =1.3.*
- python
Expand Down
1 change: 0 additions & 1 deletion conda/environments/all_cuda-121_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ dependencies:
- nodejs=18.*
- numexpr
- numpydoc=1.5
- nvtabular=23.08.00
- onnx=1.15
- openai=1.13
- papermill=2.4.0
Expand Down
1 change: 0 additions & 1 deletion conda/environments/dev_cuda-121_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ dependencies:
- nlohmann_json=3.11
- nodejs=18.*
- numpydoc=1.5
- nvtabular=23.08.00
- pip
- pkg-config=0.29
- pluggy=1.3
Expand Down
1 change: 0 additions & 1 deletion conda/environments/examples_cuda-121_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ dependencies:
- nodejs=18.*
- numexpr
- numpydoc=1.5
- nvtabular=23.08.00
- onnx=1.15
- openai=1.13
- papermill=2.4.0
Expand Down
1 change: 0 additions & 1 deletion conda/environments/runtime_cuda-121_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ dependencies:
- mrc=24.10
- networkx=2.8.8
- numpydoc=1.5
- nvtabular=23.08.00
- pip
- pluggy=1.3
- pydantic
Expand Down
1 change: 0 additions & 1 deletion dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ dependencies:
- mrc=24.10
- networkx=2.8.8
- numpydoc=1.5
- nvtabular=23.08.00
- pydantic
# - python ##
- python-confluent-kafka>=1.9.2,<1.10.0a0
Expand Down
33 changes: 26 additions & 7 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
# [conda_env_dev] -> [development] -> [development_pydbg]
# \
# [build_docs]
# \
# [git_clone]
#
# base: Contains all system packages common across all environments
# conda_env: Create morpheus conda environment and set it as the new base
Expand All @@ -38,6 +40,8 @@
# development_pydbg: Development + debug build of cpython and various GDB
# debugging macros.
# build_docs: Build the Morpheus documentation
# git_clone: Clone the Morpheus repository to ensure we do not unintentionally
# copy local build artifacts
# ========================================================================= #

# Args used in FROM commands must come first
Expand Down Expand Up @@ -283,21 +287,36 @@ RUN --mount=type=bind,from=conda_bld_morpheus,source=/opt/conda/conda-bld,target
/opt/conda/bin/conda env update --solver=libmamba -n morpheus --file \
conda/environments/runtime_cuda-${CUDA_MAJOR_VER}${CUDA_MINOR_VER}_arch-x86_64.yaml

# ============ Stage: git_clone ============
# Perform a clone of the git repository. This ensures that when we copy files from the source repository, we aren't
# unintentionally including build artifacts or other files that shouldn't be in the final image
FROM conda_env_dev as git_clone

ARG MORPHEUS_ROOT_HOST

# Source the morpheus env to pick up the git-lfs package
RUN --mount=type=bind,source=${MORPHEUS_ROOT_HOST},target=/opt/host_repo \
source activate morpheus &&\
git clone file:///opt/host_repo /tmp/morpheus_repo &&\
cd /tmp/morpheus_repo &&\
git lfs install &&\
/tmp/morpheus_repo/scripts/fetch_data.py fetch datasets examples

# ============ Stage: runtime ============
# Setup container for runtime environment
FROM runtime_conda_create as runtime

ARG MORPHEUS_ROOT_HOST

# Only copy specific files/folders over that are necessary for runtime
COPY "${MORPHEUS_ROOT_HOST}/conda/environments/*.yaml" "./conda/environments/"
COPY "${MORPHEUS_ROOT_HOST}/docker" "./docker"
COPY --from=git_clone "/tmp/morpheus_repo/conda/environments/*.yaml" "./conda/environments/"
COPY --from=git_clone "/tmp/morpheus_repo/docker" "./docker"
COPY --from=build_docs "/workspace/build/docs/html" "./docs"
COPY "${MORPHEUS_ROOT_HOST}/examples" "./examples"
COPY "${MORPHEUS_ROOT_HOST}/models/datasets" "./models/datasets"
COPY "${MORPHEUS_ROOT_HOST}/scripts" "./scripts"
COPY "${MORPHEUS_ROOT_HOST}/*.md" "./"
COPY "${MORPHEUS_ROOT_HOST}/LICENSE" "./"
COPY --from=git_clone "/tmp/morpheus_repo/examples" "./examples"
COPY --from=git_clone "/tmp/morpheus_repo/models/datasets" "./models/datasets"
COPY --from=git_clone "/tmp/morpheus_repo/scripts" "./scripts"
COPY --from=git_clone "/tmp/morpheus_repo/*.md" "./"
COPY --from=git_clone "/tmp/morpheus_repo/LICENSE" "./"

RUN /opt/conda/bin/conda clean -afy && \
# Ensure the conda-bld directory is indexed even if empty
Expand Down
3 changes: 0 additions & 3 deletions docker/build_container_release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,5 @@ export DOCKER_TARGET=${DOCKER_TARGET:-"runtime"}

popd &> /dev/null

# Fetch data
"${SCRIPT_DIR}/../scripts/fetch_data.py" fetch docs examples datasets

# Call the general build script
${SCRIPT_DIR}/build_container.sh
1 change: 0 additions & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@
"langchain_core",
"merlin",
"morpheus.cli.commands",  # Don't document the CLI in Sphinx
"nvtabular",
"pandas",
"pydantic",
"pymilvus",
Expand Down
1 change: 0 additions & 1 deletion examples/digital_fingerprinting/production/conda_env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ dependencies:
- librdkafka
- mlflow>=2.10.0,<3
- nodejs=18.*
- nvtabular=23.06
- papermill
- s3fs>=2023.6

Expand Down
10 changes: 0 additions & 10 deletions morpheus/utils/column_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,13 @@
import logging
import re
import typing
import warnings
from datetime import datetime
from functools import partial

import pandas as pd

import cudf

if (typing.TYPE_CHECKING):
with warnings.catch_warnings():
# Ignore warning regarding tensorflow not being installed
warnings.filterwarnings("ignore", message=".*No module named 'tensorflow'", category=UserWarning)
import nvtabular as nvt

logger = logging.getLogger(f"morpheus.{__name__}")

DEFAULT_DATE = '1970-01-01T00:00:00.000000+00:00'
Expand Down Expand Up @@ -749,7 +742,6 @@ class DataFrameInputSchema:
input_columns: typing.Dict[str, str] = dataclasses.field(init=False, repr=False)
output_columns: typing.List[tuple[str, str]] = dataclasses.field(init=False, repr=False)

nvt_workflow: "nvt.Workflow" = dataclasses.field(init=False, repr=False)
prep_dataframe: typing.Callable[[pd.DataFrame], typing.List[str]] = dataclasses.field(init=False, repr=False)

def __post_init__(self):
Expand Down Expand Up @@ -796,5 +788,3 @@ def __post_init__(self):
input_columns=self.input_columns,
json_cols=self.json_columns,
preserve_re=self.preserve_columns)

self.nvt_workflow = None
43 changes: 16 additions & 27 deletions morpheus/utils/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@
"""

import logging
import multiprocessing as mp
import os
import threading
import typing
import warnings
from enum import Enum

import fsspec
import pandas as pd
from merlin.core.utils import Distributed

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,7 +66,6 @@ def __init__(self,
download_method: typing.Union[DownloadMethods, str] = DownloadMethods.DASK_THREAD,
dask_heartbeat_interval: str = "30s"):

self._merlin_distributed = None
self._dask_heartbeat_interval = dask_heartbeat_interval

download_method = os.environ.get("MORPHEUS_FILE_DOWNLOAD_TYPE", download_method)
Expand Down Expand Up @@ -99,20 +95,19 @@ def get_dask_cluster(self):
Returns
-------
dask_cuda.LocalCUDACluster
dask.distributed.LocalCluster
"""

with Downloader._mutex:
if Downloader._dask_cluster is None:
import dask_cuda.utils
import dask
import dask.distributed

logger.debug("Creating dask cluster...")

n_workers = dask_cuda.utils.get_n_gpus()
threads_per_worker = mp.cpu_count() // n_workers

Downloader._dask_cluster = dask_cuda.LocalCUDACluster(n_workers=n_workers,
threads_per_worker=threads_per_worker)
Downloader._dask_cluster = dask.distributed.LocalCluster(start=True,
processes=self.download_method
!= "dask_thread")

logger.debug("Creating dask cluster... Done. Dashboard: %s", Downloader._dask_cluster.dashboard_link)

Expand All @@ -127,24 +122,18 @@ def get_dask_client(self):
dask.distributed.Client
"""
import dask.distributed
return dask.distributed.Client(self.get_dask_cluster())

# Up the heartbeat interval which can get violated with long download times
dask.config.set({"distributed.client.heartbeat": self._dask_heartbeat_interval})
def close(self):
"""Close the dask cluster if it exists."""
if (self._dask_cluster is not None):
logger.debug("Stopping dask cluster...")

if (self._merlin_distributed is None):
with warnings.catch_warnings():
# Merlin.Distributed will warn if a client already exists; the client in question is the one we created
# and are explicitly passing to it in the constructor.
warnings.filterwarnings("ignore",
message="Existing Dask-client object detected in the current context.*",
category=UserWarning)
self._merlin_distributed = Distributed(client=dask.distributed.Client(self.get_dask_cluster()))
self._dask_cluster.close()

return self._merlin_distributed
self._dask_cluster = None

def close(self):
"""Cluster management is handled by Merlin.Distributed"""
pass
logger.debug("Stopping dask cluster... Done.")

def download(self,
download_buckets: fsspec.core.OpenFiles,
Expand All @@ -169,8 +158,8 @@ def download(self,
if (self._download_method.startswith("dask")):
# Create the client each time to ensure all connections to the cluster are closed (they can time out)
with self.get_dask_client() as dist:
dfs = dist.client.map(download_fn, download_buckets)
dfs = dist.client.gather(dfs)
dfs = dist.map(download_fn, download_buckets)
dfs = dist.gather(dfs)

else:
# Simply loop
Expand Down
13 changes: 0 additions & 13 deletions morpheus/utils/nvt/__init__.py

This file was deleted.

Loading

0 comments on commit 3915494

Please sign in to comment.