diff --git a/backend/howtheyvote/alembic/versions/1f516b18c4f6_rename_result_column_to_status.py b/backend/howtheyvote/alembic/versions/1f516b18c4f6_rename_result_column_to_status.py new file mode 100644 index 000000000..b4cbe85a9 --- /dev/null +++ b/backend/howtheyvote/alembic/versions/1f516b18c4f6_rename_result_column_to_status.py @@ -0,0 +1,23 @@ +"""Rename result column to status + +Revision ID: 1f516b18c4f6 +Revises: 9b35d19b64c4 +Create Date: 2024-12-08 11:25:26.051408 + +""" + +from alembic import op + +# revision identifiers, used by Alembic. +revision = "1f516b18c4f6" +down_revision = "9b35d19b64c4" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.alter_column("pipeline_runs", column_name="result", new_column_name="status") + + +def downgrade() -> None: + op.alter_column("pipeline_runs", column_name="status", new_column_name="result") diff --git a/backend/howtheyvote/alembic/versions/2f958a6f147d_add_checksum_column_to_pipeline_runs_.py b/backend/howtheyvote/alembic/versions/2f958a6f147d_add_checksum_column_to_pipeline_runs_.py new file mode 100644 index 000000000..efbc165ca --- /dev/null +++ b/backend/howtheyvote/alembic/versions/2f958a6f147d_add_checksum_column_to_pipeline_runs_.py @@ -0,0 +1,24 @@ +"""Add checksum column to pipeline_runs table + +Revision ID: 2f958a6f147d +Revises: 1f516b18c4f6 +Create Date: 2024-12-07 17:12:10.792707 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "2f958a6f147d" +down_revision = "1f516b18c4f6" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column("pipeline_runs", sa.Column("checksum", sa.Unicode)) + + +def downgrade() -> None: + op.drop_column("pipeline_runs", "checksum") diff --git a/backend/howtheyvote/models/__init__.py b/backend/howtheyvote/models/__init__.py index 0aa3a2d89..75ece9a2d 100644 --- a/backend/howtheyvote/models/__init__.py +++ b/backend/howtheyvote/models/__init__.py @@ -1,4 +1,4 @@ -from .common import Base, BaseWithId, DataIssue, Fragment, PipelineRun, PipelineRunResult +from .common import Base, BaseWithId, DataIssue, Fragment, PipelineRun, PipelineStatus from .country import Country, CountryType from .eurovoc import EurovocConcept, EurovocConceptType from .group import Group @@ -24,7 +24,7 @@ "BaseWithId", "Fragment", "PipelineRun", - "PipelineRunResult", + "PipelineStatus", "DataIssue", "Country", "CountryType", diff --git a/backend/howtheyvote/models/common.py b/backend/howtheyvote/models/common.py index ef084442d..d5c2d2502 100644 --- a/backend/howtheyvote/models/common.py +++ b/backend/howtheyvote/models/common.py @@ -36,10 +36,11 @@ class DataIssue(Enum): VOTE_GROUP_NO_MAIN_VOTE = "VOTE_GROUP_NO_MAIN_VOTE" -class PipelineRunResult(Enum): +class PipelineStatus(Enum): SUCCESS = "SUCCESS" FAILURE = "FAILURE" DATA_UNAVAILABLE = "DATA_UNAVAILABLE" + DATA_UNCHANGED = "DATA_UNCHANGED" class PipelineRun(Base): @@ -49,4 +50,5 @@ class PipelineRun(Base): started_at: Mapped[sa.DateTime] = mapped_column(sa.DateTime) finished_at: Mapped[sa.DateTime] = mapped_column(sa.DateTime) pipeline: Mapped[str] = mapped_column(sa.Unicode) - result: Mapped[PipelineRunResult] = mapped_column(sa.Enum(PipelineRunResult)) + status: Mapped[PipelineStatus] = mapped_column(sa.Enum(PipelineStatus)) + checksum: Mapped[str] = mapped_column(sa.Unicode) diff --git a/backend/howtheyvote/pipelines/__init__.py b/backend/howtheyvote/pipelines/__init__.py index f4d6566c4..04990465c 100644 --- a/backend/howtheyvote/pipelines/__init__.py +++ b/backend/howtheyvote/pipelines/__init__.py @@ -1,12 +1,11 @@ -from .common import DataUnavailableError, PipelineError +from .common import PipelineResult from .members import MembersPipeline from .press import PressPipeline from .rcv_list import RCVListPipeline from .sessions import SessionsPipeline __all__ = [ - "PipelineError", - "DataUnavailableError", + "PipelineResult", "RCVListPipeline", "PressPipeline", "MembersPipeline", diff --git a/backend/howtheyvote/pipelines/common.py b/backend/howtheyvote/pipelines/common.py index 26cf91486..91dc8c15e 100644 --- a/backend/howtheyvote/pipelines/common.py +++ b/backend/howtheyvote/pipelines/common.py @@ -1,6 +1,68 @@ +import hashlib +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any + +from requests import Response +from structlog import get_logger + +from ..models import PipelineStatus +from ..scrapers import ScrapingError + +log = get_logger(__name__) + + +@dataclass +class PipelineResult: + status: PipelineStatus + checksum: str | None + + class PipelineError(Exception): pass class DataUnavailableError(PipelineError): pass + + +class DataUnchangedError(PipelineError): + pass + + +class BasePipeline(ABC): + last_run_checksum: str | None + checksum: str | None + + def __init__(self, last_run_checksum: str | None = None, **kwargs: Any) -> None: + self.last_run_checksum = last_run_checksum + self.checksum = None + self._log = log.bind(pipeline=type(self).__name__, **kwargs) + + def run(self) -> PipelineResult: + self._log.info("Running pipeline") + + try: + self._run() + status = PipelineStatus.SUCCESS + except DataUnavailableError: + status = PipelineStatus.DATA_UNAVAILABLE + except DataUnchangedError: + status = PipelineStatus.DATA_UNCHANGED + except ScrapingError: + status = PipelineStatus.FAILURE + self._log.exception("Failed running pipeline") + + return PipelineResult( + status=status, + checksum=self.checksum, + ) + + @abstractmethod + def _run(self) -> None: + raise NotImplementedError + + +def compute_response_checksum(response: Response) -> str: + """Compute the SHA256 hash of the response contents.""" + return hashlib.sha256(response.content).hexdigest() diff --git a/backend/howtheyvote/pipelines/members.py b/backend/howtheyvote/pipelines/members.py index e1c4d6f4d..5570426c6 100644 --- a/backend/howtheyvote/pipelines/members.py +++ b/backend/howtheyvote/pipelines/members.py @@ -13,34 +13,23 @@ ScrapingError, ) from ..store import Aggregator, BulkWriter, index_records, map_member +from .common import BasePipeline log = get_logger(__name__) -class MembersPipeline: +class MembersPipeline(BasePipeline): def __init__(self, term: int): + super().__init__(term=term) self.term = term self._member_ids: set[str] = set() - def run(self) -> None: - log.info( - "Running pipeline", - name=type(self).__name__, - term=self.term, - ) - - try: - self._scrape_members() - self._scrape_member_groups() - self._scrape_member_infos() - self._download_member_photos() - self._index_members() - except ScrapingError: - log.exception( - "Failed running pipeline", - name=type(self).__name__, - term=self.term, - ) + def _run(self) -> None: + self._scrape_members() + self._scrape_member_groups() + self._scrape_member_infos() + self._download_member_photos() + self._index_members() def _scrape_members(self) -> None: log.info("Scraping RCV lists", term=self.term) diff --git a/backend/howtheyvote/pipelines/press.py b/backend/howtheyvote/pipelines/press.py index e1f1dccea..ceecad203 100644 --- a/backend/howtheyvote/pipelines/press.py +++ b/backend/howtheyvote/pipelines/press.py @@ -15,11 +15,12 @@ ScrapingError, ) from ..store import Aggregator, BulkWriter, index_records, map_press_release, map_vote +from .common import BasePipeline log = get_logger(__name__) -class PressPipeline: +class PressPipeline(BasePipeline): # At the time we introduced this constant, the value covered roughly one term. However, # this obviously depends on the amount of press releases published and might need to be # adjusted or made configurable in the future. @@ -30,35 +31,21 @@ def __init__( date: datetime.date | None = None, with_rss: bool | None = False, ): + super().__init__(date=date, with_rss=with_rss) self.date = date self.with_rss = with_rss self._release_ids: set[str] = set() self._vote_ids: set[str] = set() - def run(self) -> None: - log.info( - "Running pipeline", - name=type(self).__name__, - date=self.date, - with_rss=self.with_rss, - ) + def _run(self) -> None: + if self.with_rss: + self._scrape_press_releases_rss() - try: - if self.with_rss: - self._scrape_press_releases_rss() - - self._scrape_press_releases_index() - self._scrape_press_releases() - self._analyze_featured_votes() - self._index_press_releases() - self._index_votes() - except ScrapingError: - log.exception( - "Failed running pipeline", - name=type(self).__name__, - date=self.date, - with_rss=self.with_rss, - ) + self._scrape_press_releases_index() + self._scrape_press_releases() + self._analyze_featured_votes() + self._index_press_releases() + self._index_votes() def _scrape_press_releases_rss(self) -> None: log.info("Fetching press releases from RSS", date=self.date) diff --git a/backend/howtheyvote/pipelines/rcv_list.py b/backend/howtheyvote/pipelines/rcv_list.py index 9c11ffcff..cc6bb06d6 100644 --- a/backend/howtheyvote/pipelines/rcv_list.py +++ b/backend/howtheyvote/pipelines/rcv_list.py @@ -27,65 +27,54 @@ ) from ..sharepics import generate_vote_sharepic from ..store import Aggregator, BulkWriter, index_records, map_vote, map_vote_group -from .common import DataUnavailableError, PipelineError +from .common import ( + BasePipeline, + DataUnavailableError, + DataUnchangedError, + compute_response_checksum, +) log = get_logger(__name__) -class RCVListPipeline: +class RCVListPipeline(BasePipeline): """Scrapes the RCV vote results for a single day, then runs analysis on the extracted votes and scrapes additional information such as data about legislative procedures.""" - def __init__(self, term: int, date: datetime.date): + def __init__( + self, + term: int, + date: datetime.date, + last_run_checksum: str | None = None, + ): + super().__init__(term=term, date=date, last_run_checksum=last_run_checksum) self.term = term self.date = date + self.last_run_checksum = last_run_checksum + self.checksum: str | None = None self._vote_ids: set[str] = set() self._vote_group_ids: set[str] = set() self._request_cache: RequestCache = LRUCache(maxsize=25) - def run(self) -> None: - log.info( - "Running pipeline", - name=type(self).__name__, - term=self.term, - date=self.date, - ) - - try: - self._scrape_rcv_list() - self._scrape_documents() - self._scrape_eurlex_documents() - self._scrape_procedures() - self._scrape_eurlex_procedures() - self._analyze_main_votes() - self._analyze_vote_groups() - self._analyze_vote_data_issues() - self._index_votes() - - # Share pictures have to be generated after the votes are indexed. Otherwise, - # rendering the share pictures fails as data about new votes hasn’t yet been - # written to the database. - self._generate_vote_sharepics() - - self._analyze_vote_groups_data_issues() - self._index_vote_groups() - except NoWorkingUrlError as exc: - log.exception( - "Failed running pipeline", - name=type(self).__name__, - term=self.term, - date=self.date, - ) - raise DataUnavailableError("Pipeline data source is not available") from exc - except ScrapingError as exc: - log.exception( - "Failed running pipeline", - name=type(self).__name__, - term=self.term, - date=self.date, - ) - raise PipelineError("Failed running pipeline") from exc + def _run(self) -> None: + self._scrape_rcv_list() + self._scrape_documents() + self._scrape_eurlex_documents() + self._scrape_procedures() + self._scrape_eurlex_procedures() + self._analyze_main_votes() + self._analyze_vote_groups() + self._analyze_vote_data_issues() + self._index_votes() + + # Share pictures have to be generated after the votes are indexed. Otherwise, + # rendering the share pictures fails as data about new votes hasn’t yet been + # written to the database. + self._generate_vote_sharepics() + + self._analyze_vote_groups_data_issues() + self._index_vote_groups() def _scrape_rcv_list(self) -> None: log.info("Fetching active members", date=self.date) @@ -107,8 +96,23 @@ def _scrape_rcv_list(self) -> None: active_members=active_members, ) + try: + fragments = scraper.run() + except NoWorkingUrlError as exc: + raise DataUnavailableError("Pipeline data source is not available") from exc + + if ( + self.last_run_checksum is not None + and self.last_run_checksum == compute_response_checksum(scraper.response) + ): + raise DataUnchangedError( + "The data source hasn't changed since the last pipeline run." + ) + + self.checksum = compute_response_checksum(scraper.response) + writer = BulkWriter() - writer.add(scraper.run()) + writer.add(fragments) writer.flush() self._vote_ids = writer.get_touched() diff --git a/backend/howtheyvote/pipelines/sessions.py b/backend/howtheyvote/pipelines/sessions.py index 32459267f..d677e96a8 100644 --- a/backend/howtheyvote/pipelines/sessions.py +++ b/backend/howtheyvote/pipelines/sessions.py @@ -5,24 +5,21 @@ from ..models import PlenarySession from ..scrapers import CalendarSessionsScraper, ODPSessionScraper, ScrapingError from ..store import Aggregator, BulkWriter, index_records, map_plenary_session +from .common import BasePipeline log = get_logger(__name__) -class SessionsPipeline: +class SessionsPipeline(BasePipeline): def __init__(self, term: int): + super().__init__(term=term) self.term = term self._session_ids: set[str] = set() - def run(self) -> None: - log.info("Running pipeline", name=type(self).__name__, term=self.term) - - try: - self._scrape_sessions() - self._scrape_session_locations() - self._index_sessions() - except ScrapingError: - log.exception("Failed running pipeline", name=type(self).__name__, term=self.term) + def _run(self) -> None: + self._scrape_sessions() + self._scrape_session_locations() + self._index_sessions() def _scrape_sessions(self) -> None: log.info("Scrapping plenary sessions", term=self.term) diff --git a/backend/howtheyvote/scrapers/common.py b/backend/howtheyvote/scrapers/common.py index 3b9e9db92..28ff3babd 100644 --- a/backend/howtheyvote/scrapers/common.py +++ b/backend/howtheyvote/scrapers/common.py @@ -101,8 +101,8 @@ def __init__(self, request_cache: RequestCache | None = None, **kwargs: Any) -> def run(self) -> Any: self._log.info("Running scraper") - self._response = self._fetch() - doc = self._parse(self._response) + self.response = self._fetch() + doc = self._parse(self.response) return self._extract_data(doc) @abstractmethod @@ -128,7 +128,7 @@ def _fragment( model=model.__name__, source_id=source_id, source_name=type(self).__name__, - source_url=self._response.request.url, + source_url=self.response.request.url, group_key=group_key, data=data, ) diff --git a/backend/howtheyvote/worker/__init__.py b/backend/howtheyvote/worker/__init__.py index d50718d8f..dadaf6d86 100644 --- a/backend/howtheyvote/worker/__init__.py +++ b/backend/howtheyvote/worker/__init__.py @@ -2,22 +2,34 @@ import time from prometheus_client import Gauge -from sqlalchemy import exists, func, select +from sqlalchemy import select from structlog import get_logger from .. import config from ..db import Session from ..export import generate_export from ..files import file_path -from ..models import PipelineRun, PipelineRunResult, PlenarySession -from ..pipelines import MembersPipeline, PressPipeline, RCVListPipeline, SessionsPipeline +from ..models import PipelineRun, PlenarySession +from ..pipelines import ( + MembersPipeline, + PipelineResult, + PressPipeline, + RCVListPipeline, + SessionsPipeline, +) from ..query import session_is_current_at -from .worker import SkipPipelineError, Weekday, Worker +from .worker import ( + SkipPipelineError, + Weekday, + Worker, + last_pipeline_run_checksum, + pipeline_ran_successfully, +) log = get_logger(__name__) -def op_rcv() -> None: +def op_rcv_midday() -> PipelineResult: """Checks if there is a current plenary session and, if yes, fetches the latest roll-call vote results.""" today = datetime.date.today() @@ -25,14 +37,40 @@ def op_rcv() -> None: if not _is_session_day(today): raise SkipPipelineError() - if _ran_successfully(RCVListPipeline, today): + if pipeline_ran_successfully(RCVListPipeline, today): raise SkipPipelineError() pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today) - pipeline.run() + return pipeline.run() + + +def op_rcv_evening() -> PipelineResult: + """While on most plenary days, there’s only one voting session around midday, on some days + there is another sesssion in the evening, usually around 17:00. The vote results of the + evening sessions are appended to the same source document that also contains the results + of the midday votes. This method fetches the latest roll-call vote results, even if they + have been fetched successfully earlier on the same day.""" + today = datetime.date.today() + + if not _is_session_day(today): + raise SkipPipelineError() + + if pipeline_ran_successfully(RCVListPipeline, today, count=2): + raise SkipPipelineError() + + last_run_checksum = last_pipeline_run_checksum( + pipeline=RCVListPipeline, + date=today, + ) + pipeline = RCVListPipeline( + term=config.CURRENT_TERM, + date=today, + last_run_checksum=last_run_checksum, + ) + return pipeline.run() -def op_press() -> None: +def op_press() -> PipelineResult: """Checks if there is a current plenary session and, if yes, fetches the latest press releases from the Parliament’s news hub.""" today = datetime.date.today() @@ -41,19 +79,19 @@ def op_press() -> None: raise SkipPipelineError() pipeline = PressPipeline(date=today, with_rss=True) - pipeline.run() + return pipeline.run() -def op_sessions() -> None: +def op_sessions() -> PipelineResult: """Fetches plenary session dates.""" pipeline = SessionsPipeline(term=config.CURRENT_TERM) - pipeline.run() + return pipeline.run() -def op_members() -> None: +def op_members() -> PipelineResult: """Fetches information about all members of the current term.""" pipeline = MembersPipeline(term=config.CURRENT_TERM) - pipeline.run() + return pipeline.run() EXPORT_LAST_RUN = Gauge( @@ -75,19 +113,6 @@ def _is_session_day(date: datetime.date) -> bool: return session is not None -def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool: - """Check if a given pipeline has been run successfully on a given day.""" - query = ( - exists() - .where(PipelineRun.pipeline == pipeline.__name__) - .where(func.date(PipelineRun.started_at) == func.date(date)) - .where(PipelineRun.result == PipelineRunResult.SUCCESS) - .select() - ) - - return bool(Session.execute(query).scalar()) - - worker = Worker() # Mon at 04:00 @@ -110,7 +135,7 @@ def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool: # Mon-Thu between 12:00 and 15:00, every 10 mins worker.schedule_pipeline( - op_rcv, + op_rcv_midday, name=RCVListPipeline.__name__, weekdays={Weekday.MON, Weekday.TUE, Weekday.WED, Weekday.THU}, hours=range(12, 15), @@ -118,6 +143,16 @@ def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool: tz=config.TIMEZONE, ) +# Mon-Thu between 17:00 and 20:00, every 10 mins +worker.schedule_pipeline( + op_rcv_evening, + name=RCVListPipeline.__name__, + weekdays={Weekday.MON, Weekday.TUE, Weekday.WED, Weekday.THU}, + hours=range(17, 20), + minutes=range(0, 60, 10), + tz=config.TIMEZONE, +) + # Mon-Thu, between 13:00 and 20:00, every 30 mins worker.schedule_pipeline( op_press, diff --git a/backend/howtheyvote/worker/worker.py b/backend/howtheyvote/worker/worker.py index c17621e0f..6aec5548c 100644 --- a/backend/howtheyvote/worker/worker.py +++ b/backend/howtheyvote/worker/worker.py @@ -9,12 +9,13 @@ from prometheus_client import Counter, Gauge, Histogram from prometheus_client import start_http_server as start_metrics_server from schedule import Scheduler +from sqlalchemy import func, select from structlog import get_logger from .. import config from ..db import Session -from ..models import PipelineRun, PipelineRunResult -from ..pipelines import DataUnavailableError +from ..models import PipelineRun, PipelineStatus +from ..pipelines import PipelineResult log = get_logger(__name__) @@ -25,13 +26,13 @@ PIPELINE_RUN_DURATION = Histogram( "htv_worker_pipeline_run_duration_seconds", "Duration of pipeline runs executed by the worker", - ["pipeline", "result"], + ["pipeline", "status"], ) PIPELINE_RUNS = Counter( "htv_worker_pipeline_runs_total", "Total number of pipeline runs executed by the worker", - ["pipeline", "result"], + ["pipeline", "status"], ) PIPELINE_NEXT_RUN = Gauge( @@ -58,9 +59,39 @@ class Weekday(enum.Enum): Handler = Callable[..., Any] +def pipeline_ran_successfully( + pipeline: type[object], + date: datetime.date, + count: int = 1, +) -> bool: + """Check if a given pipeline has been run successfully on a given day.""" + query = ( + select(func.count()) + .select_from(PipelineRun) + .where(PipelineRun.pipeline == pipeline.__name__) + .where(func.date(PipelineRun.started_at) == func.date(date)) + .where(PipelineRun.status == PipelineStatus.SUCCESS) + ) + result = Session.execute(query).scalar() or 0 + + return result >= count + + +def last_pipeline_run_checksum(pipeline: type[object], date: datetime.date) -> str | None: + """Returns the checksum of the most recent pipeline run on a given day.""" + query = ( + select(PipelineRun.checksum) + .where(PipelineRun.pipeline == pipeline.__name__) + .where(func.date(PipelineRun.started_at) == func.date(date)) + .where(PipelineRun.status == PipelineStatus.SUCCESS) + .order_by(PipelineRun.finished_at.desc()) + ) + return Session.execute(query).scalar() + + class Worker: """Running a worker starts a long-running process that executes data pipelines in regular - intervals and stores the result of the pipeline runs in the database.""" + intervals and stores the status of the pipeline runs in the database.""" def __init__(self) -> None: self._scheduler = Scheduler() @@ -131,7 +162,7 @@ def schedule( def schedule_pipeline( self, - handler: Handler, + handler: Callable[..., PipelineResult], name: str, weekdays: Iterable[Weekday] = set(Weekday), hours: Iterable[int] = {0}, @@ -145,20 +176,21 @@ def wrapped_handler() -> None: started_at = datetime.datetime.now(datetime.UTC) try: - handler() - result = PipelineRunResult.SUCCESS + result = handler() + status = result.status + checksum = result.checksum except SkipPipelineError: # Do not log skipped pipeline runs return - except DataUnavailableError: - result = PipelineRunResult.DATA_UNAVAILABLE except Exception: - result = PipelineRunResult.FAILURE + status = PipelineStatus.FAILURE + checksum = None + log.exception("Unhandled exception during pipeline run", pipeline=name) duration = time.time() - start_time finished_at = datetime.datetime.now(datetime.UTC) - labels = {"pipeline": name, "result": result.value} + labels = {"pipeline": name, "status": status.value} PIPELINE_RUNS.labels(**labels).inc() PIPELINE_RUN_DURATION.labels(**labels).observe(duration) @@ -166,7 +198,8 @@ def wrapped_handler() -> None: pipeline=name, started_at=started_at, finished_at=finished_at, - result=result.value, + status=status, + checksum=checksum, ) Session.add(run) @@ -179,7 +212,7 @@ def wrapped_handler() -> None: started_at=started_at.isoformat(), finished_at=finished_at.isoformat(), duration=duration, - result=result.value, + status=status.value, ) Session.remove() diff --git a/backend/tests/helpers.py b/backend/tests/helpers.py index b25e3abb3..ce9bd421b 100644 --- a/backend/tests/helpers.py +++ b/backend/tests/helpers.py @@ -1,3 +1,4 @@ +from pathlib import Path from typing import Any from sqlalchemy.orm import DeclarativeBase @@ -5,3 +6,8 @@ def record_to_dict(record: DeclarativeBase) -> dict[str, Any]: return {c.name: getattr(record, c.name) for c in record.__table__.columns} + + +def load_fixture(path: str) -> str: + base = Path(__file__).resolve().parent + return base.joinpath(path).read_text() diff --git a/backend/tests/pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-evening.xml b/backend/tests/pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-evening.xml new file mode 100644 index 000000000..86b6e13d6 --- /dev/null +++ b/backend/tests/pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-evening.xml @@ -0,0 +1,92 @@ + + + + + AVERTISSEMENT + NOTICE + HINWEIS + + + Les corrections et intentions de vote sont mentionnées dans ce document sous les points de vote correspondants. Elles sont publiées pour information uniquement et ne modifient en rien le résultat de vote tel qu’annoncé en plénière. Pendant la session, les demandes de corrections et intentions de vote reçues avant 18h30 sont publiées le jour même. Les demandes ultérieures sont publiées à mesure des mises à jour successives de ce document, pendant une durée maximale de deux semaines. Signification des sigles: + (pour), - (contre), 0 (abstention) + + Corrections to votes and voting intentions appear below in the section relating to the vote concerned. They are published for information purposes only and do not alter the result of the vote as announced in plenary. During the part-session, requests for corrections to votes and voting intentions received before 18.30 will be published the same day. Subsequent requests will be included in this document each time it is updated in the two weeks following the part-session. Key to symbols: + (in favour), - (against), 0 (abstentions) + + In diesem Dokument sind unter den betreffenden Abstimmungspunkten die Berichtigungen des Stimmverhaltens und das beabsichtigte Stimmverhalten aufgeführt. Diese Angaben dienen ausschließlich der Information; keinesfalls wird durch sie das Abstimmungsergebnis geändert, das im Plenum bekannt gegeben wurde. Während der Tagung werden Anträge zu Berichtigungen des Stimmverhaltens und zum beabsichtigten Stimmverhalten, die bis 18.30 Uhr eingehen, am selben Tag veröffentlicht. Später eingehende Anträge werden sukzessive veröffentlicht, indem dieses Dokument während höchstens zwei Wochen regelmäßig aktualisiert wird. Zeichenerklärung: + (dafür), - (dagegen), 0 (Enthaltung) + + + + + ПРОТОКОЛРезултат от поименни гласувания - Приложение 2 + ZÁPISVýsledek jmenovitého hlasování - Příloha 2 + PROTOKOLResultat af afstemningerne ved navneopråb - Bilag 2 + PROTOKOLLErgebnis der namentlichen Abstimmungen - Anlage 2 + ΣΥΝΟΠΤIΚΑ ΠΡΑΚΤIΚΑΑποτέλεσμα των ψηφοφοριών με ονομαστική κλήση - Παράρτηµα 2 + MINUTESResult of roll-call votes - Annex 2 + ACTAResultados de las votaciones nominales - Anexo 2 + PROTOKOLLNimelise hääletuse tulemused - lisa 2 + PÖYTÄKIRJANimenhuutoäänestysten tulokset - Liite 2 + PROCÈS-VERBALRésultat des votes par appel nominal - Annexe 2 + MIONTUAIRISCÍTorthaí na vótála le glaoch rolla - Iarscríbhinn 2 + ZAPISNIKRezultat poimeničnog glasovanja - Prilog 2 + JEGYZŐKÖNYVA név szerinti szavazások eredménye - melléklet 2 + PROCESSO VERBALERisultato delle votazioni per appello nominale - Allegato 2 + PROTOKOLASVardinio balsavimo rezultatai - priedas 2 + PROTOKOLSRezultāti balsošanai pēc saraksta - pielikums 2 + MINUTIRiżultat tal-votazzjoni bis-sejħa tal-ismijiet - Anness 2 + NOTULENUitslag van de hoofdelijke stemmingen - Bijlage 2 + PROTOKÓŁWyniki głosowań imiennych - Załącznik 2 + ATAResultados das votações nominais - Anexo 2 + PROCES-VERBALRezultatul voturilor prin apel nominal - Anexa 2 + ZÁPISNICAVýsledok hlasovania podľa mien - Príloha 2 + ZAPISNIKIzid poimenskega glasovanja - Priloga 2 + PROTOKOLLResultat av omröstningarna med namnupprop - Bilaga 2 + + + A9-0163/2024 - Gabriele Bischoff - Article 10, § 6, alinéa 2 - Am 1 + + + Adamowicz + + + + + C9-0120/2024 - Rejet - Am 13= 23= + + + Adamowicz + + + + + Amendments to Parliament’s Rules of Procedure concerning the training on preventing conflict and harassment in the workplace and on good office management + Good agricultural and environmental condition standards, schemes for climate, environment and animal welfare + + + + BERICHTIGUNGEN DES STIMMVERHALTENS UND BEABSICHTIGTES STIMMVERHALTEN + RÄTTELSER/AVSIKTSFÖRKLARINGAR TILL AVGIVNA RÖSTER + ÄÄNESTYSKÄYTTÄYTYMISTÄ JA ÄÄNESTYSAIKEITA KOSKEVAT ILMOITUKSET + CORREÇÕES E INTENÇÕES DE VOTO + ПОПРАВКИ В ПОДАДЕНИТЕ ГЛАСОВЕ И НАМЕРЕНИЯ ЗА ГЛАСУВАНЕ + KORREZZJONIJIET U INTENZJONIJIET GĦALL-VOT + ΔΙΟΡΘΩΣΕΙΣ ΚΑΙ ΠΡΟΘΕΣΕΙΣ ΨΗΦΟΥ + BALSAVIMO PATAISYMAI IR KETINIMAI + CORRECTIONS TO VOTES AND VOTING INTENTIONS + BALSOJUMU LABOJUMI UN NODOMI BALSOT + IZMJENE DANIH GLASOVA I NAMJERE GLASAČA + CORREZIONI E INTENZIONI DI VOTO + CORRECTIONS ET INTENTIONS DE VOTE + SZAVAZATOK HELYESBÍTÉSEI ÉS SZAVAZÁSI SZÁNDÉKOK + CORRECCIONES E INTENCIONES DE VOTO + HÄÄLETUSE PARANDUSED JA HÄÄLETUSKAVATSUSED + OPRAVY HLASOVÁNÍ A SDĚLENÍ O ÚMYSLU HLASOVAT + OPRAVY HLASOVANIA A ZÁMERY PRI HLASOVANÍ + POPRAVKI IN NAMERE GLASOVANJA + CEARTÚCHÁIN AR AN VÓTA AGUS INTINNÍ VÓTÁLA + KOREKTY GŁOSOWANIA I ZAMIAR GŁOSOWANIA + CORECTĂRI ŞI INTENŢII DE VOT + STEMMERETTELSER OG -INTENTIONER + RECTIFICATIES STEMGEDRAG/ VOORGENOMEN STEMGEDRAG + + + diff --git a/backend/tests/pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-noon.xml b/backend/tests/pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-noon.xml new file mode 100644 index 000000000..42f4481af --- /dev/null +++ b/backend/tests/pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-noon.xml @@ -0,0 +1,83 @@ + + + + + AVERTISSEMENT + NOTICE + HINWEIS + + + Les corrections et intentions de vote sont mentionnées dans ce document sous les points de vote correspondants. Elles sont publiées pour information uniquement et ne modifient en rien le résultat de vote tel qu’annoncé en plénière. Pendant la session, les demandes de corrections et intentions de vote reçues avant 18h30 sont publiées le jour même. Les demandes ultérieures sont publiées à mesure des mises à jour successives de ce document, pendant une durée maximale de deux semaines. Signification des sigles: + (pour), - (contre), 0 (abstention) + + Corrections to votes and voting intentions appear below in the section relating to the vote concerned. They are published for information purposes only and do not alter the result of the vote as announced in plenary. During the part-session, requests for corrections to votes and voting intentions received before 18.30 will be published the same day. Subsequent requests will be included in this document each time it is updated in the two weeks following the part-session. Key to symbols: + (in favour), - (against), 0 (abstentions) + + In diesem Dokument sind unter den betreffenden Abstimmungspunkten die Berichtigungen des Stimmverhaltens und das beabsichtigte Stimmverhalten aufgeführt. Diese Angaben dienen ausschließlich der Information; keinesfalls wird durch sie das Abstimmungsergebnis geändert, das im Plenum bekannt gegeben wurde. Während der Tagung werden Anträge zu Berichtigungen des Stimmverhaltens und zum beabsichtigten Stimmverhalten, die bis 18.30 Uhr eingehen, am selben Tag veröffentlicht. Später eingehende Anträge werden sukzessive veröffentlicht, indem dieses Dokument während höchstens zwei Wochen regelmäßig aktualisiert wird. Zeichenerklärung: + (dafür), - (dagegen), 0 (Enthaltung) + + + + + ПРОТОКОЛРезултат от поименни гласувания - Приложение 2 + ZÁPISVýsledek jmenovitého hlasování - Příloha 2 + PROTOKOLResultat af afstemningerne ved navneopråb - Bilag 2 + PROTOKOLLErgebnis der namentlichen Abstimmungen - Anlage 2 + ΣΥΝΟΠΤIΚΑ ΠΡΑΚΤIΚΑΑποτέλεσμα των ψηφοφοριών με ονομαστική κλήση - Παράρτηµα 2 + MINUTESResult of roll-call votes - Annex 2 + ACTAResultados de las votaciones nominales - Anexo 2 + PROTOKOLLNimelise hääletuse tulemused - lisa 2 + PÖYTÄKIRJANimenhuutoäänestysten tulokset - Liite 2 + PROCÈS-VERBALRésultat des votes par appel nominal - Annexe 2 + MIONTUAIRISCÍTorthaí na vótála le glaoch rolla - Iarscríbhinn 2 + ZAPISNIKRezultat poimeničnog glasovanja - Prilog 2 + JEGYZŐKÖNYVA név szerinti szavazások eredménye - melléklet 2 + PROCESSO VERBALERisultato delle votazioni per appello nominale - Allegato 2 + PROTOKOLASVardinio balsavimo rezultatai - priedas 2 + PROTOKOLSRezultāti balsošanai pēc saraksta - pielikums 2 + MINUTIRiżultat tal-votazzjoni bis-sejħa tal-ismijiet - Anness 2 + NOTULENUitslag van de hoofdelijke stemmingen - Bijlage 2 + PROTOKÓŁWyniki głosowań imiennych - Załącznik 2 + ATAResultados das votações nominais - Anexo 2 + PROCES-VERBALRezultatul voturilor prin apel nominal - Anexa 2 + ZÁPISNICAVýsledok hlasovania podľa mien - Príloha 2 + ZAPISNIKIzid poimenskega glasovanja - Priloga 2 + PROTOKOLLResultat av omröstningarna med namnupprop - Bilaga 2 + + + A9-0163/2024 - Gabriele Bischoff - Article 10, § 6, alinéa 2 - Am 1 + + + Adamowicz + + + + + Amendments to Parliament’s Rules of Procedure concerning the training on preventing conflict and harassment in the workplace and on good office management + + + + BERICHTIGUNGEN DES STIMMVERHALTENS UND BEABSICHTIGTES STIMMVERHALTEN + RÄTTELSER/AVSIKTSFÖRKLARINGAR TILL AVGIVNA RÖSTER + ÄÄNESTYSKÄYTTÄYTYMISTÄ JA ÄÄNESTYSAIKEITA KOSKEVAT ILMOITUKSET + CORREÇÕES E INTENÇÕES DE VOTO + ПОПРАВКИ В ПОДАДЕНИТЕ ГЛАСОВЕ И НАМЕРЕНИЯ ЗА ГЛАСУВАНЕ + KORREZZJONIJIET U INTENZJONIJIET GĦALL-VOT + ΔΙΟΡΘΩΣΕΙΣ ΚΑΙ ΠΡΟΘΕΣΕΙΣ ΨΗΦΟΥ + BALSAVIMO PATAISYMAI IR KETINIMAI + CORRECTIONS TO VOTES AND VOTING INTENTIONS + BALSOJUMU LABOJUMI UN NODOMI BALSOT + IZMJENE DANIH GLASOVA I NAMJERE GLASAČA + CORREZIONI E INTENZIONI DI VOTO + CORRECTIONS ET INTENTIONS DE VOTE + SZAVAZATOK HELYESBÍTÉSEI ÉS SZAVAZÁSI SZÁNDÉKOK + CORRECCIONES E INTENCIONES DE VOTO + HÄÄLETUSE PARANDUSED JA HÄÄLETUSKAVATSUSED + OPRAVY HLASOVÁNÍ A SDĚLENÍ O ÚMYSLU HLASOVAT + OPRAVY HLASOVANIA A ZÁMERY PRI HLASOVANÍ + POPRAVKI IN NAMERE GLASOVANJA + CEARTÚCHÁIN AR AN VÓTA AGUS INTINNÍ VÓTÁLA + KOREKTY GŁOSOWANIA I ZAMIAR GŁOSOWANIA + CORECTĂRI ŞI INTENŢII DE VOT + STEMMERETTELSER OG -INTENTIONER + RECTIFICATIES STEMGEDRAG/ VOORGENOMEN STEMGEDRAG + + + diff --git a/backend/tests/pipelines/test_rcv_list.py b/backend/tests/pipelines/test_rcv_list.py index 371e1fa9d..55e78358a 100644 --- a/backend/tests/pipelines/test_rcv_list.py +++ b/backend/tests/pipelines/test_rcv_list.py @@ -1,12 +1,84 @@ import datetime import pytest +from sqlalchemy import select -from howtheyvote.pipelines import DataUnavailableError, RCVListPipeline +from howtheyvote.models import Group, GroupMembership, Member, PipelineStatus, Vote +from howtheyvote.pipelines import RCVListPipeline + +from ..helpers import load_fixture @pytest.mark.always_mock_requests def test_run_source_not_available(responses, db_session): - with pytest.raises(DataUnavailableError): - pipe = RCVListPipeline(term=9, date=datetime.date(2024, 4, 10)) - pipe.run() + pipe = RCVListPipeline(term=9, date=datetime.date(2024, 4, 10)) + result = pipe.run() + assert result.status == PipelineStatus.DATA_UNAVAILABLE + + +def test_run_data_unchanged(responses, db_session): + responses.get( + "https://www.europarl.europa.eu/doceo/document/PV-9-2024-04-24-RCV_FR.xml", + body=load_fixture("pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-noon.xml"), + ) + + member = Member( + id=197490, + first_name="Magdalena", + last_name="ADAMOWICZ", + group_memberships=[ + GroupMembership( + term=9, + start_date=datetime.datetime(2019, 7, 2), + end_date=datetime.datetime(2024, 7, 15), + group=Group["EPP"], + ), + ], + ) + db_session.add(member) + db_session.commit() + + # Run the pipeline for the first time + pipe = RCVListPipeline( + term=9, + date=datetime.date(2024, 4, 24), + ) + result = pipe.run() + assert result.status == PipelineStatus.SUCCESS + assert ( + result.checksum == "c01379e8e00e9d8e60c71eebf90941c3318be9751a911d4f08b24aa9d0be26af" + ) + + vote_ids = list(db_session.execute(select(Vote.id)).scalars()) + assert vote_ids == [168834] + + # Run the pipeline again and provide the checksum of the first run + pipe = RCVListPipeline( + term=9, + date=datetime.date(2024, 4, 24), + last_run_checksum="c01379e8e00e9d8e60c71eebf90941c3318be9751a911d4f08b24aa9d0be26af", + ) + result = pipe.run() + assert result.status == PipelineStatus.DATA_UNCHANGED + assert result.checksum is None + + # Simulate that the source data has been updated in the meantime + responses.get( + "https://www.europarl.europa.eu/doceo/document/PV-9-2024-04-24-RCV_FR.xml", + body=load_fixture("pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-evening.xml"), + ) + + # Run the pipeline again and provide the checksum of the first run + pipe = RCVListPipeline( + term=9, + date=datetime.date(2024, 4, 24), + last_run_checksum="c01379e8e00e9d8e60c71eebf90941c3318be9751a911d4f08b24aa9d0be26af", + ) + result = pipe.run() + assert result.status == PipelineStatus.SUCCESS + assert ( + result.checksum == "743bf734045d7c797afea9e8c1127c047a4924bcd5090883ff8a74421376d511" + ) + + vote_ids = list(db_session.execute(select(Vote.id)).scalars()) + assert vote_ids == [168834, 168864] diff --git a/backend/tests/scrapers/helpers.py b/backend/tests/scrapers/helpers.py deleted file mode 100644 index c8be91285..000000000 --- a/backend/tests/scrapers/helpers.py +++ /dev/null @@ -1,7 +0,0 @@ -from pathlib import Path - -FIXTURES_BASE = Path(__file__).resolve().parent / "data" - - -def load_fixture(path: str) -> str: - return FIXTURES_BASE.joinpath(path).read_text() diff --git a/backend/tests/scrapers/test_members.py b/backend/tests/scrapers/test_members.py index 1ef309149..607ea5bd1 100644 --- a/backend/tests/scrapers/test_members.py +++ b/backend/tests/scrapers/test_members.py @@ -1,17 +1,14 @@ from datetime import date -from pathlib import Path from howtheyvote.scrapers import MemberGroupsScraper, MemberInfoScraper, MembersScraper -from .helpers import load_fixture - -TEST_DATA_DIR = Path(__file__).resolve().parent / "data" +from ..helpers import load_fixture def test_members_scraper(responses): responses.get( "https://www.europarl.europa.eu/meps/en/directory/xml/?leg=9", - body=load_fixture("members/members_directory_term_9.xml"), + body=load_fixture("scrapers/data/members/members_directory_term_9.xml"), ) scraper = MembersScraper(term=9) @@ -27,7 +24,7 @@ def test_members_scraper(responses): def test_member_info_scraper(responses): responses.get( "https://www.europarl.europa.eu/meps/en/124834/NAME/home", - body=load_fixture("members/member_info_sonneborn_home.html"), + body=load_fixture("scrapers/data/members/member_info_sonneborn_home.html"), ) scraper = MemberInfoScraper(web_id=124834) @@ -48,7 +45,7 @@ def test_member_info_scraper(responses): def test_member_info_scraper_date_of_birth_without(responses): responses.get( "https://www.europarl.europa.eu/meps/en/124831/NAME/home", - body=load_fixture("members/member_info_adinolfi_home.html"), + body=load_fixture("scrapers/data/members/member_info_adinolfi_home.html"), ) scraper = MemberInfoScraper(web_id=124831) @@ -59,7 +56,7 @@ def test_member_info_scraper_date_of_birth_without(responses): def test_member_info_scraper_multiple_emails(responses): responses.get( "https://www.europarl.europa.eu/meps/en/28229/NAME/home", - body=load_fixture("members/member_info_weber_home.html"), + body=load_fixture("scrapers/data/members/member_info_weber_home.html"), ) scraper = MemberInfoScraper(web_id=28229) @@ -70,7 +67,7 @@ def test_member_info_scraper_multiple_emails(responses): def test_member_info_scraper_no_social_media(responses): responses.get( "https://www.europarl.europa.eu/meps/en/124831/NAME/home", - body=load_fixture("members/member_info_adinolfi_home.html"), + body=load_fixture("scrapers/data/members/member_info_adinolfi_home.html"), ) scraper = MemberInfoScraper(web_id=124831) @@ -81,7 +78,7 @@ def test_member_info_scraper_no_social_media(responses): def test_member_groups_scraper(responses): responses.get( "https://www.europarl.europa.eu/meps/en/124831/NAME/history/8", - body=load_fixture("members/member_groups_adinolfi_term_8.html"), + body=load_fixture("scrapers/data/members/member_groups_adinolfi_term_8.html"), ) scraper = MemberGroupsScraper(web_id=124831, term=8) @@ -114,7 +111,7 @@ def test_member_groups_scraper(responses): def test_member_groups_scraper_ongoing(responses): responses.get( "https://www.europarl.europa.eu/meps/en/28229/NAME/history/10", - body=load_fixture("members/member_groups_weber_term_10.html"), + body=load_fixture("scrapers/data/members/member_groups_weber_term_10.html"), ) scraper = MemberGroupsScraper(web_id=28229, term=10) diff --git a/backend/tests/scrapers/test_sessions.py b/backend/tests/scrapers/test_sessions.py index dbc5a567e..2cc305e76 100644 --- a/backend/tests/scrapers/test_sessions.py +++ b/backend/tests/scrapers/test_sessions.py @@ -3,13 +3,13 @@ from howtheyvote.models import PlenarySessionLocation from howtheyvote.scrapers import ODPSessionScraper -from .helpers import load_fixture +from ..helpers import load_fixture def test_odp_session_scraper(responses): responses.get( "https://data.europarl.europa.eu/api/v1/meetings/MTG-PL-2024-07-16", - body=load_fixture("sessions/odp_mtg-pl-2024-07-16.xml"), + body=load_fixture("scrapers/data/sessions/odp_mtg-pl-2024-07-16.xml"), ) scraper = ODPSessionScraper(start_date=datetime.date(2024, 7, 16)) diff --git a/backend/tests/scrapers/test_votes.py b/backend/tests/scrapers/test_votes.py index d66ba5058..c2b21b9ad 100644 --- a/backend/tests/scrapers/test_votes.py +++ b/backend/tests/scrapers/test_votes.py @@ -11,15 +11,14 @@ RCVListScraper, ) -from ..helpers import record_to_dict -from .helpers import load_fixture +from ..helpers import load_fixture, record_to_dict @pytest.mark.always_mock_requests def test_rcv_list_scraper(responses): responses.get( "https://www.europarl.europa.eu/doceo/document/PV-9-2020-07-23-RCV_FR.xml", - body=load_fixture("votes/rcv_list_pv-9-2020-07-23-rcv-fr.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_pv-9-2020-07-23-rcv-fr.xml"), ) scraper = RCVListScraper( @@ -65,7 +64,7 @@ def test_rcv_list_scraper(responses): def test_rcv_list_scraper_incorrect_totals(responses): responses.get( "https://www.europarl.europa.eu/doceo/document/PV-9-2020-07-23-RCV_FR.xml", - body=load_fixture("votes/rcv_list_incorrect_pv-9-2020-07-23-rcv-fr.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_incorrect_pv-9-2020-07-23-rcv-fr.xml"), ) scraper = RCVListScraper( @@ -88,7 +87,7 @@ def test_rcv_list_scraper_incorrect_totals(responses): def test_rcv_list_scraper_did_not_vote(responses): responses.get( "https://www.europarl.europa.eu/doceo/document/PV-9-2020-07-23-RCV_FR.xml", - body=load_fixture("votes/rcv_list_pv-9-2020-07-23-rcv-fr.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_pv-9-2020-07-23-rcv-fr.xml"), ) scraper = RCVListScraper( @@ -118,7 +117,7 @@ def test_rcv_list_scraper_did_not_vote(responses): def test_rcv_list_scraper_same_name(responses): responses.get( "https://www.europarl.europa.eu/doceo/document/PV-9-2020-09-15-RCV_FR.xml", - body=load_fixture("votes/rcv_list_pv-9-2020-09-15-rcv-fr.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_pv-9-2020-09-15-rcv-fr.xml"), ) scraper = RCVListScraper( @@ -144,7 +143,7 @@ def test_rcv_list_scraper_same_name(responses): def test_rcv_list_scraper_pers_id(responses): responses.get( "https://www.europarl.europa.eu/doceo/document/PV-9-2023-12-12-RCV_FR.xml", - body=load_fixture("votes/rcv_list_pv-9-2023-12-12-rcv-fr.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_pv-9-2023-12-12-rcv-fr.xml"), ) # The voting list has a different spelling ("Glueck" instead of "Glück"). Cases like this @@ -171,7 +170,7 @@ def test_rcv_list_scraper_pers_id(responses): def test_rcv_list_scraper_pers_id_unknown(responses): responses.get( "https://www.europarl.europa.eu/doceo/document/PV-9-2023-12-12-RCV_FR.xml", - body=load_fixture("votes/rcv_list_pv-9-2023-12-12-rcv-fr.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_pv-9-2023-12-12-rcv-fr.xml"), ) scraper = RCVListScraper( @@ -192,7 +191,7 @@ def test_rcv_list_scraper_document_register(responses): ) register_mock = responses.get( "https://www.europarl.europa.eu/RegData/seance_pleniere/proces_verbal/2020/07-23/liste_presence/P9_PV(2020)07-23(RCV)_XC.xml", - body=load_fixture("votes/rcv_list_p9-pv(2020)07-23(rcv)_xc.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_p9-pv(2020)07-23(rcv)_xc.xml"), ) scraper = RCVListScraper( @@ -216,7 +215,7 @@ def test_rcv_list_scraper_document_register(responses): def test_rcv_list_scraper_timestamp_from_text(responses): responses.get( "https://www.europarl.europa.eu/doceo/document/PV-9-2019-07-15-RCV_FR.xml", - body=load_fixture("votes/rcv_list_pv-9-2019-07-15-rcv-fr.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_pv-9-2019-07-15-rcv-fr.xml"), ) scraper = RCVListScraper( @@ -234,7 +233,7 @@ def test_rcv_list_scraper_timestamp_from_text(responses): def test_procedure_scraper(responses): responses.get( "https://oeil.secure.europarl.europa.eu/oeil/en/procedure-file?reference=2023/2019(INI)", - body=load_fixture("votes/oeil-procedure-file_2023-2019-ini.html"), + body=load_fixture("scrapers/data/votes/oeil-procedure-file_2023-2019-ini.html"), ) scraper = ProcedureScraper(vote_id=162214, procedure_reference="2023/2019(INI)") @@ -258,7 +257,7 @@ def test_procedure_scraper(responses): def test_procedure_scraper_geo_areas(responses): responses.get( "https://oeil.secure.europarl.europa.eu/oeil/en/procedure-file?reference=2022/2852(RSP)", - body=load_fixture("votes/oeil-procedure-file_2022-2852-rsp.html"), + body=load_fixture("scrapers/data/votes/oeil-procedure-file_2022-2852-rsp.html"), ) scraper = ProcedureScraper(vote_id=149218, procedure_reference="2022/2852(RSP)") @@ -269,7 +268,7 @@ def test_procedure_scraper_geo_areas(responses): def test_procedure_scraper_geo_areas_fuzzy(responses): responses.get( "https://oeil.secure.europarl.europa.eu/oeil/en/procedure-file?reference=2022/2201(INI)", - body=load_fixture("votes/oeil-procedure-file_2022-2201-ini.html"), + body=load_fixture("scrapers/data/votes/oeil-procedure-file_2022-2201-ini.html"), ) scraper = ProcedureScraper(vote_id=155056, procedure_reference="2022/2201(INI)") @@ -280,7 +279,7 @@ def test_procedure_scraper_geo_areas_fuzzy(responses): def test_eurlex_procedure_scraper_eurovoc_concepts(responses): responses.get( "https://eur-lex.europa.eu/procedure/EN/2021_106", - body=load_fixture("votes/eurlex-procedure_2021-106.html"), + body=load_fixture("scrapers/data/votes/eurlex-procedure_2021-106.html"), ) scraper = EurlexProcedureScraper(vote_id=166051, procedure_reference="2021/0106(COD)") @@ -304,7 +303,7 @@ def test_eurlex_procedure_scraper_eurovoc_concepts(responses): def test_eurlex_procedure_scraper_geo_areas(responses): responses.get( "https://eur-lex.europa.eu/procedure/EN/2023_102", - body=load_fixture("votes/eurlex-procedure_2023-102.html"), + body=load_fixture("scrapers/data/votes/eurlex-procedure_2023-102.html"), ) scraper = EurlexProcedureScraper(vote_id=161383, procedure_reference="2023/0102(NLE)") @@ -326,7 +325,7 @@ def test_eurlex_procedure_scraper_geo_areas(responses): def test_eurlex_document_scraper_eurovoc_concepts(responses): responses.get( "https://eur-lex.europa.eu/legal-content/EN/ALL/?uri=EP:P9_A(2021)0270", - body=load_fixture("votes/eurlex-document_p9-a-2021-0270.html"), + body=load_fixture("scrapers/data/votes/eurlex-document_p9-a-2021-0270.html"), ) scraper = EurlexDocumentScraper(vote_id=136238, reference="A9-0270/2021") @@ -348,7 +347,7 @@ def test_eurlex_document_scraper_eurovoc_concepts(responses): def test_eurlex_document_scraper_geo_areas(responses): responses.get( "https://eur-lex.europa.eu/legal-content/EN/ALL/?uri=EP:P9_A(2023)0369", - body=load_fixture("votes/eurlex-document_p9-a-2023-0369.html"), + body=load_fixture("scrapers/data/votes/eurlex-document_p9-a-2023-0369.html"), ) scraper = EurlexDocumentScraper(vote_id=136238, reference="A9-0369/2023") diff --git a/backend/tests/worker/test_worker.py b/backend/tests/worker/test_worker.py index 3943ae2a9..e6cc59009 100644 --- a/backend/tests/worker/test_worker.py +++ b/backend/tests/worker/test_worker.py @@ -3,9 +3,14 @@ import time_machine from sqlalchemy import select -from howtheyvote.models import PipelineRun, PipelineRunResult -from howtheyvote.pipelines import DataUnavailableError, PipelineError -from howtheyvote.worker.worker import Weekday, Worker +from howtheyvote.models import PipelineRun, PipelineStatus +from howtheyvote.pipelines import PipelineResult +from howtheyvote.worker.worker import ( + Weekday, + Worker, + last_pipeline_run_checksum, + pipeline_ran_successfully, +) def get_handler(): @@ -114,48 +119,58 @@ def test_worker_schedule_timezone_dst(db_session): def test_worker_schedule_pipeline_log_runs(db_session): worker = Worker() - handler = get_handler() + + def pipeline_handler(): + return PipelineResult( + status=PipelineStatus.SUCCESS, + checksum="123abc", + ) with time_machine.travel(datetime.datetime(2024, 1, 1, 0, 0)): - worker.schedule_pipeline(handler, name="test", hours={10}) + worker.schedule_pipeline(pipeline_handler, name="test", hours={10}) worker.run_pending() - assert handler.calls == 0 runs = list(db_session.execute(select(PipelineRun)).scalars()) assert len(runs) == 0 with time_machine.travel(datetime.datetime(2024, 1, 1, 10, 0)): worker.run_pending() - assert handler.calls == 1 runs = list(db_session.execute(select(PipelineRun)).scalars()) assert len(runs) == 1 run = runs[0] assert run.pipeline == "test" - assert run.result == PipelineRunResult.SUCCESS + assert run.status == PipelineStatus.SUCCESS + assert run.checksum == "123abc" assert run.started_at.date() == datetime.date(2024, 1, 1) assert run.finished_at.date() == datetime.date(2024, 1, 1) -def test_worker_schedule_pipeline_log_runs_exceptions(db_session): +def test_worker_schedule_pipeline_log_runs_status(db_session): worker = Worker() - def data_unavailable_error(): - raise DataUnavailableError() + def data_unavailable(): + return PipelineResult( + status=PipelineStatus.DATA_UNAVAILABLE, + checksum=None, + ) - def pipeline_error(): - raise PipelineError() + def failure(): + return PipelineResult( + status=PipelineStatus.FAILURE, + checksum=None, + ) with time_machine.travel(datetime.datetime(2024, 1, 1, 0, 0)): worker.schedule_pipeline( - data_unavailable_error, - name="data_unavailable_error", + data_unavailable, + name="data_unavailable", hours={10}, ) worker.schedule_pipeline( - pipeline_error, - name="pipeline_error", + failure, + name="failure", hours={10}, ) @@ -165,8 +180,120 @@ def pipeline_error(): runs = list(db_session.execute(select(PipelineRun)).scalars()) assert len(runs) == 2 - assert runs[0].pipeline == "data_unavailable_error" - assert runs[0].result == PipelineRunResult.DATA_UNAVAILABLE + assert runs[0].pipeline == "data_unavailable" + assert runs[0].status == PipelineStatus.DATA_UNAVAILABLE + + assert runs[1].pipeline == "failure" + assert runs[1].status == PipelineStatus.FAILURE - assert runs[1].pipeline == "pipeline_error" - assert runs[1].result == PipelineRunResult.FAILURE + +def test_worker_schedule_pipeline_unhandled_exceptions(db_session): + worker = Worker() + + def woops(): + raise Exception() + + with time_machine.travel(datetime.datetime(2024, 1, 1, 0, 0)): + worker.schedule_pipeline(woops, name="woops", hours={10}) + + with time_machine.travel(datetime.datetime(2024, 1, 1, 10, 0)): + worker.run_pending() + + runs = list(db_session.execute(select(PipelineRun)).scalars()) + assert len(runs) == 1 + assert runs[0].pipeline == "woops" + assert runs[0].status == PipelineStatus.FAILURE + + +def test_pipeline_ran_successfully(db_session): + class TestPipeline: + pass + + now = datetime.datetime.now() + today = now.date() + + run = PipelineRun( + started_at=now, + finished_at=now, + pipeline=TestPipeline.__name__, + status=PipelineStatus.FAILURE, + ) + db_session.add(run) + db_session.commit() + + assert pipeline_ran_successfully(TestPipeline, today) is False + + run = PipelineRun( + started_at=now, + finished_at=now, + pipeline=TestPipeline.__name__, + status=PipelineStatus.SUCCESS, + ) + db_session.add(run) + db_session.commit() + + assert pipeline_ran_successfully(TestPipeline, today) is True + assert pipeline_ran_successfully(TestPipeline, today, count=2) is False + + run = PipelineRun( + started_at=now, + finished_at=now, + pipeline=TestPipeline.__name__, + status=PipelineStatus.SUCCESS, + ) + db_session.add(run) + db_session.commit() + + assert pipeline_ran_successfully(TestPipeline, today, count=2) is True + + +def test_last_pipeline_run_checksum(db_session): + class TestPipeline: + pass + + with time_machine.travel(datetime.datetime(2024, 1, 1, 0, 0)): + checksum = last_pipeline_run_checksum( + pipeline=TestPipeline, + date=datetime.date(2024, 1, 1), + ) + assert checksum is None + + run = PipelineRun( + started_at=datetime.datetime(2024, 1, 1, 0, 0, 0), + finished_at=datetime.datetime(2024, 1, 1, 0, 0, 0), + pipeline=TestPipeline.__name__, + status=PipelineStatus.SUCCESS, + checksum="123abc", + ) + db_session.add(run) + db_session.commit() + + with time_machine.travel(datetime.datetime(2024, 1, 1, 1, 0, 0)): + checksum = last_pipeline_run_checksum( + pipeline=TestPipeline, + date=datetime.date(2024, 1, 1), + ) + assert checksum == "123abc" + + checksum = last_pipeline_run_checksum( + pipeline=TestPipeline, + date=datetime.date(2024, 1, 2), + ) + assert checksum is None + + run = PipelineRun( + started_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + finished_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + pipeline=TestPipeline.__name__, + status=PipelineStatus.SUCCESS, + checksum="456def", + ) + db_session.add(run) + db_session.commit() + + with time_machine.travel(datetime.datetime(2024, 1, 1, 13, 0, 0)): + checksum = last_pipeline_run_checksum( + pipeline=TestPipeline, + date=datetime.date(2024, 1, 1), + ) + assert checksum == "456def"