-
Notifications
You must be signed in to change notification settings - Fork 0
/
sde.py
115 lines (97 loc) · 5.31 KB
/
sde.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import numpy as np
from scipy.integrate import solve_ivp
from geomstats.geometry.riemannian_metric import RiemannianMetric
from geomstats.geometry.hyperbolic import Hyperbolic
from geomstats.learning.frechet_mean import FrechetMean
class SocietalDynamicsEngine:
def __init__(self, manifold_dim=100, num_variables=50):
self.manifold = Hyperbolic(dim=manifold_dim)
self.metric = self.manifold.metric
self.num_variables = num_variables
self.current_state = self.manifold.random_point()
self.drift_coefficient = 0.1
self.diffusion_coefficient = 0.05
self.temperature = 1.0
self.interaction_strength = 0.1
def set_initial_state(self, initial_state):
assert len(initial_state) == self.manifold.dim
self.current_state = self.manifold.from_vector(initial_state)
def dynamics(self, t, state, noise):
# Implement the drift and diffusion terms of the SDE
drift = self.calculate_drift(state)
diffusion = self.calculate_diffusion(state)
return drift + diffusion * noise
def calculate_drift(self, state):
# Calculate the drift term based on the gradient of the potential energy
gradient = self.metric.grad(self.calculate_energy, state)
return -self.drift_coefficient * gradient
def calculate_diffusion(self, state):
# Calculate the state-dependent diffusion term
return self.diffusion_coefficient * np.sqrt(self.calculate_energy(state))
def evolve_society(self, time_span, time_step):
t_eval = np.arange(0, time_span, time_step)
noise = np.random.normal(0, 1, (len(t_eval), self.manifold.dim))
def ode_func(t, y):
return self.dynamics(t, y, noise[int(t/time_step)])
solution = solve_ivp(ode_func, (0, time_span), self.current_state, t_eval=t_eval, method='RK45')
self.current_state = solution.y[:, -1]
return solution.y.T
def calculate_energy(self, state):
# Calculate the energy of a given societal state using a simplified Ising-like model
energy = 0
for i in range(self.manifold.dim):
for j in range(i+1, self.manifold.dim):
energy -= self.interaction_strength * state[i] * state[j]
return energy
def calculate_probability(self, state):
# Calculate the probability of a state using the Boltzmann distribution
energy = self.calculate_energy(state)
return np.exp(-energy / self.temperature) / self.calculate_partition_function()
def calculate_partition_function(self):
# Approximate the partition function using Monte Carlo integration
num_samples = 10000
samples = [self.manifold.random_point() for _ in range(num_samples)]
energies = [self.calculate_energy(sample) for sample in samples]
return np.mean(np.exp(-np.array(energies) / self.temperature))
def parallel_transport(self, vector, curve):
# Perform parallel transport of a vector along a curve on the manifold
return self.metric.parallel_transport(vector, curve[0], curve[1:], n_steps=100)
def geodesic(self, initial_point, end_point, n_steps=100):
# Compute the geodesic between two points on the manifold
return self.metric.geodesic(initial_point=initial_point, end_point=end_point)(np.linspace(0, 1, n_steps))
def frechet_mean(self, points, weights=None):
# Calculate the Fréchet mean of a set of points on the manifold
mean = FrechetMean(metric=self.metric, point_type='vector')
mean.fit(points, weights=weights)
return mean.estimate_
def curvature(self, point):
# Compute the sectional curvature at a point
return self.manifold.sectional_curvature(point)
def stability_analysis(self, state):
# Perform stability analysis by computing the Jacobian of the drift term
epsilon = 1e-6
jacobian = np.zeros((self.manifold.dim, self.manifold.dim))
for i in range(self.manifold.dim):
perturbed_state = state.copy()
perturbed_state[i] += epsilon
jacobian[:, i] = (self.calculate_drift(perturbed_state) - self.calculate_drift(state)) / epsilon
eigenvalues = np.linalg.eigvals(jacobian)
return eigenvalues
def lyapunov_exponent(self, initial_state, time_span, time_step):
# Estimate the largest Lyapunov exponent
trajectory = self.evolve_society(time_span, time_step)
perturbation = np.random.normal(0, 1e-10, self.manifold.dim)
perturbed_trajectory = self.evolve_society(time_span, time_step, initial_state + perturbation)
distances = np.linalg.norm(trajectory - perturbed_trajectory, axis=1)
return np.mean(np.log(distances[1:] / distances[:-1])) / time_step
def bifurcation_analysis(self, parameter_range, parameter_name):
# Perform bifurcation analysis by varying a parameter
results = []
original_value = getattr(self, parameter_name)
for param_value in parameter_range:
setattr(self, parameter_name, param_value)
steady_state = self.evolve_society(time_span=100, time_step=0.1)[-1]
stability = self.stability_analysis(steady_state)
results.append((param_value, steady_state, stability))
setattr(self, parameter_name, original_value)
return results