Skip to content

Commit

Permalink
Merge pull request #816 from pariterre/master
Browse files Browse the repository at this point in the history
Final stretch of the reformating!
  • Loading branch information
pariterre authored Jan 11, 2024
2 parents 1030ae7 + 0a6c34e commit 17e3628
Show file tree
Hide file tree
Showing 37 changed files with 471 additions and 731 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/run_tests_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ jobs:
mamba list
- name: Install extra dependencies
run: mamba install pytest-cov black pytest pytest-cov codecov packaging -cconda-forge
run: |
mamba install pytest-cov black pytest pytest-cov codecov packaging -cconda-forge
sudo apt install -y librhash-dev
- name: Install ACADOS on Linux
run: |
Expand Down
9 changes: 1 addition & 8 deletions bioptim/dynamics/configure_problem.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,6 @@ def stochastic_torque_driven(
DynamicsFunctions.stochastic_torque_driven,
with_contact=with_contact,
with_friction=with_friction,
allow_free_variables=True,
)

@staticmethod
Expand Down Expand Up @@ -706,7 +705,7 @@ def holonomic_torque_driven(ocp, nlp):
ConfigureProblem.configure_dynamics_function(ocp, nlp, DynamicsFunctions.holonomic_torque_driven)

@staticmethod
def configure_dynamics_function(ocp, nlp, dyn_func, allow_free_variables: bool = False, **extra_params):
def configure_dynamics_function(ocp, nlp, dyn_func, **extra_params):
"""
Configure the dynamics of the system
Expand All @@ -718,12 +717,8 @@ def configure_dynamics_function(ocp, nlp, dyn_func, allow_free_variables: bool =
A reference to the phase
dyn_func: Callable[time, states, controls, param, algebraic_states] | tuple[Callable[time, states, controls, param, algebraic_states], ...]
The function to get the derivative of the states
allow_free_variables: bool
If it is expected the dynamics depends on more than the variable provided by bioptim. It is therefore to the
user prerogative to wrap the Function around another function to lift the free variable
"""

nlp.parameters = ocp.parameters
DynamicsFunctions.apply_parameters(nlp.parameters.mx, nlp)

if not isinstance(dyn_func, (tuple, list)):
Expand Down Expand Up @@ -757,7 +752,6 @@ def configure_dynamics_function(ocp, nlp, dyn_func, allow_free_variables: bool =
[dynamics_dxdt],
["t_span", "x", "u", "p", "a"],
["xdot"],
{"allow_free": allow_free_variables},
),
)

