diff --git a/src/cct_connector/ServiceAlertAugmenter.py b/src/cct_connector/ServiceAlertAugmenter.py
index 2b9178c..10af2f7 100644
--- a/src/cct_connector/ServiceAlertAugmenter.py
+++ b/src/cct_connector/ServiceAlertAugmenter.py
@@ -81,7 +81,7 @@
 
 
 @functools.lru_cache
-def _load_gis_layer(area_type: str, layer_query: str or None = None):
+def _load_gis_layer(area_type: str, layer_query: str | None = None):
     if area_type in AREA_LOOKUP:
         layer_name, _ = AREA_LOOKUP[area_type]
     else:
@@ -117,7 +117,7 @@ def _get_overture_street_data() -> geopandas.GeoDataFrame:
 
 
 def _geocode_location(address: str,
-                      bounding_polygon: shapely.geometry.base) -> shapely.geometry.base or None:
+                      bounding_polygon: shapely.geometry.base.BaseGeometry) -> shapely.geometry.base.BaseGeometry | None:
     output_shape = None
     logging.debug(f"Attempting to geocode '{address=}'")
 
@@ -198,7 +198,7 @@ def _geocode_location(address: str,
     return output_shape
 
 
-def _cptgpt_location_call_wrapper(location_dict: typing.Dict, http_session: requests.Session) -> typing.List or None:
+def _cptgpt_location_call_wrapper(location_dict: typing.Dict, http_session: requests.Session) -> typing.List | None:
     endpoint = PRIMARY_GPT_ENDPOINT
     # ToDo move messages to standalone config file
     params = {
@@ -510,7 +510,7 @@ def _cptgpt_location_call_wrapper(location_dict: typing.Dict, http_session: requ
 
 def _cptgpt_summarise_call_wrapper(message_dict: typing.Dict,
                                    http_session: requests.Session,
-                                   max_post_length: int) -> str or None:
+                                   max_post_length: int) -> str | None:
     system_prompt = (
         f'Please reason step by step to draft {max_post_length} or less character social media posts about potential '
         'City of Cape Town service outage or update, using the details in provided JSON objects. '
@@ -775,7 +775,7 @@ def _generate_screenshot_of_area(area_gdf: geopandas.GeoDataFrame, area_filename
     return minio_utils.file_to_minio(local_image_path, AREA_IMAGE_BUCKET)
 
 
-def _generate_image_link(area_type: str, area: str, location: str or None, wkt_str: str) -> str:
+def _generate_image_link(area_type: str, area: str, location: str | None, wkt_str: str) -> str:
     template_params = dict(
         salt_str=base64.b64encode(bytes(AREA_IMAGE_SALT, 'utf-8')).decode(),
         area_type_str=base64.b64encode(bytes(area_type, 'utf-8')).decode(),
@@ -839,10 +839,10 @@ class ServiceAlertAugmenter(ServiceAlertBase.ServiceAlertsBase):
     def __init__(self, minio_read_name=FIXED_SA_NAME, minio_write_name=AUGMENTED_SA_NAME):
         super().__init__(None, None, minio_utils.DataClassification.LAKE,
                          minio_read_name=minio_read_name, minio_write_name=minio_write_name,
-                         use_cached_values=False, stage_cache_salt=AUGMENTER_SALT, index_col=ID_COL)
+                         use_cached_values=True, stage_cache_salt=AUGMENTER_SALT, index_col=ID_COL)
 
         # all data is reverse sorted
-        self.data = self.get_data_from_minio().sort_values(by=['publish_date'], ascending=False).head(10)
+        self.data = self.get_data_from_minio().sort_values(by=['publish_date'], ascending=False)
 
         # if there aren't new values, take some undrafted ones from the cache
         less_than_limit = DRAFT_LIMIT - self.data.shape[0]
@@ -947,7 +947,7 @@ def lookup_geospatial_image_link(self):
         if not image_filename_lookup.empty:
             self.data[IMAGE_COL] = image_filename_lookup
 
-    def infer_area(self, layer_name: str, layer_col: str, data_col_name: str, layer_query: str or None = None):
+    def infer_area(self, layer_name: str, layer_col: str, data_col_name: str, layer_query: str | None = None):
         if self.data.empty:
             logging.warning("No data, so skipping...")
             return
diff --git a/src/cct_connector/ServiceAlertEmailer.py b/src/cct_connector/ServiceAlertEmailer.py
index 9ebd873..bc04053 100644
--- a/src/cct_connector/ServiceAlertEmailer.py
+++ b/src/cct_connector/ServiceAlertEmailer.py
@@ -38,9 +38,9 @@
 
 @dataclasses.dataclass
 class ServiceAlertEmailConfig(ServiceAlertOutputFileConfig):
-    receivers: typing.Tuple[typing.Tuple[str or None, str], ...]
+    receivers: typing.Tuple[typing.Tuple[str | None, str], ...]
     email_focus: str
-    additional_filter: str or typing.Callable or None
+    additional_filter: str | typing.Callable | None
 
     def apply_additional_filter(self, data_df: pandas.DataFrame) -> pandas.DataFrame:
         logging.debug(f"( pre-filter) {data_df.shape=}")
diff --git a/src/cct_connector/ServiceAlertFixer.py b/src/cct_connector/ServiceAlertFixer.py
index 7570dbf..0077c73 100644
--- a/src/cct_connector/ServiceAlertFixer.py
+++ b/src/cct_connector/ServiceAlertFixer.py
@@ -5,6 +5,7 @@
 from db_utils import minio_utils
 import pandas as pd
 import pyarrow.dataset as ds
+import pytz
 
 from cct_connector import ServiceAlertBase
 from cct_connector import (
@@ -16,6 +17,7 @@
 SN_REGEX_PATTERN = r"^\d{10}$"
 SN_RE = re.compile(SN_REGEX_PATTERN)
 HM_RE = re.compile(r'^\d{2}:\d{2}$')
+SAST_TZ = pytz.timezone('Africa/Johannesburg')
 
 
 def _clean_sa_df(data_df: pd.DataFrame) -> pd.DataFrame:
@@ -43,22 +45,24 @@ def _clean_sa_df(data_df: pd.DataFrame) -> pd.DataFrame:
         ).dt.time,
         "forecast_end_time": lambda df: df["Forecast_x0020_End_x0020_Time"].apply(
             lambda val: (
-                datetime.strptime(val.replace("60", "59") + "+02:00", "%H:%M%z") if HM_RE.match(val) else None
+                datetime.strptime(
+                    val.replace("60", "59") + "+02:00", "%H:%M%z"
+                ) if (pd.notna(val) and HM_RE.match(val)) else None
             )
-        ).dt.time,
+        ),
         # Creating timestamps
         "start_timestamp": lambda df: df.apply(
             lambda row: datetime.combine(row["effective_date"], row["start_time"]), axis=1
         ).dt.tz_localize("+02:00"),
         "forecast_end_timestamp": lambda df: df.apply(
-            lambda row: datetime.combine(
+            lambda row: SAST_TZ.localize(datetime.combine(
                 # Assuming that it ends on the day of expiry
                 (row["expiry_date"] - pd.Timedelta(days=1)).date(),
-                row["forecast_end_time"]
-            ) if pd.notna(row['forecast_end_time']) else None,
+                row["forecast_end_time"].time()
+            )) if pd.notna(row['forecast_end_time']) else None,
             axis=1
-        ).dt.tz_localize("+02:00"),
+        ),
@@ -68,15 +72,14 @@
         "location": lambda df: df.apply(
             lambda row: (
                 # dropping location entry if it overlaps with the description entry
                 row["Address_x0020_Location_x0020_2"] if (
                         pd.notna(row["Address_x0020_Location_x0020_2"]) and
                         row["Address_x0020_Location_x0020_2"][:len(row["Description12"])] != row["Description12"][:len(row["Address_x0020_Location_x0020_2"])]
                 ) else None
-            ),
-            axis=1
+            ), axis=1
         )
     }).assign(**{
         # fixing cases where the start and end timestamps roll over the day
         "forecast_end_timestamp": lambda df: df.apply(
-            lambda row: row["forecast_end_timestamp"] + pd.Timedelta(days=(
+            lambda row: (row["forecast_end_timestamp"] + pd.Timedelta(days=(
                 1 if row["forecast_end_timestamp"] <= row["start_timestamp"] else 0
-            )),
+            ))) if pd.notna(row["forecast_end_timestamp"]) else None,
             axis=1
         ),
     }).rename(columns={
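
A standalone sketch (not part of the diff) of the guarded combine-and-localize pattern the ServiceAlertFixer hunks adopt. The function name and sample values below are illustrative assumptions, not code from the PR; the point is that the old Series-level .dt.tz_localize("+02:00") call breaks once the column holds None (the Series falls back to object dtype and loses the .dt accessor), so the new code guards with pd.notna and localizes each value via pytz instead:

    from datetime import datetime

    import pandas as pd
    import pytz

    SAST_TZ = pytz.timezone('Africa/Johannesburg')

    def combine_and_localize(expiry_date: pd.Timestamp,
                             forecast_end_time: datetime | None) -> datetime | None:
        # Guard first: datetime.combine would raise on missing values.
        if pd.isna(forecast_end_time):
            return None
        # Assume the outage ends on the day before the expiry date...
        naive = datetime.combine((expiry_date - pd.Timedelta(days=1)).date(),
                                 forecast_end_time.time())
        # ...then attach SAST (UTC+02:00) per value via pytz, without
        # shifting the wall-clock time.
        return SAST_TZ.localize(naive)

    print(combine_and_localize(pd.Timestamp("2024-03-02"),
                               datetime.strptime("18:30+02:00", "%H:%M%z")))
    # 2024-03-01 18:30:00+02:00
    print(combine_and_localize(pd.Timestamp("2024-03-02"), None))
    # None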