# pq.py
# Forked from xinyandai/product-quantization.
from __future__ import division
from __future__ import print_function
import numpy as np
from quantize import vq, kmeans2
class PQ(object):
    """Product quantizer: one independent codebook per slice of the input.

    A ``D``-dimensional vector is cut into ``M`` contiguous slices of
    ``Ds = D // M`` dimensions. Each slice is represented by the index of one
    of ``Ks`` codewords learned for that slice, so a vector is stored as ``M``
    small integers.

    Attributes:
        M: number of sub-spaces (slices) per vector.
        Ks: number of codewords per sub-space.
        verbose: when true, ``fit`` prints one progress line per sub-space.
        mahalanobis_matrix: forwarded unchanged as ``matrix=`` to the
            ``quantize`` helpers. NOTE(review): presumably selects a
            Mahalanobis-style distance there -- confirm against ``quantize``.
        code_dtype: smallest of uint8 / uint16 / uint32 that can hold a
            codeword index for the given ``Ks``.
        codewords: ``(M, Ks, Ds)`` float32 array, ``None`` until ``fit``.
        Ds: dimensions per sub-space, ``None`` until ``fit``.
    """

    def __init__(self, M, Ks=256, verbose=True, mahalanobis_matrix=None):
        """Store the configuration; no training happens here."""
        assert 0 < Ks <= 2 ** 32
        self.M, self.Ks, self.verbose, self.mahalanobis_matrix = M, Ks, verbose, mahalanobis_matrix
        # Pick the narrowest unsigned type able to index Ks codewords.
        self.code_dtype = np.uint8 if Ks <= 2 ** 8 else (np.uint16 if Ks <= 2 ** 16 else np.uint32)
        self.codewords = None
        self.Ds = None

    def _subspace(self, m):
        """Return the column slice covering the m-th sub-space."""
        return slice(m * self.Ds, (m + 1) * self.Ds)

    def fit(self, vecs, iter=20, seed=123):
        """Learn ``Ks`` codewords for each of the ``M`` sub-spaces.

        Args:
            vecs: float32 array of shape ``(N, D)`` with ``N > Ks`` and
                ``D`` divisible by ``M``.
            iter: forwarded to ``kmeans2`` as ``iter=``. (The name shadows the
                builtin but is part of the public signature.)
            seed: value passed to ``np.random.seed``.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            AssertionError: if any of the input requirements above is violated.
        """
        assert vecs.dtype == np.float32
        assert vecs.ndim == 2
        N, D = vecs.shape
        assert self.Ks < N, "the number of training vector should be more than Ks"
        assert D % self.M == 0, "input dimension must be dividable by M"
        # Exact integer division; divisibility was asserted just above.
        self.Ds = D // self.M

        # Reseeds NumPy's *global* RNG. NOTE(review): presumably kmeans2 draws
        # its initial points from that RNG -- confirm against ``quantize``.
        np.random.seed(seed)

        # [m][ks][ds]: m-th subspace, ks-the codeword, ds-th dim
        self.codewords = np.zeros((self.M, self.Ks, self.Ds), dtype=np.float32)
        for m in range(self.M):
            if self.verbose:
                print("    Training the subspace: {} / {}".format(m, self.M))
            vecs_sub = vecs[:, self._subspace(m)]
            # Only the first value returned by kmeans2 is kept as the codebook.
            self.codewords[m], _ = kmeans2(vecs_sub, self.Ks, iter=iter, minit='points', matrix=self.mahalanobis_matrix)

        return self

    def encode(self, vecs):
        """Map each vector to ``M`` codeword indices.

        Args:
            vecs: float32 array of shape ``(N, Ds * M)``.

        Returns:
            Array of shape ``(N, M)`` and dtype ``code_dtype``;
            ``codes[n][m]`` is the code of the n-th vector in the m-th
            sub-space.

        Raises:
            AssertionError: if the quantizer is not fitted or the input has
                the wrong dtype, rank or dimension.
        """
        assert self.codewords is not None, "fit() must be called before encode()"
        assert vecs.dtype == np.float32
        assert vecs.ndim == 2
        N, D = vecs.shape
        assert D == self.Ds * self.M, "input dimension must be Ds * M"

        # codes[n][m] : code of n-th vec, m-th subspace
        codes = np.empty((N, self.M), dtype=self.code_dtype)
        for m in range(self.M):
            vecs_sub = vecs[:, self._subspace(m)]
            # Only the first value returned by vq is stored as the codes.
            codes[:, m], _ = vq(vecs_sub, self.codewords[m], matrix=self.mahalanobis_matrix)

        return codes

    def decode(self, codes):
        """Rebuild approximate vectors by concatenating the coded codewords.

        Args:
            codes: array of shape ``(N, M)`` with dtype ``code_dtype``.

        Returns:
            float32 array of shape ``(N, Ds * M)``.

        Raises:
            AssertionError: if the quantizer is not fitted or ``codes`` has
                the wrong rank, width or dtype.
        """
        assert self.codewords is not None, "fit() must be called before decode()"
        assert codes.ndim == 2
        N, M = codes.shape
        assert M == self.M
        assert codes.dtype == self.code_dtype

        vecs = np.empty((N, self.Ds * self.M), dtype=np.float32)
        for m in range(self.M):
            # Integer-array indexing gathers one codeword row per input code.
            vecs[:, self._subspace(m)] = self.codewords[m][codes[:, m], :]

        return vecs

    def compress(self, vecs):
        """Return the quantized approximation of ``vecs`` (encode, then decode)."""
        return self.decode(self.encode(vecs))