-
Notifications
You must be signed in to change notification settings - Fork 8
/
bpe.py
148 lines (133 loc) · 5.46 KB
/
bpe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import warnings
import numpy as np
mm = np.matmul
def vec(S):
"""Per scs definition, preserves inner product
tr(A @ B) = vec(A) @ vec(B)
https://www.cvxgrp.org/scs/api/cones.html#sdcone
"""
j, i = np.triu_indices(S.shape[-1])
scale = np.where(i == j, 1.0, np.sqrt(2))
return S[..., i, j] * scale
def mat(s, n=None):
"""Inverse of vec() operation"""
if n is None:
ls = s.shape[-1]
n = int(0.5 * (np.sqrt(1 + 8 * ls) - 1))
S = np.empty(s.shape[:-1] + (n, n))
j, i = np.triu_indices(n)
scaled = np.where(i == j, 1.0, np.sqrt(0.5)) * s
S[..., i, j] = scaled
S[..., j, i] = scaled
return S
class BasisPointExpansion:
def __init__(self, n_coef):
self._n = n_coef
self._points = []
self._yields = []
self._errors = []
self._M = None
@property
def n(self):
return self._n
def add_point(self, coef, yields, errors):
if not all(isinstance(x, np.ndarray) for x in (coef, yields, errors)):
raise ValueError("Wrong input type")
if coef.shape != (self.n,):
raise ValueError("coef has wrong shape")
if yields.shape != errors.shape or len(yields.shape) != 1:
raise ValueError("yields or errors has wrong shape")
if np.any(yields < 0.0):
raise ValueError("some yields are less than zero")
if np.any(errors == 0.0):
raise ValueError("some errors are zero")
if self._yields and self._yields[0].shape != yields.shape:
raise ValueError("yields shape does not match that of other basis points")
self._points.append(coef)
self._yields.append(yields)
self._errors.append(errors)
self._M = None
def solve(self, algo="svd", tol=1e-5):
"""Solve for expansion matrix
Algo options:
svd: Weighted least-squares fit
svd_pos: Like svd but adjusts eigenvalues to be pos-def
dcp: Use disciplined convex programming to find least-squares under pos-def constraint
tol: Target for how far lowest eigenvalue is above zero
"""
points = np.stack(self._points)
npt, n = points.shape
assert n == self._n
if npt < n * (n - 1):
raise RuntimeError("Not enough points for the dimension of the problem")
yields = np.stack(self._yields).T
weight = 1.0 / np.stack(self._errors).T
nbin, _ = yields.shape
if algo.startswith("svd"):
C = vec(points[:, None, :] * points[:, :, None])[None] * weight[:, :, None]
u, s, vh = np.linalg.svd(C, full_matrices=False)
if np.any(s) == 0.0:
raise RuntimeError("Expansion points not linearly independent")
inv = np.einsum("pji,pj,pkj,pk,pk->pi", vh, 1.0 / s, u, weight, yields)
M = mat(inv, n=n)
if algo == "svd_pos":
eig, eigv = np.linalg.eigh(M)
eigp = np.maximum(eig, tol) # magic happens here
M = mm(eigv * eigp[:, None, :], np.swapaxes(eigv, 1, 2))
self._M = M
elif algo == "dcp":
import cvxpy as cp
Mall = np.empty((nbin, n, n))
M = cp.Variable((n, n), PSD=True)
for i in range(nbin):
c = points * np.sqrt(weight[i, :, None])
y = yields[i] * weight[i]
obj = cp.Minimize(
sum((cp.quad_form(c[j], M) - y[j]) ** 2 for j in range(npt))
)
prob = cp.Problem(obj, [M >> tol])
rss = prob.solve()
if prob.status == cp.OPTIMAL:
pass
elif prob.status == cp.OPTIMAL_INACCURATE:
warnings.warn("Inaccurate solution for bin {i} (rss={rss})".format(i=i, rss=rss))
else:
raise RuntimeError(
"Unable to solve problem for bin {i} (rss={rss}, status={status})".format(i=i, rss=rss, status=prob.status)
)
Mall[i] = M.value
self._M = Mall
elif algo == "scs":
import scipy.sparse
import scs
Mall = np.empty((nbin, n, n))
k = n * (n + 1) // 2
data = dict(
A=-scipy.sparse.csc_matrix(np.eye(k)),
b=np.zeros(k),
)
cone = dict(s=n)
for i in range(nbin):
cw = points * np.sqrt(weight[i, :, None])
yw = yields[i] * weight[i]
D = vec(cw[:, None, :] * cw[:, :, None])
data["P"] = scipy.sparse.csc_matrix(mm(D.T, D))
data["c"] = -mm(yw, D)
ywsq = max(mm(yw, yw), 1.0)
sol = scs.solve(
data, cone, verbose=False, eps_abs=tol, eps_rel=min(tol, 1 / ywsq)
)
M = mat(sol["s"], n=n)
if sol["info"]["status_val"] == 1:
pass
else:
rss = mm(mm(mm(sol["s"], D.T), D), sol["s"]) - 2 * mm(mm(yw, D), sol["s"]) + ywsq
raise RuntimeError(
"Unable to solve problem for bin {i} (rss={rss}, status={status})".format(i=i, rss=rss, status=sol['info'])
)
Mall[i] = M
self._M = Mall
def __call__(self, c):
if self._M is None:
raise RuntimeError("Please call solve() first")
return mm(mm(c, self._M), c)