Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Atlite ESGF interface for downloading and preparing CMIP6 data #180

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d672694
add initial CMIP interface
Ovewh Jun 22, 2021
12912ca
working cmip interface with atlite
Ovewh Jun 23, 2021
fa614d4
Merge branch 'PyPSA:master' into cmip
Ovewh Jun 23, 2021
ca50bd2
update dependencies
Ovewh Jun 23, 2021
04840f4
add esgf-pyclient
Ovewh Jun 24, 2021
d23f051
change orography to geopotential
Ovewh Jun 24, 2021
fe43d73
Merge branch 'master' of https://github.com/PyPSA/atlite into PyPSA-m…
Ovewh Jun 28, 2021
22d7f98
add sanitize_runoff cmip
Ovewh Jun 28, 2021
70c3b3d
add cmip example
Ovewh Jun 29, 2021
a0ff51f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 29, 2021
11f7fb8
Merge branch 'PyPSA:master' into cmip
Ovewh Jun 29, 2021
ec92ef4
add sanitize functionallity of CMIP data
Jun 30, 2021
5a8307f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 30, 2021
3f01d28
cmip data available for the future
Ovewh Jul 5, 2021
c69293e
update example to use future data
Ovewh Jul 5, 2021
058bb6b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 5, 2021
d539fb8
fix file selection and time index
Ovewh Jul 7, 2021
ba27a32
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 7, 2021
a3be2f9
drop conflicting variable from dataset
Ovewh Jul 7, 2021
2359d2c
Merge branch 'cmip' of https://github.com/Ovewh/atlite into cmip
Ovewh Jul 7, 2021
a1c613d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 7, 2021
cfe6b9e
Merge branch 'PyPSA:master' into cmip
Ovewh Sep 2, 2021
6eac85b
Fix issue with 360_day calendar
Ovewh Sep 3, 2021
279592c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 3, 2021
db0a86a
Merge branch 'PyPSA:master' into cmip
Ovewh Dec 10, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions atlite/cutout.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __init__(self, path, **cutoutparams):
NetCDF from which to load or where to store the cutout.
module : str or list
The dataset(s) which works as a basis for the cutout. Available
modules are "era5", "sarah" and "gebco".
modules are "era5", "sarah", "cmip" and "gebco".
This is necessary when building a new cutout.
If more than one module is given, their order determines how atlite
fills up missing features when preparing the cutout with
Expand Down Expand Up @@ -132,6 +132,14 @@ def __init__(self, path, **cutoutparams):
gebco_path: str
Path to find the gebco netcdf file. Only necessary when including
the gebco module.
esgf_params: dict
Parameters to be used in search on the ESGF database.
model: str
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we keep that we should rename this keyword argument into something more specific, like esgf_model

The ESGF search parameters can also be specified in the cmip.yml file,
then model correspond to the name of the model specifed in the cmip.yml file.
roughness_path: str
Path to external roughness dataset, required for converting CMIP
winds.
parallel : bool, default False
Whether to open dataset in parallel mode. Take effect for all
xr.open_mfdataset usages.
Expand All @@ -151,6 +159,7 @@ def __init__(self, path, **cutoutparams):
chunks = cutoutparams.pop("chunks", {"time": 100})
storable_chunks = {f"chunksize_{k}": v for k, v in (chunks or {}).items()}

self.esgf_params = cutoutparams.pop("esgf_params", None)
# Backward compatibility for xs, ys, months and years
if {"xs", "ys"}.intersection(cutoutparams):
warn(
Expand Down Expand Up @@ -208,7 +217,7 @@ def __init__(self, path, **cutoutparams):
) from exc

# TODO: check for dx, dy, x, y fine with module requirements
coords = get_coords(x, y, time, **cutoutparams)
coords = get_coords(x, y, time, module=module, **cutoutparams)

attrs = {
"module": module,
Expand Down
9 changes: 5 additions & 4 deletions atlite/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from dask.utils import SerializableLock
from dask.diagnostics import ProgressBar
import logging
from itertools import chain

logger = logging.getLogger(__name__)

Expand All @@ -42,14 +43,14 @@ def get_features(cutout, module, features, tmpdir=None):
cutout, feature, tmpdir=tmpdir, lock=lock, **parameters
)
datasets.append(feature_data)

