Skip to content

Commit

Permalink
Use transactional write for to_netcdf
Browse files Browse the repository at this point in the history
  • Loading branch information
eivindjahren committed Sep 26, 2024
1 parent 146ac6c commit a6921a5
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
12 changes: 7 additions & 5 deletions src/ert/storage/local_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,8 @@ def load_cross_correlations(self) -> xr.Dataset:

@require_write
def save_observation_scaling_factors(self, dataset: xr.Dataset) -> None:
dataset.to_netcdf(
self.mount_point / "observation_scaling_factors.nc", engine="scipy"
self._storage._to_netcdf_transaction(
self.mount_point / "observation_scaling_factors.nc", dataset
)

def load_observation_scaling_factors(
Expand Down Expand Up @@ -622,7 +622,7 @@ def save_cross_correlations(
}
dataset = xr.Dataset(data_vars)
file_path = os.path.join(self.mount_point, "corr_XY.nc")
dataset.to_netcdf(path=file_path, engine="scipy")
self._storage._to_netcdf_transaction(file_path, dataset)

@lru_cache # noqa: B019
def load_responses(self, key: str, realizations: Tuple[int]) -> xr.Dataset:
Expand Down Expand Up @@ -822,7 +822,9 @@ def save_parameters(
path = self._realization_dir(realization) / f"{_escape_filename(group)}.nc"
path.parent.mkdir(exist_ok=True)

dataset.expand_dims(realizations=[realization]).to_netcdf(path, engine="scipy")
self._storage._to_netcdf_transaction(
path, dataset.expand_dims(realizations=[realization])
)

@require_write
def save_response(
Expand Down Expand Up @@ -857,7 +859,7 @@ def save_response(
output_path = self._realization_dir(realization)
Path.mkdir(output_path, parents=True, exist_ok=True)

data.to_netcdf(output_path / f"{response_type}.nc", engine="scipy")
self._storage._to_netcdf_transaction(output_path / f"{response_type}.nc", data)

def calculate_std_dev_for_parameter(self, parameter_group: str) -> xr.Dataset:
if parameter_group not in self.experiment.parameter_configuration:
Expand Down
2 changes: 1 addition & 1 deletion src/ert/storage/local_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def create(
output_path = path / "observations"
output_path.mkdir()
for obs_name, dataset in observations.items():
dataset.to_netcdf(output_path / f"{obs_name}", engine="scipy")
storage._to_netcdf_transaction(output_path / f"{obs_name}", dataset)

simulation_data = simulation_arguments if simulation_arguments else {}
storage._write_transaction(
Expand Down
16 changes: 15 additions & 1 deletion src/ert/storage/local_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ def get_unique_experiment_name(self, experiment_name: str) -> str:
else:
return experiment_name + "_0"

def _write_transaction(self, filename: str | os.PathLike, data: bytes) -> None:
def _write_transaction(self, filename: str | os.PathLike[str], data: bytes) -> None:
"""
Writes the data to the filename as a transaction.
Expand All @@ -567,6 +567,20 @@ def _write_transaction(self, filename: str | os.PathLike, data: bytes) -> None:
f.write(data)
os.rename(f.name, filename)

def _to_netcdf_transaction(
    self, filename: str | os.PathLike[str], dataset: xr.Dataset
) -> None:
    """
    Writes the dataset to the filename as a transaction.

    The dataset is first serialized (engine "scipy") to a temporary file
    inside the swap directory. Only after that file has been completely
    written and closed is it renamed onto ``filename``.

    Guarantees to not leave half-written or empty files on disk if the write
    fails: the temporary file is removed again before the error is re-raised,
    and ``filename`` is never touched until the data is complete.

    :param filename: Destination path of the netCDF file.
    :param dataset: The dataset to write.
    """
    self._swap_path.mkdir(parents=True, exist_ok=True)
    tmp_name = None
    try:
        with NamedTemporaryFile(dir=self._swap_path, delete=False) as f:
            tmp_name = f.name
            dataset.to_netcdf(f, engine="scipy")
        # The temporary file is closed (and thereby flushed) at this point,
        # so the rename only ever publishes a complete file.
        os.rename(tmp_name, filename)
    except BaseException:
        # delete=False means nobody else cleans up after a failed write;
        # remove the partial file so the swap directory does not accumulate
        # garbage, then let the original error propagate.
        if tmp_name is not None:
            try:
                os.unlink(tmp_name)
            except OSError:
                # Best effort: the file may already be gone (e.g. the rename
                # succeeded before the interruption).
                pass
        raise


def _storage_version(path: Path) -> int:
if not path.exists():
Expand Down

0 comments on commit a6921a5

Please sign in to comment.