Skip to content

Commit

Permalink
MAINT: Move code for al mobility modes into face weight computations.
Browse files Browse the repository at this point in the history
  • Loading branch information
jwboth committed Aug 21, 2024
1 parent 285eb53 commit c54b0d3
Showing 1 changed file with 117 additions and 104 deletions.
221 changes: 117 additions & 104 deletions src/darsia/measure/wasserstein.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,18 +213,16 @@ def _setup_face_weights(self) -> None:
"""np.ndarray: isotropic face weights"""
else:
self.cell_weights = self.weight.img
self.face_weights = 1.0 / darsia.cell_to_face_average(
self.grid, 1.0 / self.cell_weights, mode="harmonic"
)
self.face_weights = self._harmonic_average(self.cell_weights)
if len(self.weight.num_voxels) == self.grid.dim:
self.isotropic_cell_weights = self.weight.img
self.isotropic_face_weights = 1.0 / darsia.cell_to_face_average(
self.grid, 1.0 / self.isotropic_cell_weights, mode="harmonic"
self.isotropic_face_weights = self._harmonic_average(
self.isotropic_cell_weights
)
else:
self.isotropic_cell_weights = np.min(self.weight.img, axis=-1)
self.isotropic_face_weights = 1.0 / darsia.cell_to_face_average(
self.grid, 1.0 / self.isotropic_cell_weights, mode="harmonic"
self.isotropic_face_weights = self._harmonic_average(
self.isotropic_cell_weights
)

def _setup_discretization(self) -> None:
Expand Down Expand Up @@ -715,64 +713,115 @@ def l1_dissipation(self, flat_flux: np.ndarray) -> float:

# ! ---- Lumping of effective mobility

def weighted_vector_face_flux_norm(
self,
flat_flux: np.ndarray,
mode: Literal["cell_based", "subcell_based", "face_based"],
) -> np.ndarray:
"""Compute the norm of the cell-weighted vector-valued fluxes on the faces.
def _product(self, a: np.ndarray, b: np.ndarray) -> np.ndarray:
"""Compute the product of two arrays.
Args:
flat_flux (np.ndarray): flat fluxes (normal fluxes on the faces)
In the cell-based modes, the fluxes are projected to the cells and the norm across
faces is computed via averaging. Similar for subcell_based definition. In the
face-based mode, the norm is computed directly on the faces; for this fluxes are
constructed with knowledge of RT0 basis functions.
a (np.ndarray): array a
b (np.ndarray): array b
Returns:
np.ndarray: flat norm of the vector-valued fluxes on the faces
np.ndarray: product
"""
if len(a.shape) == len(b.shape) + 1 and a.shape[:-1] == b.shape:
return a * b[..., np.newaxis]
elif len(a.shape) == len(b.shape) - 1 and a.shape == b.shape[:-1]:
return a[..., np.newaxis] * b
elif len(a.shape) == len(b.shape) and a.shape == b.shape:
return a * b
else:
raise ValueError("Shapes not compatible.")

# NOTE: Method currently not in use anymore. Instead self.compute_face_weight is used.
# Keep for potential future use.

# Determine the norm of the fluxes on the faces
if mode in [
"cell_based",
"cell_based_arithmetic",
"cell_based_harmonic",
]:
# Cell-based mode determines the norm of the fluxes on the faces via
# averaging of neighboring cells.

# Extract average mode from mode
if mode == "cell_based":
average_mode = "harmonic"
else:
average_mode = mode.split("_")[2]
def _harmonic_average(self, cell_values: np.ndarray) -> np.ndarray:
"""Compute the harmonic average of cell values on faces.
# The flux norm is identical to the transport density without weights
cell_flux_norm = self.transport_density(
NOTE: Averaging strategies originate from the FV literature and are adapted
to the current setting. Usually, the effective mobility is computed cell-wise
and then averaged to faces. When translated to flux computations, the effective
mobility is inverted again, correlating to the face weights. Therefore,
inversions are required.
"""
return 1.0 / darsia.cell_to_face_average(
self.grid, 1.0 / cell_values, mode="harmonic"
)

def _compute_face_weight(self, flat_flux: np.ndarray) -> np.ndarray:
"""FV-style face weighting, using harmonic averaging.
The goal is to compute a face weight resembling the effective mobility
which is essentially cell_weight ** 2 / |weight * flux|. Different choices
how to compute the effective mobility are possible, combining different
averaging strategies.
NOTE: Averaging strategies originate from the FV literature and are adapted
to the current setting. Usually, the effective mobility is computed cell-wise
and then averaged to faces. When translated to flux computations, the effective
mobility is inverted again, correlating to the face weights. Therefore,
inversions are required.
"""
if self.mobility_mode in ["cell_based", "cell_based_harmonic"]:
# Idea: Compute first weight ** 2 / |weight * flux| on cells
# and reduce the values to faces via harmonic averaging.
# |weight * flux| on cells
weighted_cell_flux_norm = self.transport_density(
flat_flux, weighted=True, flatten=False
)

# Map to faces via averaging of neighboring cells
flat_flux_norm = darsia.cell_to_face_average(
self.grid, cell_flux_norm, mode=average_mode
regularized_weighted_cell_flux_norm = np.maximum(
weighted_cell_flux_norm,
self.regularization,
)
# weight * (weight / |weight * flux|): cell -> face via harmonic averaging
# TODO use _harmonic_average
cell_weights_inv = self._product(
1.0 / self.cell_weights**2, regularized_weighted_cell_flux_norm
)
face_weights_inv = darsia.cell_to_face_average(
self.grid, cell_weights_inv, mode="harmonic"
)
face_weights = 1.0 / face_weights_inv
elif self.mobility_mode == "cell_based_arithmetic":
# Idea: Combine two factors: Reduce weight to faces via
# harmonic averaging and compute weight / |weight * flux| on faces via
# arithmetic averaging.
# cell_weight: cell -> face via harmonic averaging
harm_avg_face_weights = self._harmonic_average(self.cell_weights)
# |weight * flux|
weighted_cell_flux_norm = self.transport_density(
flat_flux, weighted=True, flatten=False
)
regularized_weighted_cell_flux_norm = np.maximum(
weighted_cell_flux_norm,
self.regularization,
)
# |weight| / |weight * flux|: cell -> face via arithmetic averaging
# of the inverse
weight_ratio_inv = self._product(
1.0 / self.cell_weights, regularized_weighted_cell_flux_norm
)
arithm_avg_weight_ratio_inv = darsia.cell_to_face_average(
self.grid, weight_ratio_inv, mode="arithmetic"
)
face_weights = harm_avg_face_weights / arithm_avg_weight_ratio_inv
face_weights_inv = 1.0 / face_weights
elif self.mobility_mode == "subcell_based":
# Idea: Aply harmonic averaging of cell weights and weighted sub cell
# reconstruction to compute |weight * flux| on faces.

# cell_weight: cell -> face via harmonic averaging
harm_avg_face_weights = self._harmonic_average(self.cell_weights)

elif mode == "subcell_based":
# Subcell-based mode determines the norm of the fluxes on the faces via
# averaging of neighboring subcells.

# Initialize the flux norm
# Initialize the flux norm |weight * flux| on the faces
num_subcells = 2**self.grid.dim
subcell_flux_norm = np.zeros(
(self.grid.num_faces, num_subcells), dtype=float
)
flat_flux_norm = np.zeros(self.grid.num_faces, dtype=float)
flat_weighted_flux_norm = np.zeros(self.grid.num_faces, dtype=float)

# Fetch cell corners
cell_corners = self.grid.cell_corners
Expand Down Expand Up @@ -810,69 +859,34 @@ def weighted_vector_face_flux_norm(
).ravel("F")[cells]

# Average over the subcells using harmonic averaging
flat_flux_norm = hmean(subcell_flux_norm, axis=1)
flat_weighted_flux_norm = hmean(subcell_flux_norm, axis=1)

elif mode == "face_based":
if not hasattr(self, "face_reconstruction"):
self._setup_face_reconstruction()
# Combine weights**2 / |weight * flux| on faces
face_weights = harm_avg_face_weights**2 / flat_weighted_flux_norm
face_weights_inv = 1.0 / face_weights

if self.weight is not None:
raise NotImplementedError(
"Face-based mode not implemented for weights."
)
elif self.mobility_mode == "face_based":
# Idea: Use harmonic averaging of cell weights and face reconstruction
# to compute |weight * flux| on faces.

# weight: cell -> face
harm_avg_face_weights = self._harmonic_average(self.cell_weights)

# Define natural vector valued flux on faces (taking arithmetic averages
# of continuous fluxes over cells evaluated at faces)
if not hasattr(self, "face_reconstruction"):
self._setup_face_reconstruction()
full_face_flux = self.face_reconstruction(flat_flux)
# Determine the l2 norm of the fluxes on the faces
flat_flux_norm = np.linalg.norm(full_face_flux, 2, axis=1)

else:
raise ValueError(f"Mode {mode} not supported.")

return flat_flux_norm

def _product(self, a: np.ndarray, b: np.ndarray) -> np.ndarray:
"""Compute the product of two arrays.
Args:
a (np.ndarray): array a
b (np.ndarray): array b
Returns:
np.ndarray: product
# Determine the l2 norm of the fluxes on the faces
weighted_face_flux = self._product(harm_avg_face_weights, full_face_flux)
norm_weighted_face_flux = np.linalg.norm(weighted_face_flux, 2, axis=1)

"""
if len(a.shape) == len(b.shape) + 1 and a.shape[:-1] == b.shape:
return a * b[..., np.newaxis]
elif len(a.shape) == len(b.shape) - 1 and a.shape == b.shape[:-1]:
return a[..., np.newaxis] * b
elif len(a.shape) == len(b.shape) and a.shape == b.shape:
return a * b
# Combine weights**2 / |weight * flux| on faces
face_weights = harm_avg_face_weights**2 / norm_weighted_face_flux
face_weights_inv = 1.0 / face_weights
else:
raise ValueError("Shapes not compatible.")

def _compute_face_weight(self, flat_flux: np.ndarray) -> np.ndarray:
"""FV-style face weighting, using harmonic averaging."""
assert self.mobility_mode in [
"cell_based",
"cell_based_harmonic",
], f"Only cell-based mobility supported - not {self.mobility_mode}."
cell_weight = self.cell_weights
weighted_cell_flux_norm = self.transport_density(
flat_flux, weighted=True, flatten=False
)
regularized_weighted_cell_flux_norm = np.maximum(
weighted_cell_flux_norm,
self.regularization,
)
cell_weights_inv = self._product(
1.0 / cell_weight**2, regularized_weighted_cell_flux_norm
)
face_weights_inv = darsia.cell_to_face_average(
self.grid, cell_weights_inv, mode="harmonic"
)
face_weights = 1.0 / face_weights_inv
raise ValueError(f"Mobility mode {self.mobility_mode} not supported.")

return face_weights, face_weights_inv

Expand Down Expand Up @@ -1671,14 +1685,13 @@ def _solve(self, flat_mass_diff: np.ndarray) -> tuple[float, np.ndarray, dict]:
)

# Relaxation parameter entering Bregman regularization
self.L = self.options.get("L", 1.0)
weight = 1.0 / self.L
shrink_factor = self.L
weight = 1.0 / self.L * sps.diags(self.face_weights, format="csc")
shrink_factor = self.L / self.face_weights

# Initialize linear problem corresponding to Bregman regularization
l_scheme_mixed_darcy = sps.bmat(
[
[weight * self.mass_matrix_faces, -self.div.T, None],
[weight @ self.mass_matrix_faces, -self.div.T, None],
[self.div, None, -self.pressure_constraint.T],
[None, self.pressure_constraint, None],
],
Expand Down

0 comments on commit c54b0d3

Please sign in to comment.