Skip to content

Commit

Permalink
Merge branch-23.11
Browse files Browse the repository at this point in the history
  • Loading branch information
drobison00 committed Nov 15, 2023
1 parent 9ec6d7e commit 1edd192
Show file tree
Hide file tree
Showing 13 changed files with 267 additions and 87 deletions.
2 changes: 1 addition & 1 deletion examples/digital_fingerprinting/production/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ RUN source activate morpheus &&\
# notebook v7 is incompatible with jupyter_contrib_nbextensions
notebook=6 &&\
jupyter contrib nbextension install --user &&\
pip install jupyterlab_nvdashboard==0.7.0
pip install jupyterlab_nvdashboard==0.9

# Launch jupyter
CMD ["jupyter-lab", "--ip=0.0.0.0", "--no-browser", "--allow-root"]
2 changes: 1 addition & 1 deletion examples/digital_fingerprinting/production/conda_env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ dependencies:
- kfp
- librdkafka
- mlflow>=2.2.1,<3
- nodejs=17.4.0
- nodejs=18.*
- nvtabular=23.06
- papermill
- s3fs>=2023.6
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import warnings
from collections import namedtuple
from datetime import datetime
from datetime import timezone

import fsspec
import mrc
Expand Down Expand Up @@ -124,6 +125,9 @@ def on_data(self, file_objects: fsspec.core.OpenFiles) -> typing.List[typing.Tup
ts = self._date_conversion_func(file_object)

# Exclude any files outside the time window
if (ts.tzinfo is None):
ts = ts.replace(tzinfo=timezone.utc)

if ((self._start_time is not None and ts < self._start_time)
or (self._end_time is not None and ts > self._end_time)):
continue
Expand Down Expand Up @@ -171,7 +175,12 @@ def on_data(self, file_objects: fsspec.core.OpenFiles) -> typing.List[typing.Tup

for _, period_df in resampled:

obj_list = fsspec.core.OpenFiles(period_df["objects"].to_list(), mode=file_objects.mode, fs=file_objects.fs)
file_list = period_df["objects"].to_list()

if (len(file_list) == 0):
continue

obj_list = fsspec.core.OpenFiles(file_list, mode=file_objects.mode, fs=file_objects.fs)

output_batches.append((obj_list, n_groups))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def on_data(self, message: MultiDFPMessage) -> MultiDFPMessage:
return output_message

def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_node(self.unique_name, ops.map(self.on_data))
node = builder.make_node(self.unique_name, ops.map(self.on_data), ops.filter(lambda x: x is not None))
builder.make_edge(input_node, node)

# node.launch_options.pe_count = self._config.num_threads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _build_window(self, message: DFPMessageMeta) -> MultiDFPMessage:
match = train_df[train_df["_row_hash"] == incoming_hash.iloc[0]]

if (len(match) == 0):
raise RuntimeError("Invalid rolling window")
raise RuntimeError(f"Invalid rolling window for user {user_id}")

first_row_idx = match.index[0].item()
last_row_idx = train_df[train_df["_row_hash"] == incoming_hash.iloc[-1]].index[-1].item()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ def __init__(self, c: Config, model_kwargs: dict = None, epochs=30, validation_s
"scaler": 'standard', # feature scaling method
"min_cats": 1, # cut off for minority categories
"progress_bar": False,
"device": "cuda"
"device": "cuda",
"patience": -1,
}

# Update the defaults
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,42 @@ def __init__(

self._batch_size = c.pipeline_batch_size

self._filenames = filenames
self._filenames = self._expand_directories(filenames)
# Support directory expansion

self._input_count = None
self._max_concurrent = c.num_threads
self._watch = watch
self._watch_interval = watch_interval

@staticmethod
def _expand_directories(filenames: typing.List[str]) -> typing.List[str]:
    """
    Expand any directory entries in `filenames` into a glob over their
    contents, e.g. ``/path/to/dir`` -> ``/path/to/dir/*``.

    Entries that already contain wildcards, that do not exist on the
    local filesystem, or that are regular files are passed through
    unchanged.

    Parameters
    ----------
    filenames : list of str
        Input paths; may contain plain files, directories, or glob
        patterns.

    Returns
    -------
    list of str
        Same order as the input, with each existing directory replaced
        by ``<dir>/*``.
    """
    # The local-filesystem handle is loop-invariant; build it once
    # instead of re-creating it for every filename.
    fs_spec = fsspec.filesystem(protocol='file')

    updated_list = []
    for file_name in filenames:
        # Skip any filenames that already contain wildcards
        if '*' in file_name or '?' in file_name:
            updated_list.append(file_name)
            continue

        # Non-existent paths are passed through unchanged; downstream
        # consumers are responsible for reporting missing inputs.
        if not fs_spec.exists(file_name):
            updated_list.append(file_name)
            continue

        if fs_spec.isdir(file_name):
            updated_list.append(f"{file_name}/*")
        else:
            updated_list.append(file_name)

    return updated_list

@property
def name(self) -> str:
"""Return the name of the stage."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,25 @@

# pylint: disable=invalid-name
iso_date_regex_pattern = (
# YYYY-MM-DD
r"(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})"
r"T(?P<hour>\d{1,2})(:|_|\.)(?P<minute>\d{1,2})(:|_|\.)(?P<second>\d{1,2})(?P<microsecond>\.\d{1,6})?Z")
# Start of time group (must match everything to add fractional days)
r"(?:T"
# HH
r"(?P<hour>\d{1,2})"
# : or _ or .
r"(?::|_|\.)"
# MM
r"(?P<minute>\d{1,2})"
# : or _ or .
r"(?::|_|\.)"
# SS
r"(?P<second>\d{1,2})"
# Optional microseconds (don't capture the period)
r"(?:\.(?P<microsecond>\d{0,6}))?"
# End of time group (optional)
r")?"
# Optional Zulu time
r"(?P<zulu>Z)?")

iso_date_regex = re.compile(iso_date_regex_pattern)
Loading

0 comments on commit 1edd192

Please sign in to comment.