Skip to content

Commit

Permalink
Rename PipelineRunResult to PipelineStatus
Browse files Browse the repository at this point in the history
In preparation for changes in the next commit to avoid ambiguous naming
  • Loading branch information
tillprochaska committed Dec 8, 2024
1 parent 89de9ab commit a7589a2
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Rename result column to status
Revision ID: 1f516b18c4f6
Revises: 9b35d19b64c4
Create Date: 2024-12-08 11:25:26.051408
"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "1f516b18c4f6"
down_revision = "9b35d19b64c4"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.alter_column("pipeline_runs", column_name="result", new_column_name="status")


def downgrade() -> None:
op.alter_column("pipeline_runs", column_name="status", new_column_name="result")
4 changes: 2 additions & 2 deletions backend/howtheyvote/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .common import Base, BaseWithId, DataIssue, Fragment, PipelineRun, PipelineRunResult
from .common import Base, BaseWithId, DataIssue, Fragment, PipelineRun, PipelineStatus
from .country import Country, CountryType
from .eurovoc import EurovocConcept, EurovocConceptType
from .group import Group
Expand All @@ -24,7 +24,7 @@
"BaseWithId",
"Fragment",
"PipelineRun",
"PipelineRunResult",
"PipelineStatus",
"DataIssue",
"Country",
"CountryType",
Expand Down
5 changes: 3 additions & 2 deletions backend/howtheyvote/models/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ class DataIssue(Enum):
VOTE_GROUP_NO_MAIN_VOTE = "VOTE_GROUP_NO_MAIN_VOTE"


class PipelineRunResult(Enum):
class PipelineStatus(Enum):
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
DATA_UNAVAILABLE = "DATA_UNAVAILABLE"
DATA_UNCHANGED = "DATA_UNCHANGED"


class PipelineRun(Base):
Expand All @@ -49,4 +50,4 @@ class PipelineRun(Base):
started_at: Mapped[sa.DateTime] = mapped_column(sa.DateTime)
finished_at: Mapped[sa.DateTime] = mapped_column(sa.DateTime)
pipeline: Mapped[str] = mapped_column(sa.Unicode)
result: Mapped[PipelineRunResult] = mapped_column(sa.Enum(PipelineRunResult))
status: Mapped[PipelineStatus] = mapped_column(sa.Enum(PipelineStatus))
2 changes: 1 addition & 1 deletion backend/howtheyvote/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ..db import Session
from ..export import generate_export
from ..files import file_path
from ..models import PipelineRun, PipelineRunResult, PlenarySession
from ..models import PipelineRun, PlenarySession
from ..pipelines import MembersPipeline, PressPipeline, RCVListPipeline, SessionsPipeline
from ..query import session_is_current_at
from .worker import SkipPipelineError, Weekday, Worker, pipeline_ran_successfully
Expand Down
22 changes: 11 additions & 11 deletions backend/howtheyvote/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from .. import config
from ..db import Session
from ..models import PipelineRun, PipelineRunResult
from ..models import PipelineRun, PipelineStatus
from ..pipelines import DataUnavailableError

log = get_logger(__name__)
Expand All @@ -26,13 +26,13 @@
PIPELINE_RUN_DURATION = Histogram(
"htv_worker_pipeline_run_duration_seconds",
"Duration of pipeline runs executed by the worker",
["pipeline", "result"],
["pipeline", "status"],
)

PIPELINE_RUNS = Counter(
"htv_worker_pipeline_runs_total",
"Total number of pipeline runs executed by the worker",
["pipeline", "result"],
["pipeline", "status"],
)

