diff --git a/backend/howtheyvote/alembic/versions/2f958a6f147d_add_checksum_column_to_pipeline_runs_.py b/backend/howtheyvote/alembic/versions/2f958a6f147d_add_checksum_column_to_pipeline_runs_.py new file mode 100644 index 00000000..c7beec80 --- /dev/null +++ b/backend/howtheyvote/alembic/versions/2f958a6f147d_add_checksum_column_to_pipeline_runs_.py @@ -0,0 +1,24 @@ +"""Add checksum column to pipeline_runs table + +Revision ID: 2f958a6f147d +Revises: 9b35d19b64c4 +Create Date: 2024-12-07 17:12:10.792707 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "2f958a6f147d" +down_revision = "9b35d19b64c4" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column("pipeline_runs", sa.Column("checksum", sa.Unicode)) + + +def downgrade() -> None: + op.drop_column("pipeline_runs", "checksum") diff --git a/backend/howtheyvote/models/common.py b/backend/howtheyvote/models/common.py index ef084442..7d1e1ad8 100644 --- a/backend/howtheyvote/models/common.py +++ b/backend/howtheyvote/models/common.py @@ -40,6 +40,7 @@ class PipelineRunResult(Enum): SUCCESS = "SUCCESS" FAILURE = "FAILURE" DATA_UNAVAILABLE = "DATA_UNAVAILABLE" + DATA_UNCHANGED = "DATA_UNCHANGED" class PipelineRun(Base): @@ -50,3 +51,4 @@ class PipelineRun(Base): finished_at: Mapped[sa.DateTime] = mapped_column(sa.DateTime) pipeline: Mapped[str] = mapped_column(sa.Unicode) result: Mapped[PipelineRunResult] = mapped_column(sa.Enum(PipelineRunResult)) + checksum: Mapped[str] = mapped_column(sa.Unicode) diff --git a/backend/howtheyvote/worker/__init__.py b/backend/howtheyvote/worker/__init__.py index 36ac5f28..07e8c386 100644 --- a/backend/howtheyvote/worker/__init__.py +++ b/backend/howtheyvote/worker/__init__.py @@ -10,14 +10,14 @@ from ..export import generate_export from ..files import file_path from ..models import PipelineRun, PipelineRunResult, PlenarySession -from ..pipelines import MembersPipeline, PressPipeline, RCVListPipeline, SessionsPipeline +from ..pipelines import BasePipeline, MembersPipeline, PressPipeline, RCVListPipeline, SessionsPipeline from ..query import session_is_current_at from .worker import SkipPipelineError, Weekday, Worker, pipeline_ran_successfully log = get_logger(__name__) -def op_rcv_midday() -> None: +def op_rcv_midday() -> BasePipeline: """Checks if there is a current plenary session and, if yes, fetches the latest roll-call vote results.""" today = datetime.date.today() @@ -31,8 +31,10 @@ def op_rcv_midday() -> None: pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today) pipeline.run() + return pipeline -def op_rcv_evening() -> None: + +def op_rcv_evening() -> BasePipeline: """While on most plenary days, there’s only one voting session around midday, on some days there is another sesssion in the evening, usually around 17:00. The vote results of the evening sessions are appended to the same source document that also contains the results @@ -49,8 +51,10 @@ def op_rcv_evening() -> None: pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today) pipeline.run() + return pipeline + -def op_press() -> None: +def op_press() -> BasePipeline: """Checks if there is a current plenary session and, if yes, fetches the latest press releases from the Parliament’s news hub.""" today = datetime.date.today() @@ -61,18 +65,24 @@ def op_press() -> None: pipeline = PressPipeline(date=today, with_rss=True) pipeline.run() + return pipeline + -def op_sessions() -> None: +def op_sessions() -> BasePipeline: """Fetches plenary session dates.""" pipeline = SessionsPipeline(term=config.CURRENT_TERM) pipeline.run() + return pipeline -def op_members() -> None: + +def op_members() -> BasePipeline: """Fetches information about all members of the current term.""" pipeline = MembersPipeline(term=config.CURRENT_TERM) pipeline.run() + return pipeline + EXPORT_LAST_RUN = Gauge( "htv_export_last_run_timestamp_seconds", diff --git a/backend/howtheyvote/worker/worker.py b/backend/howtheyvote/worker/worker.py index 91667c61..edf3bfee 100644 --- a/backend/howtheyvote/worker/worker.py +++ b/backend/howtheyvote/worker/worker.py @@ -15,7 +15,7 @@ from .. import config from ..db import Session from ..models import PipelineRun, PipelineRunResult -from ..pipelines import DataUnavailableError +from ..pipelines import BasePipeline, DataUnavailableError, DataUnchangedError log = get_logger(__name__) @@ -150,7 +150,7 @@ def schedule( def schedule_pipeline( self, - handler: Handler, + handler: Callable[..., BasePipeline], name: str, weekdays: Iterable[Weekday] = set(Weekday), hours: Iterable[int] = {0}, @@ -164,13 +164,15 @@ def wrapped_handler() -> None: started_at = datetime.datetime.now(datetime.UTC) try: - handler() + pipeline = handler() result = PipelineRunResult.SUCCESS except SkipPipelineError: # Do not log skipped pipeline runs return except DataUnavailableError: result = PipelineRunResult.DATA_UNAVAILABLE + except DataUnchangedError: + result = PipelineRunResult.DATA_UNCHANGED except Exception: result = PipelineRunResult.FAILURE @@ -186,6 +188,7 @@ def wrapped_handler() -> None: started_at=started_at, finished_at=finished_at, result=result.value, + checksum=pipeline.checksum, ) Session.add(run)