datasets = compute(*datasets)

ds = xr.merge(datasets, compat="equals")
fd = datamodules[module].features.items()
datavars = list(chain(*[l for k, l in fd]))
for v in ds:
ds[v].attrs["module"] = module
fd = datamodules[module].features.items()
ds[v].attrs["feature"] = [k for k, l in fd if v in l].pop()
if v in datavars:
ds[v].attrs["feature"] = [k for k, l in fd if v in l].pop()
Comment on lines +52 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you give me a hint why this is necessary? It is hard to comprehend without the context.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CMIP data have longitude bins, latitude bins and time bins included as variables and these aren't defined as variables in the features dict. If I don't include if statement it will try to pop an empty list. Suggestion on a better way to do this?

Data variables:
    time_bnds    (time, bnds) datetime64[ns] 2021-01-01 ... 2021-02-01
    lat_bnds     (y, bnds) float64 31.65 32.35 32.65 33.35 ... 82.35 82.65 83.35
    lon_bnds     (x, bnds) float64 346.6 347.4 347.6 348.4 ... 44.35 44.65 45.35
    wnd10m       (time, y, x) float32 dask.array<chunksize=(100, 52, 59), meta=np.ndarray>
    influx       (time, y, x) float32 dask.array<chunksize=(100, 52, 59), meta=np.ndarray>
    outflux      (time, y, x) float32 dask.array<chunksize=(100, 52, 59), meta=np.ndarray>
    temperature  (time, y, x) float32 dask.array<chunksize=(100, 52, 59), meta=np.ndarray>
    runoff       (time, y, x) float32 dask.array<chunksize=(100, 52, 59), meta=np.ndarray>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather question whether we want to carry these variables along? If not needed, I'd argue to delete them before (in get_data) since they not requested from the feature list anyway

return ds


Expand Down
4 changes: 2 additions & 2 deletions atlite/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later

from . import era5, sarah, gebco
from . import era5, sarah, gebco, cmip

modules = {"era5": era5, "sarah": sarah, "gebco": gebco}
modules = {"era5": era5, "sarah": sarah, "gebco": gebco, "cmip": cmip}
292 changes: 292 additions & 0 deletions atlite/datasets/cmip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2020-2021 The Atlite Authors
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Module for downloading and preparing data from the ESGF servers
to be used in atlite.
"""

import xarray as xr
from pyesgf.search import SearchConnection

import logging
import dask
from ..gis import maybe_swap_spatial_dims
import numpy as np
import pandas as pd

# Null context for running a with statements without any context
try:
from contextlib import nullcontext
except ImportError:
# for Python verions < 3.7:
import contextlib

@contextlib.contextmanager
def nullcontext():
yield


logger = logging.getLogger(__name__)

features = {
"wind": ["wnd10m"],
"influx": ["influx", "outflux"],
"temperature": ["temperature"],
"runoff": ["runoff"],
}
Comment on lines +33 to +38
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these features are all the features provided by cmip? Just wondering because they look very similar to era5 (Idea is that a feature is a umbrella term for the xarray variable which will be retrieved)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's all the features. Though I'm not sure I get your question, isn't the point that the features should have the same name across the different datasets?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, totally correct. I just wanted to be sure this isn't a copy/paste artefact


crs = 4326

dask.config.set({"array.slicing.split_large_chunks": True})


def search_ESGF(esgf_params, url="https://esgf-data.dkrz.de/esg-search"):
conn = SearchConnection(url, distrib=True)
ctx = conn.new_context(latest=True, **esgf_params)
if ctx.hit_count == 0:
ctx = ctx.constrain(frequency=esgf_params["frequency"] + "Pt")
if ctx.hit_count == 0:
raise (ValueError("No results found in the ESGF_database"))
latest = ctx.search()[0]
return latest.file_context().search()


def get_data_runoff(esgf_params, cutout, **retrieval_params):
"""
Get runoff data for given retrieval parameters
(the run off retrival have not be tested extensively)
"""
coords = cutout.coords
ds = retrieve_data(
esgf_params,
coords,
variables=["mrro"],
**retrieval_params,
)
ds = _rename_and_fix_coords(cutout, ds)
ds = ds.rename({"mrro": "runoff"})
return ds


def sanitize_runoff(ds):
"""Sanitize retrieved runoff data."""
ds["runoff"] = ds["runoff"].clip(min=0.0)
return ds


def get_data_influx(esgf_params, cutout, **retrieval_params):
"""Get influx data for given retrieval parameters."""
coords = cutout.coords
ds = retrieve_data(
esgf_params,
coords,
variables=["rsds", "rsus"],
**retrieval_params,
)

ds = _rename_and_fix_coords(cutout, ds)

ds = ds.rename({"rsds": "influx", "rsus": "outflux"})

return ds


def sanitize_inflow(ds):
"""Sanitize retrieved inflow data."""
ds["influx"] = ds["influx"].clip(min=0.0)
return ds


def get_data_temperature(esgf_params, cutout, **retrieval_params):
"""Get temperature for given retrieval parameters."""
coords = cutout.coords
ds = retrieve_data(esgf_params, coords, variables=["tas"], **retrieval_params)

ds = _rename_and_fix_coords(cutout, ds)
ds = ds.rename({"tas": "temperature"})
ds = ds.drop_vars("height")

return ds


def get_data_wind(esgf_params, cutout, **retrieval_params):
"""Get wind for given retrieval parameters"""

coords = cutout.coords
ds = retrieve_data(esgf_params, coords, ["sfcWind"], **retrieval_params)
ds = _rename_and_fix_coords(cutout, ds)
ds = ds.rename({"sfcWind": "wnd{:0d}m".format(int(ds.sfcWind.height.values))})
ds = ds.drop_vars("height")
return ds


def _year_in_file(time_range, years):
"""
Find which file contains the requested years

