4-redis.py
"""
Code samples for vector database quickstart pages:
https://redis.io/docs/latest/develop/get-started/vector-database/
"""
import json
import numpy as np
import redis
import time
from redis.commands.search.field import VectorField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import Query
from sentence_transformers import SentenceTransformer
from common import read_verses
key_prefix = "verse"
client = redis.Redis(host="localhost", port=6379, password="pass", decode_responses=True)
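# Optional sanity check (an addition, not in the original): fail fast if the
# server is unreachable or the password is wrong.
# assert client.ping()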


def create_redis_index(index_name):
    try:
        # FT.INFO raises a ResponseError if the index does not exist yet.
        client.ft(index_name).info()
        print("Index already exists!")
    except redis.exceptions.ResponseError:
        # 768 is the output dimension of paraphrase-multilingual-mpnet-base-v2.
        VECTOR_DIMENSION = 768
        schema = (
            # Uncomment to index the stored fields as well (plain field names,
            # since this is a HASH index, not JSON; also import TextField above):
            # TextField("meta", no_stem=True, as_name="meta"),
            # TextField("text", as_name="text"),
            VectorField(
                "embedding",
                "HNSW",
                {
                    "TYPE": "FLOAT32",
                    "DIM": VECTOR_DIMENSION,
                    "DISTANCE_METRIC": "COSINE",
                },
            ),
        )
        definition = IndexDefinition(
            prefix=[f"{key_prefix}:"], index_type=IndexType.HASH
        )
        client.ft(index_name).create_index(fields=schema, definition=definition)
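
# Optional tuning (an addition, not in the original): HNSW accepts further
# build-time attributes beyond the required ones; the defaults are used above.
# A sketch with explicit values:
#
# VectorField(
#     "embedding",
#     "HNSW",
#     {
#         "TYPE": "FLOAT32",
#         "DIM": VECTOR_DIMENSION,
#         "DISTANCE_METRIC": "COSINE",
#         "M": 16,                 # max edges per node in the graph
#         "EF_CONSTRUCTION": 200,  # candidate list size while building
#     },
# )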


def redis_inserts(chunk, pipeline):
    # Queue one HSET per verse on the shared pipeline, then flush it once.
    for _id, text, meta, embedding in chunk:
        key = f"{key_prefix}:{_id}"
        # Convert the embedding to a raw FLOAT32 byte string, the layout the
        # vector index expects for HASH documents.
        if isinstance(embedding, (np.ndarray, list)):
            embedding = np.array(embedding, dtype=np.float32).tobytes()
        if not isinstance(meta, str):
            meta = json.dumps(meta)
        pipeline.hset(
            name=key,
            mapping={
                "text": text,
                "meta": meta,
                "embedding": embedding,
            },
        )
    start_time = time.perf_counter()
    pipeline.execute()
    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    print(f"Insert time: {elapsed_time} sec")
    return elapsed_time
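
# For context, the contract this script assumes from common.read_verses (an
# illustrative sketch, not the actual implementation): it walks the corpus in
# minibatches and calls the given callback with a chunk of
# (id, text, meta, embedding) tuples, forwarding extra keyword arguments such
# as the pipeline above.
#
# def read_verses(callback, max_items, minibatch_size, **kwargs):
#     chunk = []
#     for item in iter_corpus(max_items):  # hypothetical corpus iterator
#         chunk.append(item)               # (id, text, meta, embedding)
#         if len(chunk) == minibatch_size:
#             callback(chunk, **kwargs)
#             chunk = []
#     if chunk:
#         callback(chunk, **kwargs)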


def redis_search(embeddings, index_name):
    # KNN query: the 10 nearest neighbours by cosine distance, with the
    # distance exposed as "vector_score".
    query = (
        Query("(*)=>[KNN 10 @embedding $query_vector AS vector_score]")
        .sort_by("vector_score")
        .paging(0, 10)
        .return_fields("vector_score", "id", "text")
        .dialect(4)
    )
    # Alternative: a range (radius) query instead of KNN; it needs the extra
    # "range" parameter passed to search() below.
    # query = (
    #     Query(
    #         "@embedding:[VECTOR_RANGE $range $query_vector]=>"
    #         "{$YIELD_DISTANCE_AS: vector_score}"
    #     )
    #     .sort_by("vector_score")
    #     .return_fields("vector_score", "id", "text")
    #     .paging(0, 10)
    #     .dialect(2)
    # )
    result_docs = (
        client.ft(index_name)
        .search(
            query,
            {"query_vector": np.array(embeddings, dtype=np.float32).tobytes()},
            # | {"range": 0.55},
        )
        .docs
    )
    for doc in result_docs:
        # Cosine distance -> similarity: 1 - distance, rounded for display.
        vector_score = round(1 - float(doc.vector_score), 2)
        print(f"Text: {doc.text}; Similarity: {vector_score}")


# Create index
index_name = "idx:verse_vss12"
create_redis_index(index_name)

# Ingest data
with client.pipeline(transaction=False) as pipeline:
    read_verses(
        redis_inserts, max_items=1400000, minibatch_size=1000, pipeline=pipeline
    )

# Run queries: embed the query text once, then average latency over five runs.
model = SentenceTransformer(
    "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
)
embeddings = model.encode("воскресил из мертвых")  # "raised from the dead"
start_time = time.perf_counter()
for _ in range(5):
    redis_search(embeddings, index_name)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
print(f"Search time: {elapsed_time / 5} sec")