diff --git a/src/ert/storage/local_ensemble.py b/src/ert/storage/local_ensemble.py index 0fef72bf880..7d354fdd82b 100644 --- a/src/ert/storage/local_ensemble.py +++ b/src/ert/storage/local_ensemble.py @@ -591,8 +591,8 @@ def load_cross_correlations(self) -> xr.Dataset: @require_write def save_observation_scaling_factors(self, dataset: xr.Dataset) -> None: - dataset.to_netcdf( - self.mount_point / "observation_scaling_factors.nc", engine="scipy" + self._storage._to_netcdf_transaction( + self.mount_point / "observation_scaling_factors.nc", dataset ) def load_observation_scaling_factors( @@ -622,7 +622,7 @@ def save_cross_correlations( } dataset = xr.Dataset(data_vars) file_path = os.path.join(self.mount_point, "corr_XY.nc") - dataset.to_netcdf(path=file_path, engine="scipy") + self._storage._to_netcdf_transaction(file_path, dataset) @lru_cache # noqa: B019 def load_responses(self, key: str, realizations: Tuple[int]) -> xr.Dataset: @@ -822,7 +822,9 @@ def save_parameters( path = self._realization_dir(realization) / f"{_escape_filename(group)}.nc" path.parent.mkdir(exist_ok=True) - dataset.expand_dims(realizations=[realization]).to_netcdf(path, engine="scipy") + self._storage._to_netcdf_transaction( + path, dataset.expand_dims(realizations=[realization]) + ) @require_write def save_response( @@ -857,7 +859,7 @@ def save_response( output_path = self._realization_dir(realization) Path.mkdir(output_path, parents=True, exist_ok=True) - data.to_netcdf(output_path / f"{response_type}.nc", engine="scipy") + self._storage._to_netcdf_transaction(output_path / f"{response_type}.nc", data) def calculate_std_dev_for_parameter(self, parameter_group: str) -> xr.Dataset: if parameter_group not in self.experiment.parameter_configuration: diff --git a/src/ert/storage/local_experiment.py b/src/ert/storage/local_experiment.py index 0b291018c8d..238aeeefbee 100644 --- a/src/ert/storage/local_experiment.py +++ b/src/ert/storage/local_experiment.py @@ -146,7 +146,7 @@ def create( output_path = path / "observations" output_path.mkdir() for obs_name, dataset in observations.items(): - dataset.to_netcdf(output_path / f"{obs_name}", engine="scipy") + storage._to_netcdf_transaction(output_path / f"{obs_name}", dataset) simulation_data = simulation_arguments if simulation_arguments else {} storage._write_transaction( diff --git a/src/ert/storage/local_storage.py b/src/ert/storage/local_storage.py index bd04854ccb7..3615f12f66a 100644 --- a/src/ert/storage/local_storage.py +++ b/src/ert/storage/local_storage.py @@ -555,7 +555,7 @@ def get_unique_experiment_name(self, experiment_name: str) -> str: else: return experiment_name + "_0" - def _write_transaction(self, filename: str | os.PathLike, data: bytes) -> None: + def _write_transaction(self, filename: str | os.PathLike[str], data: bytes) -> None: """ Writes the data to the filename as a transaction. @@ -567,6 +567,20 @@ def _write_transaction(self, filename: str | os.PathLike, data: bytes) -> None: f.write(data) os.rename(f.name, filename) + def _to_netcdf_transaction( + self, filename: str | os.PathLike[str], dataset: xr.Dataset + ) -> None: + """ + Writes the dataset to the filename as a transaction. + + Guarantees to not leave half-written or empty files on disk if the write + fails or the process is killed. + """ + self._swap_path.mkdir(parents=True, exist_ok=True) + with NamedTemporaryFile(dir=self._swap_path, delete=False) as f: + dataset.to_netcdf(f, engine="scipy") # type: ignore + os.rename(f.name, filename) + def _storage_version(path: Path) -> int: if not path.exists():