Commit ee0649c

Merge branch 'david_conda_ci_pkg' into david-rebase-ci-images

dagardner-nv committed Jan 23, 2024
2 parents fc1bef0 + 7e811fd commit ee0649c
Showing 11 changed files with 257 additions and 224 deletions.
73 changes: 0 additions & 73 deletions cmake/package_search/Finducx.cmake

This file was deleted.

1 change: 1 addition & 0 deletions docker/conda/environments/cuda11.8_dev.yml
@@ -68,6 +68,7 @@ dependencies:
- libgrpc>=1.49
- librdkafka=1.9.2
- libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
- lxml=4.9.1
- mlflow>=2.2.1,<3
- mrc=24.03
- networkx>=2.8
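The new lxml pin pairs with the rss_controller.py changes below, where BeautifulSoup is now asked to use the 'lxml' parser instead of 'xml'. A minimal sketch of that usage, with a made-up feed snippet (illustrative only, not code from the commit):

from bs4 import BeautifulSoup

# Hypothetical feed text; the controller normally reads this from a URL or a file.
feed_text = "<rss><channel><item><title>Example entry</title></item></channel></rss>"

# Requesting the 'lxml' parser is what makes the explicit lxml dependency necessary.
soup = BeautifulSoup(feed_text, "lxml")
print(soup.find("item").title.text)  # -> "Example entry"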
78 changes: 33 additions & 45 deletions morpheus/controllers/rss_controller.py
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -102,31 +102,33 @@ def __init__(self,
run_indefinitely = any(RSSController.is_url(f) for f in self._feed_input)

self._run_indefinitely = run_indefinitely
self._enable_cache = enable_cache

self._session = None
if enable_cache:
self._session = requests_cache.CachedSession(os.path.join(cache_dir, "RSSController.sqlite"),
backend="sqlite")
else:
self._session = requests.session()

self._session.headers.update({
"User-Agent":
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
})

self._feed_stats_dict = {
input:
url:
FeedStats(failure_count=0, success_count=0, last_failure=-1, last_success=-1, last_try_result="Unknown")
for input in self._feed_input
for url in self._feed_input
}

@property
def run_indefinitely(self):
"""Property that determines to run the source indefinitely"""
return self._run_indefinitely

@property
def session_exist(self) -> bool:
"""Property that indicates the existence of a session."""
return bool(self._session)

def get_feed_stats(self, feed_url: str) -> FeedStats:
"""
Get feed input stats.
Get feed url stats.
Parameters
----------
@@ -141,30 +143,20 @@ def get_feed_stats(self, feed_url: str) -> FeedStats:
Raises
------
ValueError
If the feed URL is not found in the feed input provided to the constructor.
If the feed URL is not found in the feed url provided to the constructor.
"""
if feed_url not in self._feed_stats_dict:
raise ValueError("The feed URL is not part of the feed input provided to the constructor.")
raise ValueError("The feed URL is not part of the feed url provided to the constructor.")

return self._feed_stats_dict[feed_url]

def _get_response_text(self, url: str) -> str:
if self.session_exist:
response = self._session.get(url)
else:
response = requests.get(url, timeout=self._request_timeout)

return response.text

def _read_file_content(self, file_path: str) -> str:
with open(file_path, 'r', encoding="utf-8") as file:
return file.read()

def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> "feedparser.FeedParserDict":

feed_input = self._get_response_text(feed_input) if is_url else self._read_file_content(feed_input)
def _try_parse_feed_with_beautiful_soup(self, feed_input: str) -> "feedparser.FeedParserDict":

soup = BeautifulSoup(feed_input, 'xml')
soup = BeautifulSoup(feed_input, 'lxml')

# Verify whether the given feed has 'item' or 'entry' tags.
if soup.find('item'):
@@ -205,32 +197,28 @@ def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict":

fallback = False
cache_hit = False
is_url_with_session = is_url and self.session_exist

if is_url_with_session:
response = self._session.get(url)
cache_hit = response.from_cache
if is_url:
response = self._session.get(url, timeout=self._request_timeout)
feed_input = response.text
if self._enable_cache:
cache_hit = response.from_cache
else:
feed_input = url

feed = feedparser.parse(feed_input)

if feed["bozo"]:
cache_hit = False

if is_url_with_session:
fallback = True
logger.info("Failed to parse feed: %s. Trying to parse using feedparser directly.", url)
feed = feedparser.parse(url)

if feed["bozo"]:
try:
logger.info("Failed to parse feed: %s, %s. Try parsing feed manually", url, feed['bozo_exception'])
feed = self._try_parse_feed_with_beautiful_soup(url, is_url)
except Exception:
logger.error("Failed to parse the feed manually: %s", url)
raise
fallback = True
try:
if not is_url:
# Read file content
feed_input = self._read_file_content(feed_input)
# Parse feed content with beautifulsoup
feed = self._try_parse_feed_with_beautiful_soup(feed_input)
except Exception:
logger.error("Failed to parse the feed manually: %s", url)
raise

logger.debug("Parsed feed: %s. Cache hit: %s. Fallback: %s", url, cache_hit, fallback)

@@ -312,300 +300,17 @@ def fetch_dataframes(self):
@classmethod
def is_url(cls, feed_input: str) -> bool:
"""
Check if the provided input is a valid URL.
Check if the provided url is a valid URL.
Parameters
----------
feed_input : str
The input string to be checked.
The url string to be checked.
Returns
-------
bool
True if the input is a valid URL, False otherwise.
True if the url is a valid URL, False otherwise.
"""
try:
parsed_url = urlparse(feed_input)
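Taken together, the controller changes above always build a session (cached or plain), route every URL fetch through it with the request timeout, and only consult response.from_cache when caching is enabled. A rough, simplified sketch of that flow; the helper names and the timeout default here are illustrative, not the actual class API:

import os

import feedparser
import requests
import requests_cache


def build_session(enable_cache: bool, cache_dir: str = "./.cache") -> requests.Session:
    # Mirrors the new constructor logic: a sqlite-backed cached session when
    # caching is enabled, otherwise a plain requests session.
    if enable_cache:
        return requests_cache.CachedSession(
            os.path.join(cache_dir, "RSSController.sqlite"), backend="sqlite")
    return requests.Session()


def fetch_feed(url: str, session: requests.Session, enable_cache: bool, timeout: float = 2.0):
    response = session.get(url, timeout=timeout)
    # `from_cache` only exists on responses from a CachedSession, which is why
    # the controller guards the attribute access behind its cache flag.
    cache_hit = response.from_cache if enable_cache else False
    feed = feedparser.parse(response.text)
    return feed, cache_hit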
81 changes: 49 additions & 32 deletions morpheus/models/dfencoder/autoencoder.py
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -233,6 +233,9 @@ def get_scaler(self, name):
}
return scalers[name]

def get_feature_count(self):
return len(self.numeric_fts) + len(self.binary_fts) + len(self.categorical_fts)

def _init_numeric(self, df=None):
"""Initializes the numerical features of the model by either using preset numerical scaler parameters
or by using the input data.
@@ -626,8 +629,10 @@ def preprocess_data(
return preprocessed_data

def compute_loss(self, num, bin, cat, target_df, should_log=True, _id=False):

num_target, bin_target, codes = self.compute_targets(target_df)
return self.compute_loss_from_targets(

mse, bce, cce, net = self.compute_loss_from_targets(
num=num,
bin=bin,
cat=cat,
@@ -638,6 +643,10 @@ def compute_loss(self, num, bin, cat, target_df, should_log=True, _id=False):
_id=_id,
)

net = net.cpu().item()

return mse, bce, cce, net

def compute_loss_from_targets(self, num, bin, cat, num_target, bin_target, cat_target, should_log=True, _id=False):
"""Computes the loss from targets.
@@ -670,38 +679,45 @@ def compute_loss_from_targets(self, num, bin, cat, num_target, bin_target, cat_t
should_log = True
else:
should_log = False
net_loss = []
mse_loss = self.mse(num, num_target)
net_loss += list(mse_loss.mean(dim=0).cpu().detach().numpy())
mse_loss = mse_loss.mean()
bce_loss = self.bce(bin, bin_target)

net_loss += list(bce_loss.mean(dim=0).cpu().detach().numpy())
bce_loss = bce_loss.mean()
cce_loss = []
for i, ft in enumerate(self.categorical_fts):
loss = self.cce(cat[i], cat_target[i])
loss = loss.mean()
cce_loss.append(loss)
val = loss.cpu().item()
net_loss += [val]
# Calculate the numerical loss (per feature)
mse_loss: torch.Tensor = self.mse(num, num_target).mean(dim=0)

# Calculate the binary loss (per feature)
bce_loss: torch.Tensor = self.bce(bin, bin_target).mean(dim=0)

# To calc the categorical loss, we need to average the loss of each categorical feature independently (since
# they will have a different number of categories)
cce_loss_list = []

for i in range(len(self.categorical_fts)):
# Take the full mean but ensure the output is a 1x1 tensor to make it easier to concatenate
cce_loss_list.append(self.cce(cat[i], cat_target[i]).mean(dim=0, keepdim=True))

if (len(cce_loss_list) > 0):
cce_loss = torch.cat(cce_loss_list)
else:
cce_loss = torch.Tensor().to(self.device)

# The net loss should have one loss per feature
net_loss = 0
for loss in [mse_loss, bce_loss, cce_loss]:
if len(loss) > 0:
net_loss += loss.sum()
net_loss /= self.get_feature_count()

if should_log:
# Convert it to a list of numpy
net_loss_list = torch.cat((mse_loss, bce_loss, cce_loss)).tolist()

if self.training:
self.logger.training_step(net_loss)
self.logger.training_step(net_loss_list)
elif _id:
self.logger.id_val_step(net_loss)
self.logger.id_val_step(net_loss_list)
elif not self.training:
self.logger.val_step(net_loss)

net_loss = np.array(net_loss).mean()
return mse_loss, bce_loss, cce_loss, net_loss
self.logger.val_step(net_loss_list)

def do_backward(self, mse, bce, cce):
# running `backward()` separately on mse/bce/cce is equivalent to summing them up and running `backward()` once
loss_fn = mse + bce
for ls in cce:
loss_fn += ls
loss_fn.backward()
return mse_loss.mean(), bce_loss.mean(), cce_loss.mean(), net_loss

def compute_baseline_performance(self, in_, out_):
"""
@@ -729,6 +745,7 @@ def compute_baseline_performance(self, in_, out_):
codes_pred.append(pred)
mse_loss, bce_loss, cce_loss, net_loss = self.compute_loss(num_pred, bin_pred, codes_pred, out_,
should_log=False)

if isinstance(self.logger, BasicLogger):
self.logger.baseline_loss = net_loss
return net_loss
@@ -981,11 +998,11 @@ def _fit_batch(self, input_swapped, num_target, bin_target, cat_target, **kwargs
cat_target=cat_target,
should_log=True,
)
self.do_backward(mse, bce, cce)
net_loss.backward()
self.optim.step()
self.optim.zero_grad()

return net_loss
return net_loss.cpu().item()

def _compute_baseline_performance_from_dataset(self, validation_dataset):
self.eval()
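Because the loss now comes back as a scalar tensor, _fit_batch above can call backward() once on net_loss instead of going through the removed do_backward helper, and it returns the batch loss as a plain Python float. A minimal sketch of that pattern; the parameter and loss below are placeholders, not the real model:

import torch

param = torch.nn.Parameter(torch.randn(4))
optim = torch.optim.SGD([param], lr=0.01)

# Stand-in for the scalar net loss returned by compute_loss_from_targets(...).
net_loss = (param ** 2).sum()

net_loss.backward()  # a single backward pass over the combined loss
optim.step()
optim.zero_grad()

# Callers receive a plain float, as in `return net_loss.cpu().item()`.
batch_loss = net_loss.cpu().item()
print(batch_loss)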
@@ -1028,7 +1045,7 @@ def _compute_batch_baseline_performance(
cat_target=cat_target,
should_log=False
)
return net_loss
return net_loss.cpu().item()

def _validate_dataset(self, validation_dataset, rank=None):
"""Runs a validation loop on the given validation dataset, computing and returning the average loss of both the original
Expand Down Expand Up @@ -1108,7 +1125,7 @@ def _validate_batch(self, input_original, input_swapped, num_target, bin_target,
cat_target=cat_target,
should_log=True,
)
return orig_net_loss, net_loss
return orig_net_loss.cpu().item(), net_loss.cpu().item()

def _populate_loss_stats_from_dataset(self, dataset):
"""Populates the `self.feature_loss_stats` dict with feature losses computed using the provided dataset.