Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
tillprochaska committed Dec 7, 2024
1 parent 89de9ab commit d0a9c05
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Add checksum column to pipeline_runs table
Revision ID: 2f958a6f147d
Revises: 9b35d19b64c4
Create Date: 2024-12-07 17:12:10.792707
"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "2f958a6f147d"
down_revision = "9b35d19b64c4"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Apply the migration: add a ``checksum`` column to ``pipeline_runs``.

    The column is declared with the ``sa.Unicode`` type; no explicit
    nullability, default, or index is specified here.
    """
    checksum_column = sa.Column("checksum", sa.Unicode)
    op.add_column("pipeline_runs", checksum_column)


def downgrade() -> None:
    """Revert the migration: drop the ``checksum`` column from ``pipeline_runs``."""
    table, column = "pipeline_runs", "checksum"
    op.drop_column(table, column)
2 changes: 2 additions & 0 deletions backend/howtheyvote/models/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class PipelineRunResult(Enum):
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
DATA_UNAVAILABLE = "DATA_UNAVAILABLE"
DATA_UNCHANGED = "DATA_UNCHANGED"


class PipelineRun(Base):
Expand All @@ -50,3 +51,4 @@ class PipelineRun(Base):
finished_at: Mapped[sa.DateTime] = mapped_column(sa.DateTime)
pipeline: Mapped[str] = mapped_column(sa.Unicode)
result: Mapped[PipelineRunResult] = mapped_column(sa.Enum(PipelineRunResult))
checksum: Mapped[str] = mapped_column(sa.Unicode)
22 changes: 16 additions & 6 deletions backend/howtheyvote/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
from ..export import generate_export
from ..files import file_path
from ..models import PipelineRun, PipelineRunResult, PlenarySession
from ..pipelines import MembersPipeline, PressPipeline, RCVListPipeline, SessionsPipeline
from ..pipelines import BasePipeline, MembersPipeline, PressPipeline, RCVListPipeline, SessionsPipeline
from ..query import session_is_current_at
from .worker import SkipPipelineError, Weekday, Worker, pipeline_ran_successfully

log = get_logger(__name__)


def op_rcv_midday() -> None:
def op_rcv_midday() -> BasePipeline:
"""Checks if there is a current plenary session and, if yes, fetches the latest roll-call
vote results."""
today = datetime.date.today()
Expand All @@ -31,8 +31,10 @@ def op_rcv_midday() -> None:
pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today)
pipeline.run()

return pipeline

def op_rcv_evening() -> None:

def op_rcv_evening() -> BasePipeline:
"""While on most plenary days, there’s only one voting session around midday, on some days
there is another session in the evening, usually around 17:00. The vote results of the
evening sessions are appended to the same source document that also contains the results
Expand All @@ -49,8 +51,10 @@ def op_rcv_evening() -> None:
pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today)
pipeline.run()

return pipeline


def op_press() -> None:
def op_press() -> BasePipeline:
"""Checks if there is a current plenary session and, if yes, fetches the latest press
releases from the Parliament’s news hub."""
today = datetime.date.today()
Expand All @@ -61,18 +65,24 @@ def op_press() -> None:
pipeline = PressPipeline(date=today, with_rss=True)
pipeline.run()

return pipeline


def op_sessions() -> None:
def op_sessions() -> BasePipeline:
    """Fetches plenary session dates.

    Builds a ``SessionsPipeline`` for ``config.CURRENT_TERM``, runs it, and
    returns the pipeline instance to the caller.
    """
    sessions_pipeline = SessionsPipeline(term=config.CURRENT_TERM)
    sessions_pipeline.run()

    return sessions_pipeline

def op_members() -> None:

def op_members() -> BasePipeline:
    """Fetches information about all members of the current term.

    Builds a ``MembersPipeline`` for ``config.CURRENT_TERM``, runs it, and
    returns the pipeline instance to the caller.
    """
    members_pipeline = MembersPipeline(term=config.CURRENT_TERM)
    members_pipeline.run()

    return members_pipeline


EXPORT_LAST_RUN = Gauge(
"htv_export_last_run_timestamp_seconds",
Expand Down
9 changes: 6 additions & 3 deletions backend/howtheyvote/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from .. import config
from ..db import Session
from ..models import PipelineRun, PipelineRunResult
from ..pipelines import DataUnavailableError
from ..pipelines import BasePipeline, DataUnavailableError, DataUnchangedError

log = get_logger(__name__)

Expand Down Expand Up @@ -150,7 +150,7 @@ def schedule(

def schedule_pipeline(
self,
handler: Handler,
handler: Callable[..., BasePipeline],
name: str,
weekdays: Iterable[Weekday] = set(Weekday),
hours: Iterable[int] = {0},
Expand All @@ -164,13 +164,15 @@ def wrapped_handler() -> None:
started_at = datetime.datetime.now(datetime.UTC)

try:
handler()
pipeline = handler()
result = PipelineRunResult.SUCCESS
except SkipPipelineError:
# Do not log skipped pipeline runs
return
except DataUnavailableError:
result = PipelineRunResult.DATA_UNAVAILABLE
except DataUnchangedError:
result = PipelineRunResult.DATA_UNCHANGED
except Exception:
result = PipelineRunResult.FAILURE

Expand All @@ -186,6 +188,7 @@ def wrapped_handler() -> None:
started_at=started_at,
finished_at=finished_at,
result=result.value,
checksum=pipeline.checksum,
)

Session.add(run)
Expand Down

0 comments on commit d0a9c05

Please sign in to comment.