Merge branch 'refs/heads/main' into wip/api-v1.2
cityofcapetown-opm-bot committed Dec 17, 2024
2 parents 17e63cf + 771c96f commit c187fb3
Showing 3 changed files with 23 additions and 20 deletions.
16 changes: 8 additions & 8 deletions src/cct_connector/ServiceAlertAugmenter.py
@@ -81,7 +81,7 @@


@functools.lru_cache
def _load_gis_layer(area_type: str, layer_query: str or None = None):
def _load_gis_layer(area_type: str, layer_query: str | None = None):
if area_type in AREA_LOOKUP:
layer_name, _ = AREA_LOOKUP[area_type]
else:
@@ -117,7 +117,7 @@ def _get_overture_street_data() -> geopandas.GeoDataFrame:


def _geocode_location(address: str,
bounding_polygon: shapely.geometry.base) -> shapely.geometry.base or None:
bounding_polygon: shapely.geometry.base) -> shapely.geometry.base | None:
output_shape = None
logging.debug(f"Attempting to geocode '{address=}'")

@@ -198,7 +198,7 @@ def _geocode_location(address: str,
return output_shape


def _cptgpt_location_call_wrapper(location_dict: typing.Dict, http_session: requests.Session) -> typing.List or None:
def _cptgpt_location_call_wrapper(location_dict: typing.Dict, http_session: requests.Session) -> typing.List | None:
endpoint = PRIMARY_GPT_ENDPOINT
# ToDo move messages to standalone config file
params = {
@@ -510,7 +510,7 @@ def _cptgpt_location_call_wrapper(location_dict: typing.Dict, http_session: requ


def _cptgpt_summarise_call_wrapper(message_dict: typing.Dict, http_session: requests.Session,
max_post_length: int) -> str or None:
max_post_length: int) -> str | None:
system_prompt = (
f'Please reason step by step to draft {max_post_length} or less character social media posts about potential '
'City of Cape Town service outage or update, using the details in provided JSON objects. '
@@ -775,7 +775,7 @@ def _generate_screenshot_of_area(area_gdf: geopandas.GeoDataFrame, area_filename
return minio_utils.file_to_minio(local_image_path, AREA_IMAGE_BUCKET)


def _generate_image_link(area_type: str, area: str, location: str or None, wkt_str: str) -> str:
def _generate_image_link(area_type: str, area: str, location: str | None, wkt_str: str) -> str:
template_params = dict(
salt_str=base64.b64encode(bytes(AREA_IMAGE_SALT, 'utf-8')).decode(),
area_type_str=base64.b64encode(bytes(area_type, 'utf-8')).decode(),
@@ -839,10 +839,10 @@ class ServiceAlertAugmenter(ServiceAlertBase.ServiceAlertsBase):
def __init__(self, minio_read_name=FIXED_SA_NAME, minio_write_name=AUGMENTED_SA_NAME):
super().__init__(None, None, minio_utils.DataClassification.LAKE,
minio_read_name=minio_read_name, minio_write_name=minio_write_name,
use_cached_values=False, stage_cache_salt=AUGMENTER_SALT, index_col=ID_COL)
use_cached_values=True, stage_cache_salt=AUGMENTER_SALT, index_col=ID_COL)

# all data is reverse sorted
self.data = self.get_data_from_minio().sort_values(by=['publish_date'], ascending=False).head(10)
self.data = self.get_data_from_minio().sort_values(by=['publish_date'], ascending=False)

# if there aren't new values, take some undrafted ones from the cache
less_than_limit = DRAFT_LIMIT - self.data.shape[0]
@@ -947,7 +947,7 @@ def lookup_geospatial_image_link(self):
if not image_filename_lookup.empty:
self.data[IMAGE_COL] = image_filename_lookup

def infer_area(self, layer_name: str, layer_col: str, data_col_name: str, layer_query: str or None = None):
def infer_area(self, layer_name: str, layer_col: str, data_col_name: str, layer_query: str | None = None):
if self.data.empty:
logging.warning("No data, so skipping...")
return
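Reviewer note on the annotation hunks above: `str or None` in a signature is not a type union. `or` is an ordinary boolean expression evaluated when the function is defined, so the annotation collapses to plain `str` and the optional case is silently lost; the PEP 604 form `str | None` (Python 3.10+) is what these hunks restore. A minimal sketch of the difference (illustrative only, not code from this repo):

```python
# `str or None` is evaluated eagerly: `str` is truthy, so the whole
# annotation reduces to just `str` and None-ness disappears.
def bad(layer_query: str or None = None): ...

# PEP 604 union type, preserved as-is in the annotation.
def good(layer_query: str | None = None): ...

print(bad.__annotations__)   # {'layer_query': <class 'str'>}
print(good.__annotations__)  # {'layer_query': str | None}
```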
4 changes: 2 additions & 2 deletions src/cct_connector/ServiceAlertEmailer.py
@@ -38,9 +38,9 @@

@dataclasses.dataclass
class ServiceAlertEmailConfig(ServiceAlertOutputFileConfig):
receivers: typing.Tuple[typing.Tuple[str or None, str], ...]
receivers: typing.Tuple[typing.Tuple[str | None, str], ...]
email_focus: str
additional_filter: str or typing.Callable or None
additional_filter: str | typing.Callable | None

def apply_additional_filter(self, data_df: pandas.DataFrame) -> pandas.DataFrame:
logging.debug(f"( pre-filter) {data_df.shape=}")
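The `additional_filter` field above accepts either a pandas query string or a callable. The body of `apply_additional_filter` sits outside this hunk, so the following is only a hypothetical sketch of how such a dual-typed filter is commonly dispatched (the helper name, the query string, and the `area_type` column are all illustrative):

```python
import typing

import pandas


# Hypothetical sketch; the real apply_additional_filter body is not shown here.
def _apply_filter(data_df: pandas.DataFrame,
                  additional_filter: str | typing.Callable | None) -> pandas.DataFrame:
    if additional_filter is None:
        return data_df  # no extra filtering configured
    if isinstance(additional_filter, str):
        # e.g. "area_type == 'citywide'"
        return data_df.query(additional_filter)
    # callable that takes the DataFrame and returns a boolean mask
    return data_df.loc[additional_filter]
```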
23 changes: 13 additions & 10 deletions src/cct_connector/ServiceAlertFixer.py
@@ -5,6 +5,7 @@
from db_utils import minio_utils
import pandas as pd
import pyarrow.dataset as ds
import pytz

from cct_connector import ServiceAlertBase
from cct_connector import (
@@ -16,6 +17,7 @@
SN_REGEX_PATTERN = r"^\d{10}$"
SN_RE = re.compile(SN_REGEX_PATTERN)
HM_RE = re.compile(r'^\d{2}:\d{2}$')
SAST_TZ = pytz.timezone('Africa/Johannesburg')


def _clean_sa_df(data_df: pd.DataFrame) -> pd.DataFrame:
@@ -43,22 +45,24 @@ def _clean_sa_df(data_df: pd.DataFrame) -> pd.DataFrame:
).dt.time,
"forecast_end_time": lambda df: df["Forecast_x0020_End_x0020_Time"].apply(
lambda val: (
datetime.strptime(val.replace("60", "59") + "+02:00", "%H:%M%z") if HM_RE.match(val) else None
datetime.strptime(
val.replace("60", "59") + "+02:00", "%H:%M%z"
) if (pd.notna(val) and HM_RE.match(val)) else None
)
).dt.time,
),
# Creating timestamps
"start_timestamp": lambda df: df.apply(
lambda row: datetime.combine(row["effective_date"], row["start_time"]),
axis=1
).dt.tz_localize("+02:00"),
"forecast_end_timestamp": lambda df: df.apply(
lambda row: datetime.combine(
lambda row: SAST_TZ.localize(datetime.combine(
# Assuming that it ends on the day of expiry
(row["expiry_date"] - pd.Timedelta(days=1)).date(),
row["forecast_end_time"]
) if pd.notna(row['forecast_end_time']) else None,
row["forecast_end_time"].time()
)) if pd.notna(row['forecast_end_time']) else None,
axis=1
).dt.tz_localize("+02:00"),
),
"location": lambda df: df.apply(
lambda row: (
# dropping location entry if it overlaps with the description entry
@@ -68,15 +72,14 @@ def _clean_sa_df(data_df: pd.DataFrame) -> pd.DataFrame:
row["Address_x0020_Location_x0020_2"][:len(row["Description12"])] !=
row["Description12"][:len(row["Address_x0020_Location_x0020_2"])]
) else None
),
axis=1
), axis=1
)
}).assign(**{
# fixing cases where the start and end timestamps roll over the day
"forecast_end_timestamp": lambda df: df.apply(
lambda row: row["forecast_end_timestamp"] + pd.Timedelta(days=(
lambda row: (row["forecast_end_timestamp"] + pd.Timedelta(days=(
1 if row["forecast_end_timestamp"] <= row["start_timestamp"] else 0
)),
))) if pd.notna(row["forecast_end_timestamp"]) else None,
axis=1
),
}).rename(columns={
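A note on the pytz changes in this file: the old code built naive datetimes with a row-wise `apply` and then chained `.dt.tz_localize("+02:00")`, but an `apply` whose lambda can return `None` produces an object-dtype Series, on which the `.dt` accessor raises. Localising each value as it is built, via `SAST_TZ.localize(...)`, sidesteps that and attaches the proper region-aware zone. A small illustration of the call being relied on (not code from the repo):

```python
from datetime import date, datetime, time

import pytz

SAST_TZ = pytz.timezone('Africa/Johannesburg')

# pytz requires localize() for naive datetimes; passing the timezone as
# tzinfo to the constructor instead would pick up a pre-1900 LMT offset.
naive = datetime.combine(date(2024, 12, 17), time(14, 30))
aware = SAST_TZ.localize(naive)
print(aware.isoformat())  # 2024-12-17T14:30:00+02:00
```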
