Skip to content

Commit

Permalink
new branch, save forecasts to csv + tests (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
peterdudfield authored Nov 11, 2024
1 parent 1c1726a commit b81e25b
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 102 deletions.
2 changes: 2 additions & 0 deletions database-cleanup/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

This service is responsible to remove old forecasts from the database and archive them.

Old forecast are saved to a directory defined by `SAVE_DIR` environment variable.

## Running the service

```bash
Expand Down
75 changes: 70 additions & 5 deletions database-cleanup/database_cleanup/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
import os
import time
import uuid
import fsspec
from typing import Optional

import click
import importlib.metadata
import sentry_sdk
import sqlalchemy as sa
from pvsite_datamodel.sqlmodels import ForecastSQL, ForecastValueSQL
from sqlalchemy.orm import Session, sessionmaker
import pandas as pd

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -62,6 +65,34 @@ def _delete_forecasts_and_values(session: Session, forecast_uuids: list[uuid.UUI
session.execute(stmt)


def save_forecast_and_values(session: Session, forecast_uuids: list[uuid.UUID], directory: str):
"""
Save forecast and forecast values to csv
:param session: database session
:param forecast_uuids: list of forecast uuids
:param directory: the directory where they should be saved
"""
_log.info(f"Saving data to {directory}")

fs = fsspec.open(directory).fs
# check folder exists, if it doesnt, add it
if not fs.exists(directory):
fs.mkdir(directory)

# loop over both forecast and forecast_values tables
for table in ["forecast", "forecast_value"]:
model = ForecastSQL if table == "forecast" else ForecastValueSQL

# get data
query = session.query(model).where(model.forecast_uuid.in_(forecast_uuids))
forecasts_sql = query.all()
forecasts_df = pd.DataFrame([f.__dict__ for f in forecasts_sql])

# save to csv
_log.info(f"saving to {directory}, Saving {len(forecasts_df)} rows to {table}.csv")
forecasts_df.to_csv(f"{directory}/{table}.csv", index=False)


@click.command()
@click.option(
"--date",
Expand All @@ -76,6 +107,13 @@ def _delete_forecasts_and_values(session: Session, forecast_uuids: list[uuid.UUI
" (Note that this means orders of magnitude more Forecast *Values*).",
show_default=True,
)
@click.option(
"--save-dir",
default=None,
envvar="SAVE_DIR",
help="The directory where we save the delete forecasts and values.",
show_default=True,
)
@click.option(
"--sleep",
type=float,
Expand All @@ -91,21 +129,29 @@ def _delete_forecasts_and_values(session: Session, forecast_uuids: list[uuid.UUI
@click.option(
"--do-delete", is_flag=True, help="Actually delete the rows. By default we only do a dry run."
)
def main(date: dt.datetime, batch_size: int, sleep: int, do_delete: bool, log_level: str):
def main(
date: dt.datetime,
batch_size: int,
save_dir: Optional[str],
sleep: int,
do_delete: bool,
log_level: str,
):
logging.basicConfig(
level=getattr(logging, log_level.upper()),
format="[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s",
)

if date is None:
date = (dt.date.today() - dt.timedelta(days=3)).strftime("%Y-%m-%d 00:00")
else:
date = dt.datetime.strptime(date, "%Y-%m-%d %H:%M")
date = format_date(date)

db_url = os.environ["DB_URL"]
engine = sa.create_engine(db_url)
Session = sessionmaker(engine, future=True)

if save_dir is not None:
save_dir = f"{save_dir}/{date.isoformat()}"
_log.info(f"Saving data to {save_dir}")

if do_delete:
_log.info(f"Deleting forecasts made before {date} (UTC).")
else:
Expand All @@ -121,6 +167,11 @@ def main(date: dt.datetime, batch_size: int, sleep: int, do_delete: bool, log_le
limit=batch_size,
)

if save_dir is not None:
save_forecast_and_values(
session=session, forecast_uuids=forecast_uuids, directory=save_dir
)

if len(forecast_uuids) == 0:
_log.info(f"Done deleting forecasts made before {date}")
_log.info(
Expand All @@ -146,5 +197,19 @@ def main(date: dt.datetime, batch_size: int, sleep: int, do_delete: bool, log_le
time.sleep(sleep)


def format_date(date) -> dt.datetime:
"""
Format the date to a datetime object
:param date: None, or string in the format "YYYY-MM-DD HH:mm"
:return:
"""
if date is None:
date = (dt.date.today() - dt.timedelta(days=3)).strftime("%Y-%m-%d 00:00")

date = dt.datetime.strptime(date, "%Y-%m-%d %H:%M")

return date


if __name__ == "__main__":
main()
Loading

0 comments on commit b81e25b

Please sign in to comment.