Commit: saving results to s3

peterdudfield committed Nov 11, 2024
1 parent 1c1726a commit 26c5fbf

Showing 3 changed files with 119 additions and 44 deletions.
2 changes: 2 additions & 0 deletions database-cleanup/README.md
@@ -2,6 +2,8 @@

This service is responsible for removing old forecasts from the database and archiving them.

Old forecasts are saved to a directory defined by the `SAVE_DIR` environment variable.
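For example (paths hypothetical), `SAVE_DIR=data` writes the archive to a local folder, while `SAVE_DIR=s3://my-bucket/database-cleanup` should write it to S3, since paths are resolved with `fsspec`. Each run saves into a subdirectory named after the cutoff date.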

## Running the service

```bash
# ...
```
67 changes: 62 additions & 5 deletions database-cleanup/database_cleanup/app.py
@@ -10,13 +10,15 @@
import os
import time
import uuid
import fsspec

import click
import importlib.metadata
import sentry_sdk
import sqlalchemy as sa
from pvsite_datamodel.sqlmodels import ForecastSQL, ForecastValueSQL
from sqlalchemy.orm import Session, sessionmaker
import pandas as pd

_log = logging.getLogger(__name__)

@@ -62,6 +64,34 @@ def _delete_forecasts_and_values(session: Session, forecast_uuids: list[uuid.UUI
    session.execute(stmt)


def save_forecast_and_values(session: Session, forecast_uuids: list[uuid.UUID], directory: str):
    """
    Save forecasts and forecast values to CSV.

    :param session: database session
    :param forecast_uuids: list of forecast uuids
    :param directory: the directory where they should be saved
    """
    _log.info(f"Saving data to {directory}")

    fs = fsspec.open(directory).fs
    # check the folder exists; if it doesn't, create it
    if not fs.exists(directory):
        fs.mkdir(directory)

    # loop over both the forecast and forecast_value tables
    for table in ["forecast", "forecast_value"]:
        model = ForecastSQL if table == "forecast" else ForecastValueSQL

        # get the data
        query = session.query(model).where(model.forecast_uuid.in_(forecast_uuids))
        forecasts_df = pd.read_sql(query.statement, session.bind)

        # save to csv
        _log.info(f"Saving {len(forecasts_df)} rows to {directory}/{table}.csv")
        forecasts_df.to_csv(f"{directory}/{table}.csv", index=False)


@click.command()
@click.option(
"--date",
@@ -76,6 +106,13 @@ def _delete_forecasts_and_values(session: Session, forecast_uuids: list[uuid.UUI
" (Note that this means orders of magnitude more Forecast *Values*).",
show_default=True,
)
@click.option(
    "--save-dir",
    default="data",
    envvar="SAVE_DIR",
    help="The directory where we save the deleted forecasts and values.",
    show_default=True,
)
@click.option(
    "--sleep",
    type=float,
@@ -91,21 +128,23 @@ def _delete_forecasts_and_values(session: Session, forecast_uuids: list[uuid.UUI
@click.option(
    "--do-delete", is_flag=True, help="Actually delete the rows. By default we only do a dry run."
)
def main(date: dt.datetime, batch_size: int, sleep: int, do_delete: bool, log_level: str):
def main(
    date: dt.datetime, batch_size: int, save_dir: str, sleep: int, do_delete: bool, log_level: str
):
    logging.basicConfig(
        level=getattr(logging, log_level.upper()),
        format="[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s",
    )

    if date is None:
        date = (dt.date.today() - dt.timedelta(days=3)).strftime("%Y-%m-%d 00:00")
    else:
        date = dt.datetime.strptime(date, "%Y-%m-%d %H:%M")
    date = format_date(date)

db_url = os.environ["DB_URL"]
engine = sa.create_engine(db_url)
Session = sessionmaker(engine, future=True)

save_dir = f"{save_dir}/{date.isoformat()}"
_log.info(f"Saving data to {save_dir}")

    if do_delete:
        _log.info(f"Deleting forecasts made before {date} (UTC).")
    else:
@@ -121,6 +160,8 @@ def main(date: dt.datetime, batch_size: int, save_dir: str, sleep: int, do_delete: bool, log_le
                limit=batch_size,
            )

            save_forecast_and_values(session=session, forecast_uuids=forecast_uuids, directory=save_dir)

        if len(forecast_uuids) == 0:
            _log.info(f"Done deleting forecasts made before {date}")
            _log.info(
@@ -131,6 +172,7 @@ def main(date: dt.datetime, batch_size: int, save_dir: str, sleep: int, do_delete: bool, log_le
            return

        if do_delete:

            # Note that it is important to run this in a transaction for atomicity.
            with Session.begin() as session:
                _delete_forecasts_and_values(session, forecast_uuids)
@@ -146,5 +188,20 @@ def main(date: dt.datetime, batch_size: int, save_dir: str, sleep: int, do_delete: bool, log_le
        time.sleep(sleep)


def format_date(date) -> dt.datetime:
    """
    Format the date into a datetime object.

    :param date: None, or a string in the format "YYYY-MM-DD HH:mm"
    :return: the parsed datetime (midnight three days ago when date is None)
    """
    if date is None:
        date = (dt.date.today() - dt.timedelta(days=3)).strftime("%Y-%m-%d 00:00")

    date = dt.datetime.strptime(date, "%Y-%m-%d %H:%M")

    return date


if __name__ == "__main__":
main()
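A minimal sketch, not from this commit, of the behaviour the new `save_forecast_and_values` relies on: `fsspec` dispatches on the URL scheme, so the same code writes to a local folder or to S3 (the S3 case assumes `s3fs` is installed; the bucket name is hypothetical).

```python
import datetime as dt

import fsspec
import pandas as pd

# A stand-in for what format_date(None) returns (midnight, three days ago).
date = dt.datetime(2024, 11, 8)

for base in ["data", "s3://my-bucket/archive"]:  # bucket name is hypothetical
    directory = f"{base}/{date.isoformat()}"
    # fsspec picks LocalFileSystem or S3FileSystem from the URL scheme.
    fs = fsspec.open(directory).fs
    if not fs.exists(directory):
        fs.mkdir(directory)
    # pandas routes s3:// URLs through the same fsspec machinery, so the
    # write below is identical for both bases.
    pd.DataFrame({"forecast_uuid": []}).to_csv(f"{directory}/forecast.csv", index=False)
```

This scheme-based dispatch is also why the tests below can pass a temporary directory as `--save-dir` while production points `SAVE_DIR` at S3.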
94 changes: 55 additions & 39 deletions database-cleanup/tests/test_app.py
@@ -1,10 +1,12 @@
import datetime as dt
import os.path
import uuid
import tempfile

import pytest
import sqlalchemy as sa
from click.testing import CliRunner
from database_cleanup.app import main
from database_cleanup.app import main, format_date
from freezegun import freeze_time
from pvsite_datamodel.sqlmodels import ForecastSQL, ForecastValueSQL, SiteSQL
from sqlalchemy.orm import Session
@@ -83,44 +85,58 @@ def test_app(session: Session, site, batch_size: int, date_str: str | None, expe
    num_forecasts = 10
    num_values = 9

    timestamps = [dt.datetime(2020, 1, d + 1) for d in range(num_forecasts)]

    # Add forecasts for those.
    _add_foreasts(
        session,
        site_uuid=site_uuid,
        timestamps=timestamps,
        num_values=num_values,
        frequency=1,
    )

    # Run the script.
    args = ["--do-delete"]

    if date_str is not None:
        args.extend(["--date", date_str])

    if batch_size is not None:
        args.extend(["--batch-size", str(batch_size)])

    _run_cli(main, args)

    # Check that we have the right number of rows left.
    # Only check for the site_uuid that we considered.
    num_forecasts_left = session.scalars(
        sa.select(sa.func.count())
        .select_from(ForecastSQL)
        .where(ForecastSQL.site_uuid == site_uuid)
    ).one()
    assert num_forecasts_left == expected

    num_values_left = session.scalars(
        sa.select(sa.func.count())
        .select_from(ForecastValueSQL)
        .join(ForecastSQL)
        .where(ForecastSQL.site_uuid == site_uuid)
    ).one()
    assert num_values_left == expected * num_values
    # make a temp directory
    with tempfile.TemporaryDirectory() as tmpdirname:

        save_dir = tmpdirname

        timestamps = [dt.datetime(2020, 1, d + 1) for d in range(num_forecasts)]

        # Add forecasts for those.
        _add_foreasts(
            session,
            site_uuid=site_uuid,
            timestamps=timestamps,
            num_values=num_values,
            frequency=1,
        )

        # Run the script.
        args = ["--do-delete"]

        if date_str is not None:
            args.extend(["--date", date_str])

        if batch_size is not None:
            args.extend(["--batch-size", str(batch_size)])

        args.extend(["--save-dir", save_dir])

        _run_cli(main, args)

        # Check that we have the right number of rows left.
        # Only check for the site_uuid that we considered.
        num_forecasts_left = session.scalars(
            sa.select(sa.func.count())
            .select_from(ForecastSQL)
            .where(ForecastSQL.site_uuid == site_uuid)
        ).one()
        assert num_forecasts_left == expected

        num_values_left = session.scalars(
            sa.select(sa.func.count())
            .select_from(ForecastValueSQL)
            .join(ForecastSQL)
            .where(ForecastSQL.site_uuid == site_uuid)
        ).one()
        assert num_values_left == expected * num_values

        # check that forecast.csv and forecast_value.csv are saved
        date = format_date(date_str).isoformat()
        assert os.path.exists(f"{tmpdirname}")
        assert os.path.exists(f"{tmpdirname}/{date}")
        assert os.path.exists(f"{tmpdirname}/{date}/forecast.csv")
        assert os.path.exists(f"{tmpdirname}/{date}/forecast_value.csv")


@freeze_time("2020-01-11 00:01")

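For reference, a sketch (values hypothetical) of the archive layout those assertions encode, using the `format_date` helper imported above:

```python
from database_cleanup.app import format_date

# With --date "2020-01-08 00:00", the files land in a dated subfolder:
date = format_date("2020-01-08 00:00").isoformat()  # "2020-01-08T00:00:00"
expected = [f"<save-dir>/{date}/forecast.csv", f"<save-dir>/{date}/forecast_value.csv"]
```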