From 5142b1cebd7a827915a020b18c901629d309bef7 Mon Sep 17 00:00:00 2001
From: enrico facca
Date: Tue, 30 Jul 2024 14:12:51 +0200
Subject: [PATCH] both dmk and pdhg work, but only on square grids; there is
 probably some error in pdhg with the scaling

---
 src/darsia/measure/wasserstein.py | 607 ++++++++++++++++++++++++++----
 1 file changed, 542 insertions(+), 65 deletions(-)

diff --git a/src/darsia/measure/wasserstein.py b/src/darsia/measure/wasserstein.py
index 12c3bd43..b37b9b26 100644
--- a/src/darsia/measure/wasserstein.py
+++ b/src/darsia/measure/wasserstein.py
@@ -1734,8 +1734,332 @@ def __call__(self, img_1: darsia.Image, img_2: darsia.Image) -> float:
 
         return distance, info
 
 
+class WassersteinDistanceDMK(darsia.EMD):
+    """
+    This contains the implementation of the DMK (dynamic Monge-Kantorovich)
+    scheme for the Beckman problem: the transport density is evolved until
+    the gradient of the associated Poisson pressure has unit norm.
+    """
+
+    def __init__(self, grid, options) -> None:
+        # Cache geometrical infos
+        self.grid = grid
+        """darsia.Grid: grid"""
+
+        self.voxel_size = grid.voxel_size
+        """np.ndarray: voxel size"""
+
+        # Cache solver options
+        self.options = options
+        """dict: options for the solver"""
+
+        self.verbose = self.options.get("verbose", False)
+        """bool: verbosity"""
+
+        # Allocate space for main and auxiliary variables
+        self.transport_density = np.zeros(self.grid.num_faces, dtype=float)
+        """np.ndarray: transport density"""
+
+        self.flux = np.zeros(self.grid.num_faces, dtype=float)
+        """np.ndarray: flux"""
+
+        self.pressure = np.zeros(self.grid.num_cells, dtype=float)
+        """np.ndarray: pressure of the Poisson problem -div(tdens grad p) = f = img_2 - img_1"""
+
+        self.gradient_poisson = np.zeros(self.grid.num_faces, dtype=float)
+        """np.ndarray: variable storing the gradient of the Poisson problem"""
+
+        self._setup_discretization()
+
+    def _setup_discretization(self) -> None:
+        """Setup of fixed discretization operators."""
+
+        # ! ---- Discretization operators ----
+
+        if self.grid.voxel_size[0] != self.grid.voxel_size[1]:
+            raise ValueError("The grid must have square cells")
+        self.h = self.grid.voxel_size[0]
+
+        self.div = darsia.FVDivergence(self.grid).mat / self.h
+        """sps.csc_matrix: divergence operator: flat fluxes -> flat pressures"""
+
+        self.mass_matrix_cells = darsia.FVMass(self.grid).mat
+        """sps.csc_matrix: mass matrix on cells: flat pressures -> flat pressures"""
+
+        self.grad = self.div.T / self.h
+        """sps.csc_matrix: gradient operator: flat pressures -> flat fluxes"""
+
+    def setup_elliptic_solver(self, transport_density, rtol=1e-6):
+        """Set up a CG (KSP) solver for the transport-density-weighted
+        Poisson problem -div(tdens grad u) = f.
+        """
+        self.weighted_Laplacian_matrix = self.div * sps.diags(transport_density) / self.h * self.grad
+
+        # Define CG solver
+        kernel = np.ones(self.grid.num_cells, dtype=float) / np.sqrt(self.grid.num_cells)
+        weighted_Poisson_solver = darsia.linalg.KSP(self.weighted_Laplacian_matrix,
+                                                    nullspace=[kernel],
+                                                    appctx={})
+
+        weighted_Poisson_ksp_ctrl = {
+            "ksp_type": "cg",
+            "ksp_rtol": rtol,
+            "ksp_maxit": 100,
+            "pc_type": "hypre",
+            "ksp_monitor": None,
+        }
+        weighted_Poisson_solver.setup(weighted_Poisson_ksp_ctrl)
+
+        return weighted_Poisson_solver
+
+    def _solve(self, flat_mass_diff: np.ndarray) -> tuple[float, np.ndarray, dict]:
+        """Solve the Beckman problem using the DMK scheme.
+
+        Args:
+            flat_mass_diff (np.ndarray): difference of mass distributions
+
+        Returns:
+            tuple: distance, solution, info
+
+        """
+        # Setup time and memory profiling
+        tic = time.time()
+        tracemalloc.start()
+
+        # Solver parameters.
+        # By default, tolerances for increment and distance are set such that
+        # they do not affect the convergence.
+        num_iter = self.options.get("num_iter", 100)
+        tol_residual = self.options.get("tol_residual", np.finfo(float).max)
+        tol_increment = self.options.get("tol_increment", np.finfo(float).max)
+        tol_distance = self.options.get("tol_distance", np.finfo(float).max)
+
+        # Initialize the DMK iteration with the Darcy solution for unitary mobility
+        self.transport_density[:] = 1.0
+        Poisson_solver = self.setup_elliptic_solver(self.transport_density)
+        pressure = Poisson_solver.solve(flat_mass_diff)
+        gradient_pressure = self.grad.dot(pressure)
+
+        # Initialize distance in case the iteration below fails
+        new_distance = 0
+
+        # Initialize container for storing the convergence history
+        convergence_history = {
+            "distance": [],
+            "residual": [],
+            "flux_increment": [],
+            "distance_increment": [],
+            "timing": [],
+            "run_time": [],
+        }
+
+        # Print header for later printing performance to screen
+        # - distance
+        # - distance increment
+        # - flux increment
+        # - residual
+        if self.verbose:
+            print(
+                "DMK iter. \t| W^1 \t\t| Δ W^1 \t| Δ flux \t| residual",
+                "\n",
+                """---------------|---------------|---------------|---------------|"""
+                """---------------""",
+            )
+
+        self.iter = 0
+        # DMK iterations
+        while self.iter <= num_iter:
+            #
+            start = time.time()
+
+            # Update the transport density
+            update = self.transport_density * (gradient_pressure ** 2 - 1.0)
+            deltat = 0.25
+            self.transport_density += deltat * update
+
+            # Update the potential
+            Poisson_solver = self.setup_elliptic_solver(self.transport_density)
+            pressure = Poisson_solver.solve(flat_mass_diff)
+            gradient_pressure = self.grad.dot(pressure)
+            flux = self.transport_density * gradient_pressure
+
+            # Dual value: int_Omega p f; primal value: int_Omega |flux|
+            self.dual_value = self.compute_dual(pressure, flat_mass_diff)
+            self.primal_value = self.compute_primal(flux)
+            self.duality_gap = abs(self.dual_value - self.primal_value)
+            distance = self.primal_value
+
+            self.iter += 1
+
+            self.iter_cpu = time.time() - start
+            # if self.verbose:
+            print(
+                f"it: {self.iter:03d}" +
+                f" gap={self.duality_gap:.1e}" +
+                f" dual={self.dual_value:.3e} primal={self.primal_value:.3e}" +
+                f" cpu={self.iter_cpu:.3f}" +
+                f" max grad={np.max(gradient_pressure):.2e}"
+                f" {np.min(self.transport_density):.2e}<=TDENS<={np.max(self.transport_density):.2e}"
+            )
+
+            # convergence_history["distance"].append(new_distance)
+            # convergence_history["residual"].append(np.linalg.norm(residual_i, 2))
+
+            # Compute the error and store as part of the convergence history:
+            # 0 - full residual (Newton interpretation)
+            # 1 - flux increment (fixed-point interpretation)
+            # 2 - distance increment (Minimization interpretation)
+
+            # Update convergence history
+            # convergence_history["distance"].append(new_distance)
+            # convergence_history["residual"].append(np.linalg.norm(residual_i, 2))
+            # convergence_history["flux_increment"].append(
+            #     np.linalg.norm(increment[self.flux_slice], 2)
+            # )
+            # convergence_history["distance_increment"].append(
+            #     abs(new_distance - old_distance)
+            # )
+            # convergence_history["timing"].append(stats_i)
+
+            # Extract current total run time
+            # current_run_time = self._analyze_timings(convergence_history["timing"])[
+            #     "total"
+            # ]
+            # convergence_history["run_time"].append(current_run_time)
+
+            # Print performance to screen
+            # - distance
+            # - distance increment
+            # - flux increment
+            # - residual
+            if False:  # self.verbose:
+                distance_increment = convergence_history["distance_increment"][-1]
+                flux_increment = (
+                    convergence_history["flux_increment"][-1]
+                    / convergence_history["flux_increment"][0]
+                )
+                residual = (
+                    convergence_history["residual"][-1]
+                    / convergence_history["residual"][0]
+                )
+                print(
+                    f"""Iter. {iter} \t| {new_distance:.6e} \t| """
+                    f"""{distance_increment:.6e} \t| {flux_increment:.6e} \t| """
+                    f"""{residual:.6e}"""
+                )
+
+        # Summarize profiling (time in seconds, memory in GB)
+        # total_timings = self._analyze_timings(convergence_history["timing"])
+        # peak_memory_consumption = tracemalloc.get_traced_memory()[1] / 10**9
+
+        # Define performance metric
+        info = {
+            "converged": self.iter < num_iter,
+            "number_iterations": self.iter,
+            "convergence_history": convergence_history,
+            # "timings": total_timings,
+            # "peak_memory_consumption": peak_memory_consumption,
+        }
+
+        solution = np.concatenate([pressure, self.transport_density])
+
+        return distance, solution, info
+
+    def compute_dual(self, pressure, forcing):
+        r"""
+        Compute the value of the dual functional
+        $\int_{\Omega} p (f^+ - f^-)$
+        """
+        return np.dot(pressure, forcing) * np.prod(self.grid.voxel_size)
+
+    def compute_primal(self, flux):
+        r"""
+        Compute the value of the primal functional
+        $\int_{\Omega} |flux|$
+        """
+        return np.sum(np.abs(flux)) * np.prod(self.grid.voxel_size)
+
+    def __call__(
+        self,
+        img_1: darsia.Image,
+        img_2: darsia.Image,
+    ) -> float:
+        """L1 Wasserstein distance for two images with the same mass.
+
+        NOTE: Images need to comply with the setup of the object.
+
+        Args:
+            img_1 (darsia.Image): image 1, source distribution
+            img_2 (darsia.Image): image 2, destination distribution
+
+        Returns:
+            float: distance between img_1 and img_2.
+            dict (optional): solution
+            dict (optional): info
+
+        """
+
+        # Compatibility check
+        assert img_1.scalar and img_2.scalar
+        self._compatibility_check(img_1, img_2)
+
+        # Determine difference of distributions and define corresponding rhs
+        mass_diff = img_2.img - img_1.img
+        flat_mass_diff = np.ravel(mass_diff, "F")
+
+        # Main method
+        distance, solution, info = self._solve(flat_mass_diff)
+
+        flat_pressure = solution[:self.grid.num_cells]
+        flat_gradient = self.div.T.dot(flat_pressure)
+        flat_tdens = solution[-self.grid.num_faces:]
+        flat_flux = flat_tdens * flat_gradient
+
+        flux = darsia.face_to_cell(self.grid, flat_flux)
+        temp = np.hstack([flat_tdens, flat_tdens])
+        transport_density = darsia.face_to_cell(self.grid, temp)[:, :, 0]
+
+        pressure = flat_pressure.reshape(self.grid.shape, order="F")
+
+        # Return solution
+        return_info = self.options.get("return_info", False)
+        if return_info:
+            info.update(
+                {
+                    "grid": self.grid,
+                    "mass_diff": mass_diff,
+                    "flux": flux,
+                    "pressure": pressure,
+                    "transport_density": transport_density,
+                    "src": img_1,
+                    "dst": img_2,
+                }
+            )
+            return distance, info
+        else:
+            return distance
+
+
-class WassersteinDistanceGproxPGHD(VariationalWassersteinDistance):
+class WassersteinDistanceGproxPGHD(darsia.EMD):
     """
     This contains the implementation of the GproxPDHG algorithm
     described in "SOLVING LARGE-SCALE OPTIMIZATION PROBLEMS WITH
@@ -1743,9 +2067,21 @@ class WassersteinDistanceGproxPGHD(VariationalWassersteinDistance):
     """
 
     def __init__(self, grid, options) -> None:
-        super().__init__(grid, options)
+        # Cache geometrical infos
+        self.grid = grid
+        """darsia.Grid: grid"""
 
-        # allocate space for main and auxiliary variables
+        self.voxel_size = grid.voxel_size
+        """np.ndarray: voxel size"""
+
+        # Cache solver options
+        self.options = options
+        """dict: options for the solver"""
+
+        self.verbose = self.options.get("verbose", False)
+        """bool: verbosity"""
+
+        # Allocate space for main and auxiliary variables
         self.flux = np.zeros(self.grid.num_faces, dtype=float)
         """np.ndarray: flux"""
 
@@ -1755,9 +2091,11 @@ def __init__(self, grid, options) -> None:
         self.gradient_poisson = np.zeros(self.grid.num_faces, dtype=float)
         """ varaible storing the gradient of the poisson problem"""
 
+        self.rhs_forcing = np.zeros(self.grid.num_cells, dtype=float)
+        """ variable storing the right hand side of the poisson problem"""
 
-        self.p_bar = np.zeros(self.grid.num_faces, dtype=float)
-        """np.ndarray: $\bar{p}$"""
+
+
         self.div_free_flux = np.zeros(self.grid.num_faces, dtype=float)
         """ np.ndarray: divergence free flux"""
 
@@ -1765,7 +2103,17 @@ def __init__(self, grid, options) -> None:
         self.u = np.zeros(self.grid.num_faces, dtype=float)
         """np.ndarray: u"""
 
+        self.p = np.zeros(self.grid.num_faces, dtype=float)
+        """np.ndarray: $p^{n}$"""
+
+        self.p_bar = np.zeros(self.grid.num_faces, dtype=float)
+        """np.ndarray: $\bar{p}$"""
+
+        self.new_p = np.zeros(self.grid.num_faces, dtype=float)
+        """np.ndarray: $p^{n+1}$"""
+
+        self._setup_discretization()
 
     def _setup_discretization(self) -> None:
         """Setup of fixed discretization operators.
@@ -1773,12 +2121,35 @@ def _setup_discretization(self) -> None:
         Add linear contribution of the optimality conditions of the Newton linearization.
 
         """
-        super()._setup_discretization()
 
-        self.Laplacian = -self.div * self.mass_matrix_faces * self.div
+        # ! ---- Discretization operators ----
 
-        self.linear_solve(self.Laplacian, rhs.copy(), self.pot, setup=True)
+        self.div = darsia.FVDivergence(self.grid).mat
+        """sps.csc_matrix: divergence operator: flat fluxes -> flat pressures"""
+        print(self.div.shape)
+
+        self.mass_matrix_cells = darsia.FVMass(self.grid).mat
+        """sps.csc_matrix: mass matrix on cells: flat pressures -> flat pressures"""
+
+        if self.grid.voxel_size[0] != self.grid.voxel_size[1]:
+            raise ValueError("The grid must have square cells")
+        self.h = self.grid.voxel_size[0]
+        self.Laplacian_matrix = self.div * self.div.T / self.h**2
+
+        # Define CG solver
+        kernel = np.ones(self.grid.num_cells, dtype=float) / np.sqrt(self.grid.num_cells)
+        self.Poisson_solver = darsia.linalg.KSP(self.Laplacian_matrix,
+                                                nullspace=[kernel],
+                                                appctx={})
+
+        self.Poisson_ksp_ctrl = {
+            "ksp_type": "cg",
+            "ksp_rtol": 1e-6,
+            "ksp_maxit": 100,
+            "pc_type": "hypre",
+            "ksp_monitor": None,
+        }
+        self.Poisson_solver.setup(self.Poisson_ksp_ctrl)
 
     def residual(self, rhs: np.ndarray, solution: np.ndarray) -> np.ndarray:
@@ -1805,7 +2176,38 @@ def leray_projection(self, p: np.ndarray) -> np.ndarray:
             np.ndarray: divergence free flux
 
         """
-        return p - self.div.T.dot(self.linear_solve(self.Laplacian, self.div.dot(p), setup=False))
+        rhs = self.div.dot(p)
+        poisson_solution = self.Poisson_solver.solve(rhs) / self.h**2
+        return p - self.div.T.dot(poisson_solution)
+
+    def compute_pressure(self, flux: np.ndarray, forcing: np.ndarray) -> np.ndarray:
+        """Compute the pressure from the flux.
+
+        Args:
+            flux (np.ndarray): flux
+            forcing (np.ndarray): right-hand side (mass difference)
+
+        Returns:
+            np.ndarray: pressure
+
+        """
+        self.Laplacian_matrix = self.div * sps.diags(np.abs(flux), dtype=float) * self.div.T
+
+        # Define CG solver
+        kernel = np.ones(self.grid.num_cells, dtype=float) / np.sqrt(self.grid.num_cells)
+        self.weighted_Poisson_solver = darsia.linalg.KSP(self.Laplacian_matrix,
+                                                         nullspace=[kernel],
+                                                         appctx={})
+
+        self.weighted_Poisson_ksp_ctrl = {
+            "ksp_type": "cg",
+            "ksp_rtol": 1e-6,
+            "ksp_maxit": 100,
+            "pc_type": "hypre",
+            "ksp_monitor": None,
+        }
+        self.weighted_Poisson_solver.setup(self.weighted_Poisson_ksp_ctrl)
+        pressure = self.weighted_Poisson_solver.solve(forcing)
+        return pressure
 
     def _solve(self, flat_mass_diff: np.ndarray) -> tuple[float, np.ndarray, dict]:
         """Solve the Beckman problem using Newton's method.
@@ -1828,17 +2230,9 @@ def _solve(self, flat_mass_diff: np.ndarray) -> tuple[float, np.ndarray, dict]:
         tol_increment = self.options.get("tol_increment", np.finfo(float).max)
         tol_distance = self.options.get("tol_distance", np.finfo(float).max)
 
-        # Define right hand side
-        rhs = np.concatenate(
-            [
-                np.zeros(self.grid.num_faces, dtype=float),
-                self.mass_matrix_cells.dot(flat_mass_diff),
-            ]
-        )
-        self.regularized_flat_flux_norm = np.ones(self.grid.num_faces, dtype=float)
-
+
         # Initialize Newton iteration with Darcy solution for unitary mobility
-        poisson_pressure, _ = self.poisson_solver.solve(flat_mass_diff)
+        poisson_pressure = self.Poisson_solver.solve(flat_mass_diff)
         gradient_poisson_pressure = self.div.T.dot(poisson_pressure)
 
         # Initialize distance in case below iteration fails
@@ -1868,47 +2262,63 @@ def _solve(self, flat_mass_diff: np.ndarray) -> tuple[float, np.ndarray, dict]:
             )
 
-
-
+        p_bar = self.p_bar
+        u = self.u
+        p = self.p
+        new_p = self.new_p
+        self.iter = 0
         # PDHG iterations
-        for iter in range(num_iter):
+        while self.iter <= num_iter:
             #
+            start = time.time()
             tau = self.options.get("tau", 1.0)
             sigma = self.options.get("sigma", 1.0)
 
-            if iter > 0:
-                p = new_p
+            if self.iter > 0:
+                p[:] = new_p[:]
 
             # eq 3.14
             div_free = self.leray_projection(p_bar)
+            # res_div = self.div.dot(div_free)
+            # print(f"res_div: {np.linalg.norm(res_div)}")
             u -= tau * div_free
 
             # new flux
-            flux = u + gradient_poisson_pressure
+            self.flux = u + gradient_poisson_pressure
+            # res_div = self.div.dot(flux)
+            # print(f"res_div: {np.linalg.norm(res_div)}")
 
             # eq 3.15
-            sigma_vel = p + sigma * (u + gradient_poisson_pressure)
-            new_p = sigma_vel
-            greater_than_1 = np.where(abs(sigma_vel) > 1)
-            new_p[greater_than_1] /= new_p[greater_than_1] # normalize too +1 or -1
+            sigma_vel = p + sigma * self.flux
+            # print(sigma_vel)
+            new_p[:] = sigma_vel[:]
+            abs_sigma_vel = np.abs(sigma_vel)
+            greater_than_1 = np.where(abs_sigma_vel > 1)
+            new_p[greater_than_1] /= abs_sigma_vel[greater_than_1]  # normalize to +1 or -1
+            # print(new_p)
 
             # eq 3.16
-            p_bar = 2 * new_p - p
+            p_bar[:] = 2 * new_p[:] - p[:]
 
             # int_ pot f = int_ pot div poisson_pressure = int_ grad pot \cdot \nabla poisson_pressure
-            dual_value = self.compute_dual(p, gradient_poisson_pressure)
-            primal_value = self.compute_primal(flux)
+            self.dual_value = self.compute_dual(p, gradient_poisson_pressure)
+            self.primal_value = self.compute_primal(self.flux)
+            self.duality_gap = abs(self.dual_value - self.primal_value)
+            distance = self.primal_value
 
-            duality_gap = abs(self.dual - self.primal)
-            if self.verbose :
-                print(
+            self.iter += 1
+
+            self.iter_cpu = time.time() - start
+            # if self.verbose:
+            print(
                 f"it: {self.iter:03d}" +
                 f" gap={self.duality_gap:.1e}" +
-                    f" dual={self.dual:.3e} primal={self.primal:.3e}" +
+                f" dual={self.dual_value:.3e} primal={self.primal_value:.3e}" +
                 f" cpu={self.iter_cpu:.3f}")
 
-            convergence_history["distance"].append(new_distance)
-            convergence_history["residual"].append(np.linalg.norm(residual_i, 2))
+            # convergence_history["distance"].append(new_distance)
+            # convergence_history["residual"].append(np.linalg.norm(residual_i, 2))
 
             # Compute the error and store as part of the convergence history:
             # 0 - full residual (Newton interpretation)
             # 1 - flux increment (fixed-point interpretation)
             # 2 - distance increment (Minimization interpretation)
 
             # Update convergence history
-            convergence_history["distance"].append(new_distance)
-            convergence_history["residual"].append(np.linalg.norm(residual_i, 2))
-            convergence_history["flux_increment"].append(
-                np.linalg.norm(increment[self.flux_slice], 2)
-            )
-            convergence_history["distance_increment"].append(
-                abs(new_distance - old_distance)
-            )
-            convergence_history["timing"].append(stats_i)
+            # convergence_history["distance"].append(new_distance)
+            # convergence_history["residual"].append(np.linalg.norm(residual_i, 2))
+            # convergence_history["flux_increment"].append(
+            #     np.linalg.norm(increment[self.flux_slice], 2)
+            # )
+            # convergence_history["distance_increment"].append(
+            #     abs(new_distance - old_distance)
+            # )
+            # convergence_history["timing"].append(stats_i)
 
             # Extract current total run time
-            current_run_time = self._analyze_timings(convergence_history["timing"])[
-                "total"
-            ]
-            convergence_history["run_time"].append(current_run_time)
+            # current_run_time = self._analyze_timings(convergence_history["timing"])[
+            #     "total"
+            # ]
+            # convergence_history["run_time"].append(current_run_time)
 
             # Print performance to screen
             # - distance
             # - distance increment
             # - flux increment
             # - residual
-            if self.verbose:
+            if False:  # self.verbose:
                 distance_increment = convergence_history["distance_increment"][-1]
                 flux_increment = (
                     convergence_history["flux_increment"][-1]
                     / convergence_history["flux_increment"][0]
                 )
                 residual = (
                     convergence_history["residual"][-1]
                     / convergence_history["residual"][0]
                 )
                 print(
                     f"""Iter. {iter} \t| {new_distance:.6e} \t| """
                     f"""{distance_increment:.6e} \t| {flux_increment:.6e} \t| """
                     f"""{residual:.6e}"""
                 )
 
         # Summarize profiling (time in seconds, memory in GB)
-        total_timings = self._analyze_timings(convergence_history["timing"])
-        peak_memory_consumption = tracemalloc.get_traced_memory()[1] / 10**9
+        # total_timings = self._analyze_timings(convergence_history["timing"])
+        # peak_memory_consumption = tracemalloc.get_traced_memory()[1] / 10**9
 
         # Define performance metric
         info = {
-            "converged": iter < num_iter,
-            "number_iterations": iter,
+            "converged": self.iter < num_iter,
+            "number_iterations": self.iter,
             "convergence_history": convergence_history,
-            "timings": total_timings,
-            "peak_memory_consumption": peak_memory_consumption,
+            # "timings": total_timings,
+            # "peak_memory_consumption": peak_memory_consumption,
         }
 
-        return new_distance, solution, info
+        return distance, self.flux, info
 
-    def compute_dual(p,gradient_poisson):
+    def compute_dual(self, p, gradient_poisson):
         """
         Compute the value of the dual functional
         $\int_{\Domain} pot (f^+ - f^-)$
@@ -1976,15 +2388,76 @@ def compute_dual(p,gradient_poisson):
         $\int_{\Domain} \nabla pot \cdot \nabla poisson$
         $\int_{\Domain} p \cdot \nabla poisson$
         """
-        temp = self.mass_matrix_faces.dot(gradient_poisson)
-        return np.dot(p, temp)
+        return np.dot(p, gradient_poisson) * np.prod(self.grid.voxel_size)
 
-    def compute_primal(flux):
+    def compute_primal(self, flux):
         """
         Compute the value of the primal functional
         $\int_{\Domain} | flux | $
         """
-        return np.sum(self.mass_matrix_faces.dot(flux))
+        return np.sum(np.abs(flux)) * np.prod(self.grid.voxel_size)
+
+    def __call__(
+        self,
+        img_1: darsia.Image,
+        img_2: darsia.Image,
+    ) -> float:
+        """L1 Wasserstein distance for two images with the same mass.
+
+        NOTE: Images need to comply with the setup of the object.
+
+        Args:
+            img_1 (darsia.Image): image 1, source distribution
+            img_2 (darsia.Image): image 2, destination distribution
+
+        Returns:
+            float: distance between img_1 and img_2.
+            dict (optional): solution
+            dict (optional): info
+
+        """
+
+        # Compatibility check
+        assert img_1.scalar and img_2.scalar
+        self._compatibility_check(img_1, img_2)
+
+        # Determine difference of distributions and define corresponding rhs
+        mass_diff = img_2.img - img_1.img
+        flat_mass_diff = np.ravel(mass_diff, "F")
+
+        # Main method
+        distance, solution, info = self._solve(flat_mass_diff)
+
+        print(solution.shape)
+
+        flux = darsia.face_to_cell(self.grid, solution)
+        print(flux.shape)
+        flat_pressure = self.compute_pressure(solution, flat_mass_diff)
+        pressure = flat_pressure.reshape(self.grid.shape, order="F")
+        transport_density = np.abs(flux)
+
+        # Return solution
+        return_info = self.options.get("return_info", False)
+        if return_info:
+            info.update(
+                {
+                    "grid": self.grid,
+                    "mass_diff": mass_diff,
+                    "flux": flux,
+                    "pressure": pressure,
+                    "transport_density": transport_density,
+                    "src": img_1,
+                    "dst": img_2,
+                }
+            )
+            return distance, info
+        else:
+            return distance
+
+
@@ -2006,7 +2479,7 @@ def wasserstein_distance(
     """
 
     # Define method for computing 1-Wasserstein distance
-    if method.lower() in ["newton", "bregman", "sinkhorn"]:
+    if method.lower() in ["newton", "bregman", "sinkhorn", "pdhg", "dmk"]:
         # Use Finite Volume Iterative Method (Newton or Bregman)
 
         # Extract grid - implicitly assume mass_2 to generate same grid
@@ -2022,6 +2495,10 @@ def wasserstein_distance(
         w1 = WassersteinDistanceBregman(grid, options)
     elif method.lower() == "sinkhorn":
         w1 = WassersteinDistanceSinkhorn(grid, options)
+    elif method.lower() == "pdhg":
+        w1 = WassersteinDistanceGproxPGHD(grid, options)
+    elif method.lower() == "dmk":
+        w1 = WassersteinDistanceDMK(grid, options)
     elif method.lower() == "cv2.emd":
        # Use Earth Mover's Distance from CV2
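
Note (not part of the patch): a minimal usage sketch of the two solvers this patch wires into the factory. It assumes the patched darsia.wasserstein_distance accepts an options dict as shown in the hunk above, and that img_src / img_dst are placeholder darsia.Image objects with equal total mass defined on a square-cell grid; the option values are illustrative only.

    # Hypothetical driver, not part of the patch.
    import darsia


    def compare_new_solvers(img_src: darsia.Image, img_dst: darsia.Image) -> None:
        """Run the DMK and GproxPDHG solvers added by this patch and print W1."""
        options = {
            "num_iter": 200,      # iteration budget used by both solvers
            "verbose": True,      # per-iteration diagnostics
            "return_info": True,  # also return flux, pressure, transport density
            "tau": 1.0,           # PDHG step size (default in the patch)
            "sigma": 1.0,         # PDHG step size (default in the patch)
        }
        for method in ("dmk", "pdhg"):
            distance, info = darsia.wasserstein_distance(
                img_src, img_dst, method=method, options=options
            )
            print(f"{method}: W1 = {distance:.6e}, converged = {info['converged']}")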