Eliminate Redundant Fetches in RSS Controller #1442

53 changes: 30 additions & 23 deletions morpheus/controllers/rss_controller.py
@@ -102,11 +102,18 @@ def __init__(self,
run_indefinitely = any(RSSController.is_url(f) for f in self._feed_input)

self._run_indefinitely = run_indefinitely
self._enable_cache = enable_cache

self._session = None
if enable_cache:
self._session = requests_cache.CachedSession(os.path.join(cache_dir, "RSSController.sqlite"),
backend="sqlite")
else:
self._session = requests.session()

self._session.headers.update({
"User-Agent":
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
})

self._feed_stats_dict = {
input:
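For readers following the hunk above, here is a minimal standalone sketch of the resulting session setup, assuming `enable_cache`, `cache_dir`, and the User-Agent string mirror the constructor arguments in the diff (the `build_session` helper name is hypothetical):

    import os

    import requests
    import requests_cache

    def build_session(enable_cache: bool, cache_dir: str) -> requests.Session:
        # Hypothetical helper mirroring the __init__ logic above.
        if enable_cache:
            # Repeat GETs for the same URL are answered from the local SQLite
            # backend instead of hitting the network again.
            session = requests_cache.CachedSession(os.path.join(cache_dir, "RSSController.sqlite"),
                                                   backend="sqlite")
        else:
            session = requests.session()
        # A browser-like User-Agent, since some feed servers reject library defaults.
        session.headers.update({
            "User-Agent":
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
        })
        return session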
@@ -119,11 +126,6 @@ def run_indefinitely(self):
"""Property that determines to run the source indefinitely"""
return self._run_indefinitely

@property
def session_exist(self) -> bool:
"""Property that indicates the existence of a session."""
return bool(self._session)

def get_feed_stats(self, feed_url: str) -> FeedStats:
"""
Get feed input stats.
@@ -148,22 +150,27 @@ def get_feed_stats(self, feed_url: str) -> FeedStats:

return self._feed_stats_dict[feed_url]

def _get_response_text(self, url: str) -> str:
if self.session_exist:
response = self._session.get(url)
else:
response = requests.get(url, timeout=self._request_timeout)

return response.text

def _read_file_content(self, file_path: str) -> str:
with open(file_path, 'r', encoding="utf-8") as file:
return file.read()

def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> "feedparser.FeedParserDict":
def _fetch_feed_content(self, feed_input: str, is_url: bool) -> str:
        # If the input is a URL.
if is_url:
if not self._enable_cache:
# If cache is not enabled, fetch feed directly using requests.Session.
response = self._session.get(feed_input, timeout=self._request_timeout)
return response.text

            # If we reach this point, feed_input is the feed content itself, already fetched via the cached session.
return feed_input

feed_input = self._get_response_text(feed_input) if is_url else self._read_file_content(feed_input)
        # If the input is not a URL, read the content from the given file path.
return self._read_file_content(feed_input)
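Taken together, `_fetch_feed_content` has three paths: a live fetch for URLs when caching is off, a pass-through for already-fetched cached content, and a file read otherwise. A hedged usage sketch (the controller configuration, URLs, and file path below are illustrative, not taken from the PR):

    from morpheus.controllers.rss_controller import RSSController

    # Illustrative controllers; the arguments mirror the test setup.
    plain = RSSController(feed_input=["https://example.com/rss.xml"], enable_cache=False)
    cached = RSSController(feed_input=["https://example.com/rss.xml"], enable_cache=True)

    # 1. URL with caching disabled: fetched live through the plain session.
    text = plain._fetch_feed_content("https://example.com/rss.xml", is_url=True)

    # 2. Caching enabled: the caller has already fetched the body via the cached
    #    session, so the content itself is passed in and returned unchanged.
    body = "<rss><channel><item/></channel></rss>"  # stands in for response.text
    text = cached._fetch_feed_content(body, is_url=True)

    # 3. Not a URL: the input is treated as a file path and read from disk.
    text = plain._fetch_feed_content("tests/tests_data/rss_feed.xml", is_url=False)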

def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> "feedparser.FeedParserDict":

feed_input = self._fetch_feed_content(feed_input, is_url)
soup = BeautifulSoup(feed_input, 'xml')

# Verify whether the given feed has 'item' or 'entry' tags.
@@ -205,10 +212,10 @@ def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict":

fallback = False
cache_hit = False
is_url_with_session = is_url and self.session_exist
use_cache = is_url and self._enable_cache

if is_url_with_session:
response = self._session.get(url)
if use_cache:
response = self._session.get(url, timeout=self._request_timeout)
cache_hit = response.from_cache
feed_input = response.text
else:
@@ -219,15 +226,15 @@ def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict":
if feed["bozo"]:
cache_hit = False

if is_url_with_session:
if use_cache:
fallback = True
logger.info("Failed to parse feed: %s. Trying to parse using feedparser directly.", url)
logger.info("Parsing the cached feed for URL '%s' failed. Attempting direct parsing with feedparser.",
url)
feed = feedparser.parse(url)

if feed["bozo"]:
try:
logger.info("Failed to parse feed: %s, %s. Try parsing feed manually", url, feed['bozo_exception'])
feed = self._try_parse_feed_with_beautiful_soup(url, is_url)
feed = self._try_parse_feed_with_beautiful_soup(feed_input, is_url)
except Exception:
logger.error("Failed to parse the feed manually: %s", url)
raise
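The `from_cache` flag consulted above is set by requests_cache on responses from a `CachedSession`; a minimal standalone sketch of that behavior (the URL and cache name are illustrative):

    import requests_cache

    session = requests_cache.CachedSession("demo_cache", backend="sqlite")
    first = session.get("https://example.com/feed.xml")
    again = session.get("https://example.com/feed.xml")
    # The first call hits the network; the repeat is served from the cache,
    # which is exactly the redundant fetch this PR eliminates.
    print(first.from_cache, again.from_cache)  # expected: False True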
30 changes: 8 additions & 22 deletions tests/controllers/test_rss_controller.py
@@ -21,6 +21,7 @@
import feedparser
import pandas as pd
import pytest
import requests

from _utils import TEST_DIRS
from morpheus.controllers.rss_controller import FeedStats
@@ -147,21 +148,16 @@ def test_batch_size(feed_input: list[str], batch_size: int):
def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enable_cache: bool, mock_get_response: Mock):
controller = RSSController(feed_input=feed_input, enable_cache=enable_cache)

if is_url:
if enable_cache:
with patch("morpheus.controllers.rss_controller.requests_cache.CachedSession.get") as mock_get:
mock_get.return_value = mock_get_response
feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url)
else:
with patch("morpheus.controllers.rss_controller.requests.get") as mock_get:
mock_get.return_value = mock_get_response
feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url)

if is_url and not enable_cache:
with patch.object(requests.Session, 'get') as mock_get:
mock_get.return_value = mock_get_response
feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url)
else:
feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url)
# When enable_cache is set to 'True', the feed content is provided as input.
feed_data = controller._try_parse_feed_with_beautiful_soup(
mock_get_response.text if enable_cache else feed_input, is_url)

assert isinstance(feed_data, feedparser.FeedParserDict)

assert len(feed_data.entries) > 0

for entry in feed_data.entries:
@@ -180,16 +176,6 @@ def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enable_cache: bool, mock_get_response: Mock):
assert isinstance(feed_data["entries"], list)
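The `patch.object(requests.Session, 'get')` pattern above replaces the method on the class itself, so any plain `Session` the controller creates is intercepted without reaching the network. A self-contained sketch of the same pattern (the function under test and all names are hypothetical):

    import requests
    from unittest.mock import Mock, patch

    def fetch_text(url: str) -> str:
        # Hypothetical stand-in for the controller's fetch path.
        return requests.Session().get(url, timeout=5).text

    def test_fetch_text_is_intercepted():
        with patch.object(requests.Session, "get") as mock_get:
            mock_get.return_value = Mock(text="<rss><item/></rss>")
            assert "<item/>" in fetch_text("https://example.com/feed.xml")
            mock_get.assert_called_once()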


@pytest.mark.parametrize("enable_cache", [True, False])
def test_enable_disable_cache(enable_cache):
controller = RSSController(feed_input=test_urls, enable_cache=enable_cache)

if enable_cache:
assert controller.session_exist
else:
assert not controller.session_exist


def test_parse_feeds(mock_feed: feedparser.FeedParserDict):
feed_input = test_urls[0]
cooldown_interval = 620