Skip to content

Commit

Permalink
Merge pull request #197 from openclimatefix/pvnet
Browse files Browse the repository at this point in the history
Pvnet
  • Loading branch information
dfulu authored May 12, 2023
2 parents f14b7cb + 3780405 commit 0fdc96b
Show file tree
Hide file tree
Showing 208 changed files with 791 additions and 74 deletions.
8 changes: 4 additions & 4 deletions ocf_datapipes/config/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,9 @@ def model_validation(cls, v):
class Satellite(DataSourceMixin, TimeResolutionMixin):
"""Satellite configuration model"""

satellite_zarr_path: str = Field(
satellite_zarr_path: Union[str, tuple[str], list[str]] = Field(
"gs://solar-pv-nowcasting-data/satellite/EUMETSAT/SEVIRI_RSS/OSGB36/all_zarr_int16_single_timestep.zarr", # noqa: E501
description="The path which holds the satellite zarr.",
description="The path or list of paths which hold the satellite zarr.",
)
satellite_channels: tuple = Field(
SAT_VARIABLE_NAMES[1:], description="the satellite channels that are used"
Expand Down Expand Up @@ -354,9 +354,9 @@ class Satellite(DataSourceMixin, TimeResolutionMixin):
class HRVSatellite(DataSourceMixin, TimeResolutionMixin):
"""Satellite configuration model for HRV data"""

hrvsatellite_zarr_path: str = Field(
hrvsatellite_zarr_path: Union[str, tuple[str], list[str]] = Field(
"gs://solar-pv-nowcasting-data/satellite/EUMETSAT/SEVIRI_RSS/OSGB36/all_zarr_int16_single_timestep.zarr", # noqa: E501
description="The path which holds the satellite zarr.",
description="The path or list of paths which hold the satellite zarr.",
)

hrvsatellite_channels: tuple = Field(
Expand Down
90 changes: 68 additions & 22 deletions ocf_datapipes/load/satellite.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Satellite loader"""
import logging
import subprocess
from pathlib import Path
from typing import Union

Expand All @@ -13,40 +14,83 @@
_log = logging.getLogger(__name__)


def _get_single_sat_data(zarr_path: Union[Path, str]) -> xr.Dataset:
    """Lazily open a single satellite zarr store from a local or GCS path.

    The path may contain wildcard matching (*), in which case all matching
    stores are opened as one multi-file dataset concatenated along ``time``.

    Args:
        zarr_path: Path to the zarr store. A GCS path must start with 'gs://'.

    Returns:
        The opened dataset, de-duplicated and sorted along ``time``.
        (Every branch produces an ``xr.Dataset``, so the annotation reflects
        that rather than the previous, inaccurate ``xr.DataArray``.)

    Raises:
        subprocess.CalledProcessError: If the ``gsutil`` listing fails for a
            wildcarded GCS path.
    """
    # These kwargs are used if the zarr path contains "*" (multi-file open).
    openmf_kwargs = dict(
        engine="zarr",
        concat_dim="time",
        combine="nested",
        chunks="auto",
        join="override",
    )

    if "gs://" in str(zarr_path) and "*" in str(zarr_path):
        # xr.open_mfdataset cannot expand wildcards on GCS bucket storage
        # itself, so list the matching stores with gsutil first.
        result_string = subprocess.run(
            f"gsutil ls -d {zarr_path}".split(" "),
            stdout=subprocess.PIPE,
            # Fail loudly if gsutil errors, instead of silently passing an
            # empty file list on to open_mfdataset.
            check=True,
        ).stdout.decode("utf-8")
        # Drop blank lines so open_mfdataset only receives real paths.
        files = [line for line in result_string.splitlines() if line]

        dataset = xr.open_mfdataset(files, **openmf_kwargs)

    elif "*" in str(zarr_path):  # Local multi-file dataset
        dataset = xr.open_mfdataset(zarr_path, **openmf_kwargs)
    else:
        dataset = xr.open_dataset(zarr_path, engine="zarr", chunks="auto")

    # Duplicate or unordered timestamps can appear within and across stores.
    dataset = dataset.drop_duplicates("time").sortby("time")

    return dataset


def open_sat_data(zarr_path: Union[Path, str, list[Path], list[str]]) -> xr.DataArray:
"""Lazily opens the Zarr store.
Args:
zarr_path: Cloud URL or local path pattern. If GCP URL, must start with 'gs://'
zarr_path: Cloud URL or local path pattern, or list of these. If GCS URL, it must start with
'gs://'.
Example:
With wild cards and GCS path:
```
zarr_paths = [
"gs://bucket/2020_nonhrv_split_*.zarr",
"gs://bucket/2019_nonhrv_split_*.zarr",
]
ds = open_sat_data(zarr_paths)
```
Without wild cards and with local path:
```
zarr_paths = [
"/data/2020_nonhrv.zarr",
"/data/2019_nonhrv.zarr",
]
ds = open_sat_data(zarr_paths)
```
"""
_log.debug("Opening satellite data: %s", zarr_path)
_log.info("Opening satellite data: %s", zarr_path)

# Silence the warning about large chunks.
# Alternatively, we could set this to True, but that slows down loading a Satellite batch
# from 8 seconds to 50 seconds!
dask.config.set({"array.slicing.split_large_chunks": False})

# Open the data
if type(zarr_path) in [list, tuple] or "*" in str(zarr_path): # Multi-file dataset
dataset = (
xr.open_mfdataset(
zarr_path,
engine="zarr",
concat_dim="time",
combine="nested",
chunks={},
join="override",
)
.drop_duplicates("time")
.sortby("time")
if isinstance(zarr_path, (list, tuple)):
dataset = xr.combine_nested(
[_get_single_sat_data(path) for path in zarr_path],
concat_dim="time",
combine_attrs="override",
join="override",
)
else:
dataset = (
xr.open_dataset(zarr_path, engine="zarr", chunks={})
.drop_duplicates("time")
.sortby("time")
)

dataset = _get_single_sat_data(zarr_path)
# TODO add 15 mins data satellite option

# Remove data coordinate dimensions if they exist
Expand Down Expand Up @@ -115,6 +159,8 @@ def open_sat_data(zarr_path: Union[Path, str, list[Union[str, Path]]]) -> xr.Dat
# exactly 5 minutes past the hour.
assert (datetime_index == datetime_index.round("5T")).all()

_log.info("Opened satellite data")

return data_array


Expand Down
10 changes: 10 additions & 0 deletions ocf_datapipes/training/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ def create_t0_and_loc_datapipes(
configuration: Configuration,
key_for_t0: str = "gsp",
shuffle: bool = True,
nwp_max_t0_offset: timedelta = timedelta(minutes=0),
):
"""
Takes source datapipes and returns datapipes of appropriate sample pairs of locations and times.
Expand All @@ -330,6 +331,9 @@ def create_t0_and_loc_datapipes(
key_for_t0: Key to use for the t0 datapipe. Must be "gsp" or "pv".
shuffle: Whether to use the internal shuffle function when yielding location times. Else
location times will be heavily ordered.
nwp_max_t0_offset: If using dropout on NWP, sometimes we have to go back to previous NWP
init time. In order to accommodate this possibility in selecting times, set
`nwp_max_t0_offset` as the max NWP dropout delay you plan to use.
Returns:
location datapipe, t0 datapipe
Expand All @@ -351,30 +355,35 @@ def create_t0_and_loc_datapipes(
history_duration = configuration.input_data.nwp.history_minutes
forecast_duration = configuration.input_data.nwp.forecast_minutes
time_dim = "init_time_utc"
max_t0_offset = nwp_max_t0_offset

elif key == "sat":
sample_frequency = 5
history_duration = configuration.input_data.satellite.history_minutes
forecast_duration = 0
time_dim = "time_utc"
max_t0_offset = timedelta(minutes=0)

elif key == "hrv":
sample_frequency = 5
history_duration = configuration.input_data.hrvsatellite.history_minutes
forecast_duration = 0
time_dim = "time_utc"
max_t0_offset = timedelta(minutes=0)

elif key == "pv":
sample_frequency = 5
history_duration = configuration.input_data.pv.history_minutes
forecast_duration = configuration.input_data.pv.forecast_minutes
time_dim = "time_utc"
max_t0_offset = timedelta(minutes=0)

elif key == "gsp":
sample_frequency = 30
history_duration = configuration.input_data.gsp.history_minutes
forecast_duration = configuration.input_data.gsp.forecast_minutes
time_dim = "time_utc"
max_t0_offset = timedelta(minutes=0)

else:
raise ValueError(f"Unexpected key: {key}")
Expand All @@ -386,6 +395,7 @@ def create_t0_and_loc_datapipes(
history_duration=timedelta(minutes=history_duration),
forecast_duration=timedelta(minutes=forecast_duration),
time_dim=time_dim,
max_t0_offset=max_t0_offset,
)

contiguous_time_datapipes.append(time_periods)
Expand Down
Loading

0 comments on commit 0fdc96b

Please sign in to comment.