Improvements to MilvusVectorDB Service #1294

Merged
Changes from 1 commit
23 commits
79752dc
Updated Milvus vector DB files
bsuryadevara Oct 20, 2023
6ca9c82
Updated Milvus vector DB files
bsuryadevara Oct 20, 2023
8fbcb50
Merge branch 'fea-sherlock' into 1272-fea-improve-the-vectordbservice
bsuryadevara Oct 20, 2023
31aa8d3
Updated Milvus vector DB files
bsuryadevara Oct 20, 2023
fbf2f4b
Updated Milvus vector DB files
bsuryadevara Oct 20, 2023
61d81d5
Merge branch '1272-fea-improve-the-vectordbservice' of github.com:bsu…
bsuryadevara Oct 20, 2023
13cd13c
Merge remote-tracking branch 'upstream/fea-sherlock' into 1272-fea-im…
bsuryadevara Oct 23, 2023
b67882a
collection_conf key
bsuryadevara Oct 23, 2023
13720cf
Updated rss feed processing files
bsuryadevara Oct 24, 2023
3165b79
Updated tests
bsuryadevara Oct 24, 2023
4a54ad1
Removed external url and replaced with mock objects
bsuryadevara Oct 24, 2023
cefcb12
Fixed cannot mix list and non-list bug
bsuryadevara Oct 25, 2023
145c73d
Merge remote-tracking branch 'upstream/fea-sherlock' into 1272-fea-im…
bsuryadevara Oct 25, 2023
27c3f6f
Merge remote-tracking branch 'upstream/fea-sherlock' into 1272-fea-im…
bsuryadevara Oct 25, 2023
ce4f6bc
Fixed datatype mismatch error
bsuryadevara Oct 25, 2023
74730a7
replaced cudf with pandas
bsuryadevara Oct 25, 2023
509aa02
Added callback to service argument and updated write to vdb stage
bsuryadevara Oct 26, 2023
0c253c3
Added callback to service argument and updated write to vdb stage
bsuryadevara Oct 26, 2023
a09c720
Removed private variables from tests
bsuryadevara Oct 26, 2023
9f96fb2
Merge remote-tracking branch 'upstream/fea-sherlock' into 1272-fea-im…
bsuryadevara Oct 26, 2023
1dc7114
Fixed pylint errors
bsuryadevara Oct 27, 2023
11c593a
Merge remote-tracking branch 'upstream/fea-sherlock' into 1272-fea-im…
bsuryadevara Oct 27, 2023
1936115
Updated pymilvus to 2.3.2
bsuryadevara Oct 30, 2023
105 changes: 57 additions & 48 deletions morpheus/controllers/rss_controller.py
@@ -14,7 +9,9 @@

import logging
import os
import typing
import time
from dataclasses import asdict
from dataclasses import dataclass
from urllib.parse import urlparse

import feedparser
@@ -27,6 +29,17 @@
logger = logging.getLogger(__name__)


@dataclass
class FeedStats:
"""Data class to hold error feed stats"""

failure_count: int
success_count: int
last_failure: float
last_success: float
last_try_result: str


class RSSController:
"""
RSSController handles fetching and processing of RSS feed entries.
@@ -46,14 +59,17 @@ class RSSController:
Enable caching of RSS feed request data.
cache_dir : str, optional, default = "./.cache/http"
Cache directory for storing RSS feed request data.
cooldown_interval : int, optional, default = 600
Cooldown interval in seconds if there is a failure in fetching or parsing the feed.
"""

def __init__(self,
feed_input: str | list[str],
batch_size: int = 128,
run_indefinitely: bool = None,
enable_cache: bool = False,
cache_dir: str = "./.cache/http"):
cache_dir: str = "./.cache/http",
cooldown_interval: int = 600):

if (isinstance(feed_input, str)):
feed_input = [feed_input]
@@ -62,6 +78,12 @@ def __init__(self,
self._feed_input = set(feed_input)
self._batch_size = batch_size
self._previous_entries = set() # Stores the IDs of previous entries to prevent the processing of duplicates.
self._cooldown_interval = cooldown_interval

# Validate feed_input
for f in self._feed_input:
if not RSSController.is_url(f) and not os.path.exists(f):
raise ValueError(f"Invalid URL or file path: {f}")

if (run_indefinitely is None):
# If feed_input is a URL, run indefinitely.
@@ -74,7 +96,11 @@ def __init__(self,
self._session = requests_cache.CachedSession(os.path.join(cache_dir, "RSSController.sqlite"),
backend="sqlite")

self._errored_feeds = [] # Feeds that have thrown an error and wont be retried
self._feed_stats_dict = {input: FeedStats(failure_count=0,
success_count=0,
last_failure=-1,
last_success=-1,
last_try_result="Unknown") for input in self._feed_input}

@property
def run_indefinitely(self):
@@ -94,6 +120,7 @@ def _read_file_content(self, file_path: str) -> str:
return file.read()

def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> feedparser.FeedParserDict:

feed_input = self._get_response_text(feed_input) if is_url else self._read_file_content(feed_input)

soup = BeautifulSoup(feed_input, 'xml')
@@ -116,7 +143,7 @@ def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) ->
if child.name == "link":
link_value = child.get_text()
if not link_value:
feed_item[child.name] = child.get('href')
feed_item[child.name] = child.get('href', 'Unknown value')
else:
feed_item[child.name] = link_value
# To be consistent with feedparser entries, rename guid to id
@@ -153,20 +180,17 @@ def _try_parse_feed(self, url: str) -> feedparser.FeedParserDict:

if is_url_with_session:
fallback = True
try:
logger.info("Failed to parse feed: %s. Trying to parse using feedparser directly.", url)
feed = feedparser.parse(url)
except Exception as ex:
raise RuntimeError(f"Failed to parse feed using fallback: {url}: {ex}") from ex
logger.info("Failed to parse feed: %s. Trying to parse using feedparser directly.", url)
feed = feedparser.parse(url)

if feed["bozo"]:
try:
logger.warning("Failed to parse feed: %s, %s. Trying with other source",
url,
feed['bozo_exception'])
logger.info("Failed to parse feed: %s, %s. Try parsing the feed manually",
url,
feed['bozo_exception'])
feed = self._try_parse_feed_with_beautiful_soup(url, is_url)
except Exception as exec_info:
raise RuntimeError(f"Invalid feed input: {url}. Error: {exec_info}") from exec_info
except Exception:
raise

logger.debug("Parsed feed: %s. Cache hit: %s. Fallback: %s", url, cache_hit, fallback)

@@ -182,17 +206,25 @@ def parse_feeds(self):
The parsed feed content.
"""
for url in self._feed_input:
feed_stats: FeedStats = self._feed_stats_dict[url]
current_time = time.time()
try:
if (url in self._errored_feeds):
continue
if ((current_time - feed_stats.last_failure) >= self._cooldown_interval):
feed = self._try_parse_feed(url)

feed = self._try_parse_feed(url)
feed_stats.last_success = current_time
feed_stats.success_count += 1
feed_stats.last_try_result = "Success"

yield feed
yield feed

except Exception as ex:
logger.warning("Failed to parse feed: %s: %s. The feed will be not be retried.", url, ex)
self._errored_feeds.append(url)
logger.warning("Failed to parse feed: %s: %s.", url, ex)
feed_stats.last_failure = current_time
feed_stats.failure_count += 1
feed_stats.last_try_result = "Failure"

logger.debug("Feed stats: %s", asdict(feed_stats))

def fetch_dataframes(self):
"""
@@ -222,45 +254,22 @@ def fetch_dataframes(self):
entry_accumulator.append(entry)

if self._batch_size > 0 and len(entry_accumulator) >= self._batch_size:
yield self.create_dataframe(entry_accumulator)
yield cudf.DataFrame(entry_accumulator)
entry_accumulator.clear()

self._previous_entries = current_entries

# Yield any remaining entries.
if entry_accumulator:
df = self.create_dataframe(entry_accumulator)
# TODO (Bhargav): Debug : cannot mix list and non-list, non-null values error
df = cudf.DataFrame(entry_accumulator)
yield df
else:
logger.debug("No new entries found.")

except Exception as exc:
raise RuntimeError(f"Error fetching or processing feed entries: {exc}") from exc

def create_dataframe(self, entries: typing.List[typing.Tuple]) -> cudf.DataFrame:
"""
Create a DataFrame from accumulated entry data.

Parameters
----------
entries : typing.List[typing.Tuple]
List of accumulated feed entries.

Returns
-------
cudf.DataFrame
A DataFrame containing feed entry data.

Raises
------
RuntimeError
Error creating DataFrame.
"""
try:
return cudf.DataFrame(entries)
except Exception as exc:
logger.error("Error creating DataFrame: %s", exc)
raise RuntimeError(f"Error creating DataFrame: {exc}") from exc
logger.error(f"Error fetching or processing feed entries: {exc}")
raise

@classmethod
def is_url(cls, feed_input: str) -> bool:
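
For reference, a minimal usage sketch of the reworked controller. This is not part of the PR: the feed URL is a placeholder, and reading _feed_stats_dict directly is for illustration only, since the diff keeps it as a private attribute.

from morpheus.controllers.rss_controller import RSSController

# Instantiate with the new cooldown_interval argument (placeholder feed URL).
controller = RSSController(feed_input=["https://example.com/rss.xml"],
                           batch_size=128,
                           enable_cache=True,
                           cache_dir="./.cache/http",
                           cooldown_interval=600)

# fetch_dataframes() yields cudf DataFrames of new entries, up to batch_size rows at a time.
for df in controller.fetch_dataframes():
    print(df.head())

# Failures no longer blacklist a feed; each feed keeps a FeedStats record instead.
for url, stats in controller._feed_stats_dict.items():
    print(url, stats.success_count, stats.failure_count, stats.last_try_result)

The effect of the change shows up in the last loop: a feed that fails is tracked in a FeedStats record and retried once cooldown_interval seconds have passed, instead of being added to the old _errored_feeds list and never retried.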
52 changes: 21 additions & 31 deletions morpheus/stages/input/rss_source_stage.py
@@ -43,31 +43,30 @@ class RSSSourceStage(PreallocatorMixin, SingleOutputSource):
Interval in seconds between fetching new feed items.
stop_after: int, default = 0
Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0`
max_retries : int, optional, default = 3
Maximum number of retries for fetching entries on exception.
batch_size : int, optional, default = None
Number of feed items to accumulate before creating a DataFrame.
enable_cache : bool, optional, default = False
Enable caching of RSS feed request data.
cache_dir : str, optional, default = "./.cache/http"
Cache directory for storing RSS feed request data.
cooldown_interval : int, optional, default = 600
Cooldown interval in seconds if there is a failure in fetching or parsing the feed.
"""

def __init__(self,
c: Config,
feed_input: list[str],
interval_secs: float = 600,
stop_after: int = 0,
max_retries: int = 5,
run_indefinitely: bool = None,
batch_size: int = None,
enable_cache: bool = False,
cache_dir: str = "./.cache/http"):
cache_dir: str = "./.cache/http",
cooldown_interval: int = 600):
super().__init__(c)
self._stop_requested = False
self._stop_after = stop_after
self._interval_secs = interval_secs
self._max_retries = max_retries

if (batch_size is None):
batch_size = c.pipeline_batch_size
Expand All @@ -83,7 +82,8 @@ def __init__(self,
batch_size=batch_size,
run_indefinitely=run_indefinitely,
enable_cache=enable_cache,
cache_dir=cache_dir)
cache_dir=cache_dir,
cooldown_interval=cooldown_interval)

@property
def name(self) -> str:
@@ -103,46 +103,36 @@ def _fetch_feeds(self) -> MessageMeta:
"""
Fetch RSS feed entries and yield as MessageMeta object.
"""
retries = 0

while (not self._stop_requested) and (retries < self._max_retries):
while (not self._stop_requested):
try:
for df in self._controller.fetch_dataframes():
df_size = len(df)
self._records_emitted += df_size

if logger.isEnabledFor(logging.DEBUG):
logger.debug("Received %d new entries...", df_size)
logger.debug("Emitted %d records so far.", self._records_emitted)

yield MessageMeta(df=df)

if (self._stop_after > 0 and self._records_emitted >= self._stop_after):
self._stop_requested = True
logger.debug("Stop limit reached...preparing to halt the source.")
break
self._records_emitted += df_size

except Exception as exc:
if not self._controller.run_indefinitely:
self._stop_requested = True
continue
logger.error("Failed either in the process of fetching or processing entries: %d.", exc)
raise

logger.debug("Waiting for %d seconds before fetching again...", self._interval_secs)
time.sleep(self._interval_secs)
if (self._stop_after > 0 and self._records_emitted >= self._stop_after):
self._stop_requested = True
logger.debug("Stop limit reached... preparing to halt the source.")
break

except Exception as exc:
if not self._controller.run_indefinitely:
logger.error("The input provided is not a URL or a valid path, therefore, the maximum " +
"retries are being overridden, and early exiting is triggered.")
raise RuntimeError(f"Failed to fetch feed entries : {exc}") from exc

retries += 1
logger.warning("Error fetching feed entries. Retrying (%d/%d)...", retries, self._max_retries)
logger.debug("Waiting for 5 secs before retrying...")
time.sleep(5) # Wait before retrying

if retries == self._max_retries: # Check if retries exceeded the limit
logger.error("Max retries reached. Unable to fetch feed entries.")
raise RuntimeError(f"Failed to fetch feed entries after max retries: {exc}") from exc
if not self._controller.run_indefinitely:
self._stop_requested = True
continue

logger.debug("Waiting for %d seconds before fetching again...", self._interval_secs)
time.sleep(self._interval_secs)

logger.debug("Source stopped.")

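
For reference, a sketch of wiring the updated stage into a pipeline. The feed URL is a placeholder, and the LinearPipeline and InMemorySinkStage wiring is assumed from the usual Morpheus conventions rather than taken from this PR; the point to note is that cooldown_interval replaces the removed max_retries argument.

from morpheus.config import Config
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.rss_source_stage import RSSSourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage

config = Config()
pipeline = LinearPipeline(config)

# cooldown_interval replaces max_retries: a failing feed is skipped for 600 s
# and then tried again, instead of the stage retrying and eventually giving up.
pipeline.set_source(RSSSourceStage(config,
                                   feed_input=["https://example.com/rss.xml"],  # placeholder
                                   interval_secs=600,
                                   stop_after=128,
                                   cooldown_interval=600))
sink = pipeline.add_stage(InMemorySinkStage(config))

pipeline.run()
print(f"Collected {len(sink.get_messages())} message batches")

Since the stage no longer retries, a temporarily unreachable feed simply sits out one cooldown window inside RSSController and is attempted again on the next interval_secs fetch cycle.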