Refactor pipelines to use BasePipeline
tillprochaska committed Dec 7, 2024
1 parent 708afa2 commit 89de9ab
Showing 4 changed files with 53 additions and 99 deletions.
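All four pipelines followed the same pattern: a public run() that logged "Running pipeline" with the pipeline's parameters, executed the scraping and indexing steps inside a try block, and logged any ScrapingError. This commit hoists that boilerplate into a shared BasePipeline; each subclass now forwards its parameters via super().__init__() and implements only the protected _run() hook. BasePipeline itself lives in backend/howtheyvote/pipelines/common.py, which this diff does not show. The following is a minimal sketch of what it plausibly looks like, reconstructed from the removed code — PipelineError and ScrapingError are real names visible in the imports below; the kwargs-based log context and everything else are assumptions:

    # Sketch only: common.py is not part of this diff. Reconstructed from the
    # run() methods removed below; the structlog import is an assumption.
    from typing import Any

    from structlog import get_logger

    from ..scrapers import ScrapingError

    log = get_logger(__name__)


    class PipelineError(Exception):
        """Already exists in common.py (see the old import in rcv_list.py)."""


    class BasePipeline:
        def __init__(self, **log_context: Any) -> None:
            # Subclasses pass their identifying parameters (term, date, ...) so
            # that every log line can include them, as the duplicated code did.
            self._log_context = log_context

        def run(self) -> None:
            log.info("Running pipeline", name=type(self).__name__, **self._log_context)

            try:
                self._run()
            except ScrapingError as exc:
                log.exception(
                    "Failed running pipeline",
                    name=type(self).__name__,
                    **self._log_context,
                )
                # Assumption: only RCVListPipeline re-raised before the refactor;
                # the real base class may log and swallow the error instead.
                raise PipelineError("Failed running pipeline") from exc

        def _run(self) -> None:
            # Each pipeline implements its scraping/analysis/indexing steps here.
            raise NotImplementedError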
29 changes: 9 additions & 20 deletions backend/howtheyvote/pipelines/members.py
@@ -13,34 +13,23 @@
     ScrapingError,
 )
 from ..store import Aggregator, BulkWriter, index_records, map_member
+from .common import BasePipeline

 log = get_logger(__name__)


-class MembersPipeline:
+class MembersPipeline(BasePipeline):
     def __init__(self, term: int):
+        super().__init__(term=term)
         self.term = term
         self._member_ids: set[str] = set()

-    def run(self) -> None:
-        log.info(
-            "Running pipeline",
-            name=type(self).__name__,
-            term=self.term,
-        )
-
-        try:
-            self._scrape_members()
-            self._scrape_member_groups()
-            self._scrape_member_infos()
-            self._download_member_photos()
-            self._index_members()
-        except ScrapingError:
-            log.exception(
-                "Failed running pipeline",
-                name=type(self).__name__,
-                term=self.term,
-            )
+    def _run(self) -> None:
+        self._scrape_members()
+        self._scrape_member_groups()
+        self._scrape_member_infos()
+        self._download_member_photos()
+        self._index_members()

     def _scrape_members(self) -> None:
         log.info("Scraping RCV lists", term=self.term)
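Call sites are unaffected by the refactor: each pipeline is still constructed with its parameters and started via the public run() it now inherits. A hypothetical invocation (the term value and import path are illustrative only):

    from howtheyvote.pipelines.members import MembersPipeline

    pipeline = MembersPipeline(term=10)
    pipeline.run()  # logs "Running pipeline", then delegates to _run()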
35 changes: 11 additions & 24 deletions backend/howtheyvote/pipelines/press.py
@@ -15,11 +15,12 @@
     ScrapingError,
 )
 from ..store import Aggregator, BulkWriter, index_records, map_press_release, map_vote
+from .common import BasePipeline

 log = get_logger(__name__)


-class PressPipeline:
+class PressPipeline(BasePipeline):
     # At the time we introduced this constant, the value covered roughly one term. However,
     # this obviously depends on the amount of press releases published and might need to be
     # adjusted or made configurable in the future.
@@ -30,35 +31,21 @@ def __init__(
         date: datetime.date | None = None,
         with_rss: bool | None = False,
     ):
+        super().__init__(date=date, with_rss=with_rss)
         self.date = date
         self.with_rss = with_rss
         self._release_ids: set[str] = set()
         self._vote_ids: set[str] = set()

-    def run(self) -> None:
-        log.info(
-            "Running pipeline",
-            name=type(self).__name__,
-            date=self.date,
-            with_rss=self.with_rss,
-        )
-
-        try:
-            if self.with_rss:
-                self._scrape_press_releases_rss()
-
-            self._scrape_press_releases_index()
-            self._scrape_press_releases()
-            self._analyze_featured_votes()
-            self._index_press_releases()
-            self._index_votes()
-        except ScrapingError:
-            log.exception(
-                "Failed running pipeline",
-                name=type(self).__name__,
-                date=self.date,
-                with_rss=self.with_rss,
-            )
+    def _run(self) -> None:
+        if self.with_rss:
+            self._scrape_press_releases_rss()
+
+        self._scrape_press_releases_index()
+        self._scrape_press_releases()
+        self._analyze_featured_votes()
+        self._index_press_releases()
+        self._index_votes()

     def _scrape_press_releases_rss(self) -> None:
         log.info("Fetching press releases from RSS", date=self.date)
71 changes: 26 additions & 45 deletions backend/howtheyvote/pipelines/rcv_list.py
@@ -27,12 +27,12 @@
 )
 from ..sharepics import generate_vote_sharepic
 from ..store import Aggregator, BulkWriter, index_records, map_vote, map_vote_group
-from .common import DataUnavailableError, DataUnchangedError, PipelineError
+from .common import BasePipeline, DataUnavailableError, DataUnchangedError

 log = get_logger(__name__)


-class RCVListPipeline:
+class RCVListPipeline(BasePipeline):
     """Scrapes the RCV vote results for a single day, then runs analysis on the
     extracted votes and scrapes additional information such as data about legislative
     procedures."""
@@ -43,6 +43,7 @@ def __init__(
         date: datetime.date,
         last_run_checksum: str | None = None,
     ):
+        super().__init__(term=term, date=date, last_run_checksum=last_run_checksum)
         self.term = term
         self.date = date
         self.last_run_checksum = last_run_checksum
@@ -51,48 +52,24 @@
         self._vote_group_ids: set[str] = set()
         self._request_cache: RequestCache = LRUCache(maxsize=25)

-    def run(self) -> None:
-        log.info(
-            "Running pipeline",
-            name=type(self).__name__,
-            term=self.term,
-            date=self.date,
-        )
-
-        try:
-            self._scrape_rcv_list()
-            self._scrape_documents()
-            self._scrape_eurlex_documents()
-            self._scrape_procedures()
-            self._scrape_eurlex_procedures()
-            self._analyze_main_votes()
-            self._analyze_vote_groups()
-            self._analyze_vote_data_issues()
-            self._index_votes()
-
-            # Share pictures have to be generated after the votes are indexed. Otherwise,
-            # rendering the share pictures fails as data about new votes hasn’t yet been
-            # written to the database.
-            self._generate_vote_sharepics()
-
-            self._analyze_vote_groups_data_issues()
-            self._index_vote_groups()
-        except NoWorkingUrlError as exc:
-            log.exception(
-                "Failed running pipeline",
-                name=type(self).__name__,
-                term=self.term,
-                date=self.date,
-            )
-            raise DataUnavailableError("Pipeline data source is not available") from exc
-        except ScrapingError as exc:
-            log.exception(
-                "Failed running pipeline",
-                name=type(self).__name__,
-                term=self.term,
-                date=self.date,
-            )
-            raise PipelineError("Failed running pipeline") from exc
+    def _run(self) -> None:
+        self._scrape_rcv_list()
+        self._scrape_documents()
+        self._scrape_eurlex_documents()
+        self._scrape_procedures()
+        self._scrape_eurlex_procedures()
+        self._analyze_main_votes()
+        self._analyze_vote_groups()
+        self._analyze_vote_data_issues()
+        self._index_votes()
+
+        # Share pictures have to be generated after the votes are indexed. Otherwise,
+        # rendering the share pictures fails as data about new votes hasn’t yet been
+        # written to the database.
+        self._generate_vote_sharepics()
+
+        self._analyze_vote_groups_data_issues()
+        self._index_vote_groups()

     def _scrape_rcv_list(self) -> None:
         log.info("Fetching active members", date=self.date)
@@ -113,7 +90,11 @@ def _scrape_rcv_list(self) -> None:
             date=self.date,
             active_members=active_members,
         )
-        fragments = scraper.run()
+
+        try:
+            fragments = scraper.run()
+        except NoWorkingUrlError as exc:
+            raise DataUnavailableError("Pipeline data source is not available") from exc

         if (
             self.last_run_checksum is not None
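One design note on the hunk above: before the refactor, run() in this file translated NoWorkingUrlError into DataUnavailableError (and ScrapingError into PipelineError) at the top level. Afterwards, the NoWorkingUrlError translation happens directly at the scraper.run() call in _scrape_rcv_list — the only step that can raise it — leaving the generic ScrapingError handling to the shared base class (see the sketch near the top).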
17 changes: 7 additions & 10 deletions backend/howtheyvote/pipelines/sessions.py
@@ -5,24 +5,21 @@
 from ..models import PlenarySession
 from ..scrapers import CalendarSessionsScraper, ODPSessionScraper, ScrapingError
 from ..store import Aggregator, BulkWriter, index_records, map_plenary_session
+from .common import BasePipeline

 log = get_logger(__name__)


-class SessionsPipeline:
+class SessionsPipeline(BasePipeline):
     def __init__(self, term: int):
+        super().__init__(term=term)
         self.term = term
         self._session_ids: set[str] = set()

-    def run(self) -> None:
-        log.info("Running pipeline", name=type(self).__name__, term=self.term)
-
-        try:
-            self._scrape_sessions()
-            self._scrape_session_locations()
-            self._index_sessions()
-        except ScrapingError:
-            log.exception("Failed running pipeline", name=type(self).__name__, term=self.term)
+    def _run(self) -> None:
+        self._scrape_sessions()
+        self._scrape_session_locations()
+        self._index_sessions()

     def _scrape_sessions(self) -> None:
         log.info("Scrapping plenary sessions", term=self.term)