Parameters:
time_range: str
fmt YYYYMMDD-YYYYMMDD
years: list
"""

time_range = time_range.split(".")[0]
s_year = int(time_range.split("-")[0][:4])
e_year = int(time_range.split("-")[1][:4])
date_range = pd.date_range(str(s_year), str(e_year), freq="AS")
if s_year == e_year and e_year in years:
return True
elif date_range.year.isin(years).any() == True:
return True
else:
return False


def retrieve_data(esgf_params, coords, variables, chunks=None, tmpdir=None, lock=None):
"""
Download data from egsf database

"""
time = coords["time"].to_index()
years = time.year.unique()
dsets = []
if lock is None:
lock = nullcontext()
with lock:
for variable in variables:
esgf_params["variable"] = variable
search_results = search_ESGF(esgf_params)
files = [
f.opendap_url
for f in search_results
if _year_in_file(f.opendap_url.split("_")[-1], years)
]
dsets.append(xr.open_mfdataset(files, chunks=chunks, concat_dim=["time"]))
ds = xr.merge(dsets)

ds.attrs = {**ds.attrs, **esgf_params}

return ds


def _rename_and_fix_coords(cutout, ds, add_lon_lat=True, add_ctime=False):
"""Rename 'longitude' and 'latitude' columns to 'x' and 'y' and fix roundings.

Optionally (add_lon_lat, default:True) preserves latitude and longitude
columns as 'lat' and 'lon'.

