
Commit

add multiprocessing for collecting nwp data
peterdudfield committed Dec 14, 2023
1 parent 33b9641 commit 77b2921
Showing 2 changed files with 45 additions and 20 deletions.
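In outline, the change to quartz_solar_forecast/eval/nwp.py replaces the sequential per-row loop in get_nwp with a multiprocessing.Pool: the positional arguments for each row are collected into a list and dispatched to the worker function with pool.starmap. Below is a minimal sketch of that pattern, using a hypothetical fetch function in place of get_nwp_for_one_timestamp_one_location and made-up timestamps and coordinates:

import multiprocessing

def fetch(timestamp, latitude, longitude, pv_id=None):
    # hypothetical stand-in for get_nwp_for_one_timestamp_one_location
    return {"timestamp": timestamp, "latitude": latitude, "longitude": longitude, "pv_id": pv_id}

if __name__ == "__main__":
    # one tuple of positional arguments per row, in the same order as the worker's signature
    tasks_args = [
        ("2022-04-11 00:00", 51.5, -0.1, 1),
        ("2022-04-11 06:00", 52.0, 0.2, 2),
    ]
    with multiprocessing.Pool(processes=3) as pool:
        # starmap unpacks each tuple into the worker's arguments
        results = pool.starmap(fetch, tasks_args)
    print(results)

pool.starmap returns results in the same order as tasks_args, so each result still lines up with its input row.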
4 changes: 2 additions & 2 deletions quartz_solar_forecast/data.py
@@ -96,7 +96,7 @@ def get_nwp(site: PVSite, ts: datetime, nwp_source: str = "icon") -> xr.Dataset:
return data_xr


def format_nwp_data(df, nwp_source, site):
def format_nwp_data(df: pd.DataFrame, nwp_source:str, site: PVSite):
data_xr = xr.DataArray(
data=df.values,
dims=["step", "variable"],
@@ -112,7 +112,7 @@ def format_nwp_data(df, nwp_source, site):
return data_xr


def make_pv_data(site: PVSite, ts) -> xr.Dataset:
def make_pv_data(site: PVSite, ts: pd.Timestamp) -> xr.Dataset:
"""
Make fake PV data for the site
61 changes: 43 additions & 18 deletions quartz_solar_forecast/eval/nwp.py
@@ -6,6 +6,8 @@
import xarray as xr
from huggingface_hub import HfFileSystem

import multiprocessing


def get_nwp(time_locations: pd.DataFrame):
"""
@@ -19,16 +21,31 @@ def get_nwp(time_locations: pd.DataFrame):
"""

all_nwp_dfs = []
for i, row in time_locations.iterrows():
print(f"{i} of {len(time_locations)}")
one_nwp_df = get_nwp_for_one_timestamp_one_location(
row["timestamp"], row["latitude"], row["longitude"]
)

one_nwp_df["timestamp"] = row["timestamp"]
one_nwp_df["pv_id"] = row["pv_id"]
one_nwp_df["latitude"] = row["latitude"]
one_nwp_df["longitude"] = row["longitude"]
tasks_args = []
with multiprocessing.Pool(processes=3) as pool:
for i, row in time_locations.iterrows():
print(f"Making task {i} of {len(time_locations)}")

kwargs = {
"timestamp": row["timestamp"],
"latitude": row["latitude"],
"longitude": row["longitude"],
"pv_id": row["pv_id"],
}

# collect together args for pool.starmap
task_arg = list(kwargs.values())
tasks_args.append(task_arg)

print("Made all tasks")
print(tasks_args)
results = pool.starmap(get_nwp_for_one_timestamp_one_location, tasks_args)

print("Gathered all tasks")

for result in results:
one_nwp_df = result

all_nwp_dfs.append(one_nwp_df)

Expand All @@ -37,7 +54,9 @@ def get_nwp(time_locations: pd.DataFrame):
return all_nwp_df


def get_nwp_for_one_timestamp_one_location(timestamp: pd.Timestamp, latitude, longitude):
def get_nwp_for_one_timestamp_one_location(
timestamp: pd.Timestamp, latitude, longitude, pv_id=None
):
"""
Get NWP data from Hugging Face for one timestamp and one location
@@ -48,16 +67,17 @@ def get_nwp_for_one_timestamp_one_location(timestamp: pd.Timestamp, latitude, lo
:return: nwp forecast in xarray
"""

# round timestamp to 6 hours floor
fs = HfFileSystem()
# List which files are available. Not all dates, and model run times are available
# print(fs.ls("datasets/openclimatefix/dwd-icon-eu/data/2022/4/11/", detail=False))

# round timestamp to 6 hours floor
timestamp = timestamp.floor("6H")
year = timestamp.year
month = timestamp.month
day = timestamp.day
date_and_hour = timestamp.strftime("%Y%m%d_%H")
timestamp_floor = timestamp.floor("6H")
year = timestamp_floor.year
month = timestamp_floor.month
day = timestamp_floor.day
date_and_hour = timestamp_floor.strftime("%Y%m%d_%H")

date = f"{year}/{month}/{day}"
file_location = f"{date}/{date_and_hour}"
@@ -136,8 +156,13 @@ def get_nwp_for_one_timestamp_one_location(timestamp: pd.Timestamp, latitude, lo
# rename id to pv_id
df = df.rename(columns={"id": "pv_id"})

return df
# add columns for timestamp, latitude and longitude
df["timestamp"] = timestamp
df["latitude"] = latitude
df["longitude"] = longitude

# add pv_id column if it is given
if pv_id is not None:
df["pv_id"] = pv_id

return df
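
For reference, a minimal usage sketch of the updated get_nwp; the module path and column names are taken from the diff above, and the input row is made up:

import pandas as pd

from quartz_solar_forecast.eval.nwp import get_nwp

# hypothetical input: one row per (timestamp, location, pv system)
time_locations = pd.DataFrame(
    {
        "timestamp": [pd.Timestamp("2022-04-11 12:00")],
        "latitude": [51.5],
        "longitude": [-0.1],
        "pv_id": [1],
    }
)

if __name__ == "__main__":
    # each row is fetched in a worker process and the per-row results are combined into one DataFrame
    all_nwp_df = get_nwp(time_locations)

The __main__ guard matters because multiprocessing re-imports the calling module when it spawns workers (the default start method on Windows and macOS).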
