Skip to content

Commit

Permalink
Merge pull request #12622 from KratosMultiphysics/optapp/fix_resp_eval
Browse files Browse the repository at this point in the history
[OptApp] Fix response function evaluation
  • Loading branch information
sunethwarna authored Aug 15, 2024
2 parents 23dd596 + 9c9ee0c commit 21df993
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from KratosMultiphysics.OptimizationApplication.execution_policies.execution_policy import ExecutionPolicy
from KratosMultiphysics.OptimizationApplication.utilities.component_data_view import ComponentDataView
from KratosMultiphysics.OptimizationApplication.utilities.optimization_problem import OptimizationProblem
from KratosMultiphysics.OptimizationApplication.utilities.buffered_dict import BufferedDict
from KratosMultiphysics.OptimizationApplication.utilities.helper_utilities import GetAllComponentFullNamesWithData
from KratosMultiphysics.OptimizationApplication.utilities.helper_utilities import GetComponentHavingDataByFullName

Expand Down Expand Up @@ -95,23 +96,23 @@ def __init__(self, parameters: Kratos.Parameters, optimization_problem: Optimiza
raise RuntimeError("The \"bool_values\" should have only two strings corresponding to False and True values in the mentioned order.")

self.list_of_components: 'list[Union[str, ResponseFunction, Control, ExecutionPolicy]]' = []
list_of_component_names = parameters["list_of_output_components"].GetStringArray()
if len(list_of_component_names) == 1 and list_of_component_names[0] == "all":
list_of_component_names = GetAllComponentFullNamesWithData(optimization_problem)

for component_name in list_of_component_names:
self.list_of_components.append(GetComponentHavingDataByFullName(component_name, optimization_problem))

self.list_of_headers: 'list[tuple[Any, dict[str, Header]]]' = []
self.initialized_headers = False
self.list_of_component_names = parameters["list_of_output_components"].GetStringArray()

def IsOutputStep(self) -> bool:
return True

def PrintOutput(self) -> None:
if not self.initialized_headers:
if len(self.list_of_component_names) == 1 and self.list_of_component_names[0] == "all":
self.list_of_component_names = GetAllComponentFullNamesWithData(self.optimization_problem)

for component_name in self.list_of_component_names:
self.list_of_components.append(GetComponentHavingDataByFullName(component_name, self.optimization_problem))

# now get the buffered data headers
self.list_of_headers = self._GetHeaders(lambda x: x.GetBufferedData())
self.list_of_headers = self._GetHeaders(lambda x: x.GetBufferedData() if x.HasDataBuffer() else BufferedDict())
# write the ehader information
self._WriteHeaders()
self.initialized_headers = True
Expand Down Expand Up @@ -168,10 +169,12 @@ def _WriteHeaders(self):
componend_data_view = ComponentDataView(component, self.optimization_problem)
buffered_dict = componend_data_view.GetUnBufferedData()
component_name = componend_data_view.GetComponentName()
msg_header = f"{msg_header}# \t" + component_name + ":\n"
for k, header in header_info_dict.items():
component_name_header = header.GetHeaderName().strip()[len(component_name)+1:]
msg_header = f"{msg_header}# \t\t" + component_name_header + ": " + header.GetValueStr(buffered_dict[k]).strip() + "\n"
# check if there are values to be written under the component name, if not skip the component.
if len(header_info_dict):
msg_header = f"{msg_header}# \t" + component_name + ":\n"
for k, header in header_info_dict.items():
component_name_header = header.GetHeaderName().strip()[len(component_name)+1:]
msg_header = f"{msg_header}# \t\t" + component_name_header + ": " + header.GetValueStr(buffered_dict[k]).strip() + "\n"

msg_header = f"{msg_header}# ------------ End of initial values ------------\n"
msg_header = f"{msg_header}# -----------------------------------------------\n"
Expand Down Expand Up @@ -204,5 +207,6 @@ def _GetHeaders(self, dict_getter_method) -> 'list[tuple[Any, dict[str, Header]
if header_name in [header.GetHeaderName().strip() for header in header_info_dict.values()]:
Kratos.Logger.PrintWarning(self.__class__.__name__, "Second value with same header name = \"" + header_name + "\" found.")
header_info_dict[k] = Header(header_name, v, self.format_info)
list_of_headers.append([component, header_info_dict])
if len(header_info_dict):
list_of_headers.append([component, header_info_dict])
return list_of_headers
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def GetImplementedPhysicalKratosVariables(self) -> 'list[SupportedSensitivityFie

def Initialize(self) -> None:
self.response_function.Initialize()
if not ComponentDataView("evaluated_responses", self.optimization_problem).HasDataBuffer():
ComponentDataView("evaluated_responses", self.optimization_problem).SetDataBuffer(1)

def Check(self) -> None:
self.response_function.Check()
Expand All @@ -33,10 +35,10 @@ def CalculateValue(self) -> float:
# so this creates a new data container under the optimization problem to avoid
# having to compute the same response value twice.

unbuffered_data = ComponentDataView("evaluated_responses", self.optimization_problem).GetUnBufferedData()
buffered_data = ComponentDataView("evaluated_responses", self.optimization_problem).GetBufferedData()

# reset data of the evaluation
self.__ResetEvaluationData(self, unbuffered_data, "values")
self.__ResetEvaluationData(self, buffered_data, "values")

# now calculate
return self.response_function.CalculateValue()
Expand All @@ -60,9 +62,9 @@ def __str__(self) -> str:
return f"Response [type = {self.__class__.__name__}, name = {self.GetName()}]"

@staticmethod
def __ResetEvaluationData(response_function: ResponseFunction, unbuffered_data: BufferedDict, prefix: str) -> None:
if unbuffered_data.HasValue(f"{prefix}/{response_function.GetName()}"):
del unbuffered_data[f"{prefix}/{response_function.GetName()}"]
def __ResetEvaluationData(response_function: ResponseFunction, data: BufferedDict, prefix: str) -> None:
if data.HasValue(f"{prefix}/{response_function.GetName()}"):
del data[f"{prefix}/{response_function.GetName()}"]
for child_response in response_function.GetChildResponses():
# now reset the children
EvaluationResponseFunction.__ResetEvaluationData(child_response, unbuffered_data, prefix)
EvaluationResponseFunction.__ResetEvaluationData(child_response, data, prefix)
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,14 @@ def GetAllComponentFullNamesWithData(optimization_problem: OptimizationProblem)
return list_of_components_full_names_with_data

def GetComponentHavingDataByFullName(component_full_name: str, optimization_problem: OptimizationProblem) -> Any:
data_container = optimization_problem.GetProblemDataContainer()

name_data = component_full_name.split(".")
for component_type in optimization_problem.GetComponentContainer().keys():
snake_case_name = Kratos.StringUtilities.ConvertCamelCaseToSnakeCase(component_type.__name__)
if component_full_name.startswith(snake_case_name):
return optimization_problem.GetComponent(component_full_name[len(snake_case_name) + 1:], component_type)

if len(name_data) == 1:
if data_container.HasValue("object") and data_container["object"].HasValue(component_full_name):
return component_full_name
else:
component_type_str = Kratos.StringUtilities.ConvertSnakeCaseToCamelCase(name_data[0])
component_name = name_data[1]

for component_type in optimization_problem.GetComponentContainer().keys():
if component_type.__name__ == component_type_str:
return optimization_problem.GetComponent(component_name, component_type)
data_container = optimization_problem.GetProblemDataContainer()
if data_container.HasValue("object") and data_container["object"].HasValue(component_full_name):
return component_full_name

msg = ""
for component_type, dict_of_components in optimization_problem.GetComponentContainer().items():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@ def GetFunction(function_name: str, optimization_problem: OptimizationProblem, *
raise RuntimeError(f"Undefined \"{function_name}\" function. Followings are supported function names:\n\t" + "\n\t".join(functions_map.keys()))

def EvaluateValue(response_function: ResponseFunction, optimization_problem: OptimizationProblem) -> float:
response_data = ComponentDataView("evaluated_responses", optimization_problem).GetUnBufferedData()
response_data = ComponentDataView("evaluated_responses", optimization_problem).GetBufferedData()

if not response_data.HasValue(f"values/{response_function.GetName()}"):
response_data.SetValue(f"values/{response_function.GetName()}", response_function.CalculateValue())
if response_function.GetImplementedPhysicalKratosVariables():
if not response_data.HasValue(f"values/{response_function.GetName()}"):
response_data.SetValue(f"values/{response_function.GetName()}", response_function.CalculateValue())

return response_data.GetValue(f"values/{response_function.GetName()}")
return response_data.GetValue(f"values/{response_function.GetName()}")
else:
return response_function.CalculateValue()

def EvaluateGradient(response_function: ResponseFunction, physical_variable_collective_expressions: 'dict[SupportedSensitivityFieldVariableTypes, KratosOA.CollectiveExpression]', optimization_problem: OptimizationProblem) -> 'dict[SupportedSensitivityFieldVariableTypes, KratosOA.CollectiveExpression]':
# first get the sub_collective expressions for implemented physical kratos variables
Expand Down Expand Up @@ -153,7 +156,8 @@ def GetValuesAndOperators(response_expression: str, optimization_problem: Optimi
index += closing_bracket_position + 2
continue

if current_char in BinaryOperatorValues:
# check for binary operator or scientific notation value
if current_char in BinaryOperatorValues and not (re.match(r"(^[0-9.]+[e|E])$", current_word) and current_char in ["+", "-"]):
responses.append(GetResponseFunction(current_word, optimization_problem))
operators.append(current_char)
current_word = ""
Expand Down Expand Up @@ -229,7 +233,7 @@ def __evaluate_operator(list_of_operators: 'list[str]') -> None:

def EvaluateResponseExpression(response_expression: str, optimization_problem: OptimizationProblem) -> ResponseFunction:
evaluated_response_impl = __EvaluateResponseExpressionImpl(response_expression, optimization_problem)
if not evaluated_response_impl.GetChildResponses() and not evaluated_response_impl.GetImplementedPhysicalKratosVariables():
if evaluated_response_impl.GetChildResponses() and evaluated_response_impl.GetImplementedPhysicalKratosVariables():
# if the response has children and has some dependence on the variables, then
# we need to use the EvaluationResponseFunction to clear the evaluation data
# whenever CalculateValue, CalculateGradient is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# Timestamp : not_specified
# -----------------------------------------------
# --------------- Initial values ----------------
# algorithm:
# strain_energy:
# initial_value: 2.045937642e-02
# ------------ End of initial values ------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# Timestamp : not_specified
# -----------------------------------------------
# --------------- Initial values ----------------
# algorithm:
# strain_energy:
# initial_value: 2.045937642e-02
# ------------ End of initial values ------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# Timestamp : not_specified
# -----------------------------------------------
# --------------- Initial values ----------------
# algorithm:
# strain_energy:
# initial_value: 2.045937642e-02
# ------------ End of initial values ------------
Expand Down
Loading

0 comments on commit 21df993

Please sign in to comment.