CMIP specifics; shift the longitude from 0..360 to -180..180. In addition
CMIP sometimes specify the time in the center of the output intervall this shifted to the beginning.
"""
ds = ds.assign_coords(lon=(((ds.lon + 180) % 360) - 180))
ds.lon.attrs["valid_max"] = 180
ds.lon.attrs["valid_min"] = -180
ds = ds.sortby("lon")

ds = ds.rename({"lon": "x", "lat": "y"})
dt = cutout.dt
ds = maybe_swap_spatial_dims(ds)
if add_lon_lat:
ds = ds.assign_coords(lon=ds.coords["x"], lat=ds.coords["y"])
if add_ctime:
ds = ds.assign_coords(ctime=ds.coords["time"])

# shift averaged data to beginning of bin

if "time_bnds" in ds.data_vars:
ds = ds.drop_vars("time_bnds")
if "time_bounds" in ds.data_vars:
ds = ds.drop_vars("time_bounds")

if "lat_bnds" in ds.data_vars:
ds = ds.drop_vars("lat_bnds")
if "lon_bnds" in ds.data_vars:
ds = ds.drop_vars("lon_bnds")

ds = ds.assign_coords(time=ds.coords["time"].dt.floor(dt))

if isinstance(ds.time[0].values, np.datetime64) == False:
if xr.CFTimeIndex(ds.time.values).calendar == "360_day":
from xclim.core.calendar import convert_calendar

ds = convert_calendar(ds, cutout.data.time, align_on="year")
else:
ds = ds.assign_coords(
time=xr.CFTimeIndex(ds.time.values).to_datetimeindex(unsafe=True)
)

return ds


def get_data(cutout, feature, tmpdir, lock=None, **creation_parameters):
"""
Retrieve data from the ESGF CMIP database.

This front-end function downloads data for a specific feature and formats
it to match the given Cutout.

Parameters
----------
cutout : atlite.Cutout
feature : str
Name of the feature data to retrieve. Must be in
`atlite.datasets.cmip.features`
tmpdir : str/Path
Directory where the temporary netcdf files are stored.
**creation_parameters :
Additional keyword arguments. The only effective argument is 'sanitize'
(default True) which sets sanitization of the data on or off.

Returns
-------
xarray.Dataset
Dataset of dask arrays of the retrieved variables.

"""
coords = cutout.coords

sanitize = creation_parameters.get("sanitize", True)

if cutout.esgf_params == None:
raise (ValueError("ESGF search parameters not provided"))
else:
esgf_params = cutout.esgf_params
if esgf_params.get("frequency") == None:
if cutout.dt == "H":
freq = "h"
elif cutout.dt == "3H":
freq = "3hr"
elif cutout.dt == "6H":
freq = "6hr"
elif cutout.dt == "D":
freq = "day"
elif cutout.dt == "M":
freq = "mon"
elif cutout.dt == "Y":
freq = "year"
else:
raise (ValueError(f"{cutout.dt} not valid time frequency in CMIP"))
else:
freq = esgf_params.get("frequency")

esgf_params["frequency"] = freq

retrieval_params = {"chunks": cutout.chunks, "tmpdir": tmpdir, "lock": lock}

func = globals().get(f"get_data_{feature}")
FabianHofmann marked this conversation as resolved.
Show resolved Hide resolved

logger.info(f"Requesting data for feature {feature}...")

ds = func(esgf_params, cutout, **retrieval_params)
ds = ds.sel(time=coords["time"])
bounds = cutout.bounds
ds = ds.sel(x=slice(bounds[0], bounds[2]), y=slice(bounds[1], bounds[3]))
ds = ds.interp({"x": cutout.data.x, "y": cutout.data.y})

if globals().get(f"sanitize_{feature}") != None and sanitize:
sanitize_func = globals().get(f"sanitize_{feature}")
ds = sanitize_func(ds)

return ds
9 changes: 7 additions & 2 deletions atlite/gis.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,17 @@ def get_coords(x, y, time, dx=0.25, dy=0.25, dt="h", **kwargs):
"""
x = slice(*sorted([x.start, x.stop]))
y = slice(*sorted([y.start, y.stop]))

if "cmip" in kwargs.get("module"):
start = "2015"
end = "2100"
else:
start = "1979"
end = "now"
ds = xr.Dataset(
{
"x": np.round(np.arange(-180, 180, dx), 9),
"y": np.round(np.arange(-90, 90, dy), 9),
"time": pd.date_range(start="1979", end="now", freq=dt),
"time": pd.date_range(start=start, end=end, freq=dt),
}
)
ds = ds.assign_coords(lon=ds.coords["x"], lat=ds.coords["y"])
Expand Down
1 change: 1 addition & 0 deletions environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies:
- shapely
- progressbar2
- tqdm
- esgf-pyclient

# dev tools
- black
Expand Down
Loading