PIPELINE_NEXT_RUN = Gauge(
Expand Down Expand Up @@ -70,7 +70,7 @@ def pipeline_ran_successfully(
.select_from(PipelineRun)
.where(PipelineRun.pipeline == pipeline.__name__)
.where(func.date(PipelineRun.started_at) == func.date(date))
.where(PipelineRun.result == PipelineRunResult.SUCCESS)
.where(PipelineRun.status == PipelineStatus.SUCCESS)
)
result = Session.execute(query).scalar() or 0

Expand All @@ -79,7 +79,7 @@ def pipeline_ran_successfully(

class Worker:
"""Running a worker starts a long-running process that executes data pipelines in regular
intervals and stores the result of the pipeline runs in the database."""
intervals and stores the status of the pipeline runs in the database."""

def __init__(self) -> None:
self._scheduler = Scheduler()
Expand Down Expand Up @@ -165,27 +165,27 @@ def wrapped_handler() -> None:

try:
handler()
result = PipelineRunResult.SUCCESS
status = PipelineStatus.SUCCESS
except SkipPipelineError:
# Do not log skipped pipeline runs
return
except DataUnavailableError:
result = PipelineRunResult.DATA_UNAVAILABLE
status = PipelineStatus.DATA_UNAVAILABLE
except Exception:
result = PipelineRunResult.FAILURE
status = PipelineStatus.FAILURE

duration = time.time() - start_time
finished_at = datetime.datetime.now(datetime.UTC)

labels = {"pipeline": name, "result": result.value}
labels = {"pipeline": name, "status": status.value}
PIPELINE_RUNS.labels(**labels).inc()
PIPELINE_RUN_DURATION.labels(**labels).observe(duration)

run = PipelineRun(
pipeline=name,
started_at=started_at,
finished_at=finished_at,
result=result.value,
status=status,
)

Session.add(run)
Expand All @@ -198,7 +198,7 @@ def wrapped_handler() -> None:
started_at=started_at.isoformat(),
finished_at=finished_at.isoformat(),
duration=duration,
result=result.value,
status=status.value,
)

Session.remove()
Expand Down
14 changes: 7 additions & 7 deletions backend/tests/worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time_machine
from sqlalchemy import select

from howtheyvote.models import PipelineRun, PipelineRunResult
from howtheyvote.models import PipelineRun, PipelineStatus
from howtheyvote.pipelines import DataUnavailableError, PipelineError
from howtheyvote.worker.worker import Weekday, Worker, pipeline_ran_successfully

Expand Down Expand Up @@ -133,7 +133,7 @@ def test_worker_schedule_pipeline_log_runs(db_session):

run = runs[0]
assert run.pipeline == "test"
assert run.result == PipelineRunResult.SUCCESS
assert run.status == PipelineStatus.SUCCESS
assert run.started_at.date() == datetime.date(2024, 1, 1)
assert run.finished_at.date() == datetime.date(2024, 1, 1)

Expand Down Expand Up @@ -166,10 +166,10 @@ def pipeline_error():
assert len(runs) == 2

assert runs[0].pipeline == "data_unavailable_error"
assert runs[0].result == PipelineRunResult.DATA_UNAVAILABLE
assert runs[0].status == PipelineStatus.DATA_UNAVAILABLE

assert runs[1].pipeline == "pipeline_error"
assert runs[1].result == PipelineRunResult.FAILURE
assert runs[1].status == PipelineStatus.FAILURE


def test_pipeline_ran_successfully(db_session):
Expand All @@ -183,7 +183,7 @@ class TestPipeline:
started_at=now,
finished_at=now,
pipeline=TestPipeline.__name__,
result=PipelineRunResult.FAILURE,
status=PipelineStatus.FAILURE,
)
db_session.add(run)
db_session.commit()
Expand All @@ -194,7 +194,7 @@ class TestPipeline:
started_at=now,
finished_at=now,
pipeline=TestPipeline.__name__,
result=PipelineRunResult.SUCCESS,
status=PipelineStatus.SUCCESS,
)
db_session.add(run)
db_session.commit()
Expand All @@ -206,7 +206,7 @@ class TestPipeline:
started_at=now,
finished_at=now,
pipeline=TestPipeline.__name__,
result=PipelineRunResult.SUCCESS,
status=PipelineStatus.SUCCESS,
)
db_session.add(run)
db_session.commit()
Expand Down

0 comments on commit a7589a2

Please sign in to comment.