Expand Down Expand Up @@ -790,7 +784,6 @@ def configure_dynamics_function(ocp, nlp, dyn_func, allow_free_variables: bool =
[dynamics_eval.defects],
["t_span", "x", "u", "p", "a", "xdot"],
["defects"],
{"allow_free": allow_free_variables},
)
)
if nlp.dynamics_type.expand_dynamics:
Expand Down
10 changes: 7 additions & 3 deletions bioptim/dynamics/dynamics_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,14 @@ def stochastic_torque_driven(

sensory_input = nlp.model.sensory_reference(states, controls, parameters, algebraic_states, nlp)

mapped_motor_noise = nlp.model.motor_noise_sym_mx
mapped_sensory_feedback_torque = nlp.model.compute_torques_from_noise_and_feedback(k_matrix, sensory_input, ref)
mapped_motor_noise = parameters[nlp.parameters["motor_noise"].index]
mapped_sensory_noise = parameters[nlp.parameters["sensory_noise"].index]
mapped_sensory_feedback_torque = nlp.model.compute_torques_from_noise_and_feedback(
mapped_sensory_noise, k_matrix, sensory_input, ref
)

if "tau" in nlp.model.motor_noise_mapping.keys():
mapped_motor_noise = nlp.model.motor_noise_mapping["tau"].to_second.map(nlp.model.motor_noise_sym_mx)
mapped_motor_noise = nlp.model.motor_noise_mapping["tau"].to_second.map(nlp.parameters["motor_noise"].mx)
mapped_sensory_feedback_torque = nlp.model.motor_noise_mapping["tau"].to_second.map(
mapped_sensory_feedback_torque
)
Expand Down
19 changes: 3 additions & 16 deletions bioptim/dynamics/integrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ def __init__(self, ode: dict, ode_opt: dict):
self.defects_type = ode_opt["defects_type"]
self.control_type = ode_opt["control_type"]
self.function = None
self.allow_free_variables = ode_opt["allow_free_variables"]
self.duplicate_starting_point = ode_opt["duplicate_starting_point"]

# Initialize is expected to set step_time
Expand All @@ -80,13 +79,7 @@ def __init__(self, ode: dict, ode_opt: dict):
self.step_times_from_dt = self._time_xall_from_dt_func
self.function = Function(
"integrator",
[
self.t_span_sym,
self._x_sym_modified,
self.u_sym,
self.param_sym,
self.a_sym,
],
[self.t_span_sym, self._x_sym_modified, self.u_sym, self.param_sym, self.a_sym],
self.dxdt(
states=self.x_sym,
controls=self.u_sym,
Expand All @@ -95,7 +88,6 @@ def __init__(self, ode: dict, ode_opt: dict):
),
self._input_names,
self._output_names,
{"allow_free": self.allow_free_variables},
)

@property
Expand Down Expand Up @@ -505,7 +497,6 @@ def _initialize(self, ode: dict, ode_opt: dict):
"""
self.method = ode_opt["method"]
self.degree = ode_opt["irk_polynomial_interpolation_degree"]
self.allow_free_variables = ode_opt["allow_free_variables"]

# Coefficients of the collocation equation
self._c = self.cx.zeros((self.degree + 1, self.degree + 1))
Expand Down Expand Up @@ -621,13 +612,9 @@ def dxdt(

if self.defects_type == DefectType.EXPLICIT:
f_j = self.fun(
t,
states[j + 1],
self.get_u(controls, self._integration_time[j]),
params,
algebraic_states,
t, states[j + 1], self.get_u(controls, self._integration_time[j]), params, algebraic_states
)[:, self.ode_idx]
defects.append(xp_j - self.h * f_j)
defects.append(xp_j - f_j * self.h)
elif self.defects_type == DefectType.IMPLICIT:
defects.append(
self.implicit_fun(
Expand Down
46 changes: 8 additions & 38 deletions bioptim/dynamics/ode_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ class OdeSolverBase:
Properly set the integration in an nlp
"""

def __init__(self, allow_free_variables: bool = False, duplicate_starting_point: bool = False):
def __init__(self, duplicate_starting_point: bool = False):
"""
Parameters
----------
allow_free_variables: bool
If the free variables are allowed in the integrator's casadi function
duplicate_starting_point: bool
If the starting point should be duplicated in the integrator's casadi function
"""
self.allow_free_variables = allow_free_variables

self.duplicate_starting_point = duplicate_starting_point

@property
Expand Down Expand Up @@ -161,9 +159,7 @@ def param_ode(self, nlp) -> MX:
"""
return nlp.parameters.cx

def initialize_integrator(
self, ocp, nlp, dynamics_index: int, node_index: int, allow_free_variables: bool = False, **extra_opt
) -> Callable:
def initialize_integrator(self, ocp, nlp, dynamics_index: int, node_index: int, **extra_opt) -> Callable:
"""
Initialize the integrator
Expand All @@ -177,8 +173,6 @@ def initialize_integrator(
The current dynamics to resolve (that can be referred to nlp.dynamics_func[index])
node_index
The index of the node currently initialized
allow_free_variables
If the free variables are allowed in the integrator's casadi function
extra_opt
Any extra options to pass to the integrator
Expand All @@ -196,7 +190,6 @@ def initialize_integrator(
"cx": nlp.cx,
"control_type": nlp.control_type,
"defects_type": self.defects_type,
"allow_free_variables": allow_free_variables,
"param_scaling": vertcat(*[nlp.parameters[key].scaling.scaling for key in nlp.parameters.keys()]),
"ode_index": node_index if nlp.dynamics_func[dynamics_index].size2_out("xdot") > 1 else 0,
"duplicate_starting_point": self.duplicate_starting_point,
Expand Down Expand Up @@ -231,43 +224,23 @@ def prepare_dynamic_integrator(self, ocp, nlp):
"""

# Primary dynamics
dynamics = [
nlp.ode_solver.initialize_integrator(
ocp, nlp, dynamics_index=0, node_index=0, allow_free_variables=self.allow_free_variables
)
]
dynamics = [nlp.ode_solver.initialize_integrator(ocp, nlp, dynamics_index=0, node_index=0)]
if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE:
dynamics = dynamics * nlp.ns
else:
for node_index in range(1, nlp.ns):
dynamics.append(
nlp.ode_solver.initialize_integrator(
ocp,
nlp,
dynamics_index=0,
node_index=node_index,
allow_free_variables=self.allow_free_variables,
)
)
dynamics.append(nlp.ode_solver.initialize_integrator(ocp, nlp, dynamics_index=0, node_index=node_index))
nlp.dynamics = dynamics

# Extra dynamics
extra_dynamics = []
for i in range(1, len(nlp.dynamics_func)):
extra_dynamics += [
nlp.ode_solver.initialize_integrator(
ocp, nlp, dynamics_index=i, node_index=0, allow_free_variables=True
)
]
extra_dynamics += [nlp.ode_solver.initialize_integrator(ocp, nlp, dynamics_index=i, node_index=0)]
if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE:
extra_dynamics = extra_dynamics * nlp.ns
else:
for node_index in range(1, nlp.ns):
extra_dynamics += [
nlp.ode_solver.initialize_integrator(
ocp, nlp, dynamics_index=i, node_index=0, allow_free_variables=True
)
]
extra_dynamics += [nlp.ode_solver.initialize_integrator(ocp, nlp, dynamics_index=i, node_index=0)]
# TODO include this in nlp.dynamics so the index of nlp.dynamics_func and nlp.dynamics match
nlp.extra_dynamics.append(extra_dynamics)

Expand Down Expand Up @@ -564,9 +537,7 @@ def p_ode(self, nlp):
def a_ode(self, nlp):
return nlp.algebraic_states.scaled.cx

def initialize_integrator(
self, ocp, nlp, dynamics_index: int, node_index: int, allow_free_variables: bool = False, **extra_opt
):
def initialize_integrator(self, ocp, nlp, dynamics_index: int, node_index: int, **extra_opt):
raise NotImplementedError("CVODES is not yet implemented")

if extra_opt:
Expand Down Expand Up @@ -610,7 +581,6 @@ def initialize_integrator(
),
["t_span", "x0", "u", "p", "a"],
["xf", "xall"],
{"allow_free": allow_free_variables},
)
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ def stochastic_forward_dynamics(
motor_noise = 0
sensory_noise = 0
if with_noise:
motor_noise = nlp.model.motor_noise_sym_mx
sensory_noise = nlp.model.sensory_noise_sym_mx
motor_noise = nlp.parameters["motor_noise"].mx
sensory_noise = nlp.parameters["sensory_noise"].mx

mus_excitations_fb = mus_excitations
noise_torque = np.zeros(nlp.model.motor_noise_magnitude.shape)
Expand Down Expand Up @@ -156,20 +156,20 @@ def configure_stochastic_optimal_control_problem(ocp: OptimalControlProgram, nlp
dyn_func=lambda time, states, controls, parameters, algebraic_states, nlp: nlp.dynamics_type.dynamic_function(
time, states, controls, parameters, algebraic_states, nlp, with_noise=True
),
allow_free_variables=True,
)


def minimize_uncertainty(controllers: list[PenaltyController], key: str) -> cas.MX:
"""
Minimize the uncertainty (covariance matrix) of the states.
"""
dt = controllers[0].dt
dt = controllers[0].dt.cx
out = 0
for i, ctrl in enumerate(controllers):
cov_matrix = StochasticBioModel.reshape_to_matrix(ctrl.integrated_values["cov"].cx, ctrl.model.matrix_shape_cov)
p_partial = cov_matrix[ctrl.states[key].index, ctrl.states[key].index]
out += cas.trace(p_partial) * dt

return out


Expand All @@ -184,68 +184,48 @@ def get_cov_mat(nlp, node_index):
m_matrix = StochasticBioModel.reshape_to_matrix(nlp.algebraic_states["m"].cx, nlp.model.matrix_shape_m)

CX_eye = cas.SX_eye if nlp.cx == cas.SX else cas.MX_eye
sigma_w = cas.vertcat(nlp.model.sensory_noise_sym, nlp.model.motor_noise_sym) * CX_eye(6)
sensory_noise = nlp.parameters["sensory_noise"].cx
motor_noise = nlp.parameters["motor_noise"].cx
sigma_w = cas.vertcat(sensory_noise, motor_noise) * CX_eye(6)
cov_sym = cas.MX.sym("cov", nlp.integrated_values.cx.shape[0])
cov_matrix = StochasticBioModel.reshape_to_matrix(cov_sym, nlp.model.matrix_shape_cov)

dx = stochastic_forward_dynamics(
nlp.states.mx,
nlp.controls.mx,
nlp.parameters,
nlp.parameters.mx,
nlp.algebraic_states.mx,
nlp,
force_field_magnitude=nlp.model.force_field_magnitude,
with_noise=True,
)

dx.dxdt = cas.Function(
"tp",
[
nlp.states.mx,
nlp.controls.mx,
nlp.parameters,
nlp.algebraic_states.mx,
nlp.model.sensory_noise_sym_mx,
nlp.model.motor_noise_sym_mx,
],
[dx.dxdt],
)(
nlp.states.cx,
nlp.controls.cx,
nlp.parameters,
nlp.algebraic_states.cx,
nlp.model.sensory_noise_sym,
nlp.model.motor_noise_sym,
)
"tp", [nlp.states.mx, nlp.controls.mx, nlp.parameters.mx, nlp.algebraic_states.mx], [dx.dxdt]
)(nlp.states.cx, nlp.controls.cx, nlp.parameters.cx, nlp.algebraic_states.cx)

ddx_dwm = cas.jacobian(dx.dxdt, cas.vertcat(nlp.model.sensory_noise_sym, nlp.model.motor_noise_sym))
ddx_dwm = cas.jacobian(dx.dxdt, cas.vertcat(sensory_noise, motor_noise))
dg_dw = -ddx_dwm * dt
ddx_dx = cas.jacobian(dx.dxdt, nlp.states.cx)
dg_dx = -(ddx_dx * dt / 2 + CX_eye(ddx_dx.shape[0]))

p_next = m_matrix @ (dg_dx @ cov_matrix @ dg_dx.T + dg_dw @ sigma_w @ dg_dw.T) @ m_matrix.T

parameters = nlp.parameters.cx
parameters[nlp.parameters["sensory_noise"].index] = nlp.model.sensory_noise_magnitude
parameters[nlp.parameters["motor_noise"].index] = nlp.model.motor_noise_magnitude

func_eval = cas.Function(
"p_next",
[
dt,
nlp.states.cx,
nlp.controls.cx,
nlp.parameters,
nlp.algebraic_states.cx,
cov_sym,
nlp.model.motor_noise_sym,
nlp.model.sensory_noise_sym,
],
[dt, nlp.states.cx, nlp.controls.cx, nlp.parameters.cx, nlp.algebraic_states.cx, cov_sym],
[p_next],
)(
nlp.dt,
nlp.states.cx,
nlp.controls.cx,
nlp.parameters,
parameters,
nlp.algebraic_states.cx,
nlp.integrated_values["cov"].cx,
nlp.model.motor_noise_magnitude,
nlp.model.sensory_noise_magnitude,
)
p_vector = StochasticBioModel.reshape_to_vector(func_eval)
return p_vector
Expand Down Expand Up @@ -404,7 +384,6 @@ def prepare_socp(
sensory_noise_magnitude=sensory_noise_magnitude,
motor_noise_magnitude=motor_noise_magnitude,
sensory_reference=sensory_reference,
use_sx=use_sx,
)
bio_model.force_field_magnitude = force_field_magnitude

Expand Down Expand Up @@ -587,7 +566,7 @@ def prepare_socp(
max_bound=stochastic_max[curent_index : curent_index + n_states * n_states, :],
)

integrated_value_functions = {"cov": lambda nlp, node_index: get_cov_mat(nlp, node_index)}
integrated_value_functions = {"cov": get_cov_mat}

return StochasticOptimalControlProgram(
bio_model,
Expand Down Expand Up @@ -722,8 +701,6 @@ def main():
parameters = socp.nlp[0].parameters.cx
algebraic_states = socp.nlp[0].algebraic_states.cx
nlp = socp.nlp[0]
motor_noise_sym = cas.MX.sym("motor_noise", 2, 1)
sensory_noise_sym = cas.MX.sym("sensory_noise", 4, 1)
out = stochastic_forward_dynamics(
states,
controls,
Expand All @@ -733,11 +710,7 @@ def main():
force_field_magnitude=force_field_magnitude,
with_noise=True,
)
dyn_fun = cas.Function(
"dyn_fun",
[states, controls, parameters, algebraic_states, motor_noise_sym, sensory_noise_sym],
[out.dxdt],
)
dyn_fun = cas.Function("dyn_fun", [states, controls, parameters, algebraic_states], [out.dxdt])

fig, axs = plt.subplots(3, 2)
n_simulations = 30
Expand Down
Loading

0 comments on commit 17e3628

Please sign in to comment.