Skip to content

Commit

Permalink
[CLN] Restructure EmbeddingRecord -> LogRecord. Use 'log_offset' as t…
Browse files Browse the repository at this point in the history
…erm instead of 'id' (#1934)

## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
- This PR follows #1933 and restructures EmbeddingRecord to be a nested
type around OperationRecord. We rename EmbeddingRecord to be LogRecord.
A LogRecord is a model of an OperationRecord stored on a log, and just
keeps a log_offset The log is assumed to be per-collection.
- Rename the Logservices use of "ID" to instead be "log_offset" to
standardize on the "log_offset" terminology.
- Rename RecordLog -> LogRecord for conceptual clarity and to align with
other types.
- Fixes a proto style guide violation where we used camelcase for
fieldnames -
https://protobuf.dev/programming-guides/style/#message_and_field_names.
- In GetColllectionCompactionInfo rename first_log_id to
first_log_offset - which is much clearer. Also rename first_log_id_ts to
first_log_offset. What is a id_ts? Confusing name.
 - New functionality
	 - None

## Test plan
*How are these changes tested?*
- [x] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
None
  • Loading branch information
HammadB authored Mar 28, 2024
1 parent 1ce93c7 commit 5f3f141
Show file tree
Hide file tree
Showing 71 changed files with 5,632 additions and 546 deletions.
45 changes: 23 additions & 22 deletions chromadb/db/mixins/embeddings_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
)
from chromadb.types import (
OperationRecord,
EmbeddingRecord,
LogRecord,
ScalarEncoding,
SeqId,
Operation,
Expand Down Expand Up @@ -188,14 +188,15 @@ def submit_embeddings(
submit_embedding_record = embeddings[id_to_idx[id]]
# We allow notifying consumers out of order relative to one call to
# submit_embeddings so we do not reorder the records before submitting them
embedding_record = EmbeddingRecord(
id=id,
seq_id=seq_id,
embedding=submit_embedding_record["embedding"],
encoding=submit_embedding_record["encoding"],
metadata=submit_embedding_record["metadata"],
operation=submit_embedding_record["operation"],
collection_id=collection_id,
embedding_record = LogRecord(
log_offset=seq_id,
operation_record=OperationRecord(
id=id,
embedding=submit_embedding_record["embedding"],
encoding=submit_embedding_record["encoding"],
metadata=submit_embedding_record["metadata"],
operation=submit_embedding_record["operation"],
),
)
embedding_records.append(embedding_record)
self._notify_all(topic_name, embedding_records)
Expand Down Expand Up @@ -318,13 +319,15 @@ def _backfill(self, subscription: Subscription) -> None:
self._notify_one(
subscription,
[
EmbeddingRecord(
seq_id=row[0],
operation=_operation_codes_inv[row[1]],
id=row[2],
embedding=vector,
encoding=encoding,
metadata=json.loads(row[5]) if row[5] else None,
LogRecord(
log_offset=row[0],
operation_record=OperationRecord(
operation=_operation_codes_inv[row[1]],
id=row[2],
embedding=vector,
encoding=encoding,
metadata=json.loads(row[5]) if row[5] else None,
),
)
],
)
Expand Down Expand Up @@ -353,24 +356,22 @@ def _next_seq_id(self) -> int:
return int(cur.fetchone()[0]) + 1

@trace_method("SqlEmbeddingsQueue._notify_all", OpenTelemetryGranularity.ALL)
def _notify_all(self, topic: str, embeddings: Sequence[EmbeddingRecord]) -> None:
def _notify_all(self, topic: str, embeddings: Sequence[LogRecord]) -> None:
"""Send a notification to each subscriber of the given topic."""
if self._running:
for sub in self._subscriptions[topic]:
self._notify_one(sub, embeddings)

@trace_method("SqlEmbeddingsQueue._notify_one", OpenTelemetryGranularity.ALL)
def _notify_one(
self, sub: Subscription, embeddings: Sequence[EmbeddingRecord]
) -> None:
def _notify_one(self, sub: Subscription, embeddings: Sequence[LogRecord]) -> None:
"""Send a notification to a single subscriber."""
# Filter out any embeddings that are not in the subscription range
should_unsubscribe = False
filtered_embeddings = []
for embedding in embeddings:
if embedding["seq_id"] <= sub.start:
if embedding["log_offset"] <= sub.start:
continue
if embedding["seq_id"] > sub.end:
if embedding["log_offset"] > sub.end:
should_unsubscribe = True
break
filtered_embeddings.append(embedding)
Expand Down
4 changes: 2 additions & 2 deletions chromadb/ingest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Callable, Optional, Sequence
from chromadb.types import (
OperationRecord,
EmbeddingRecord,
LogRecord,
SeqId,
Vector,
ScalarEncoding,
Expand Down Expand Up @@ -67,7 +67,7 @@ def max_batch_size(self) -> int:
pass


ConsumerCallbackFn = Callable[[Sequence[EmbeddingRecord]], None]
ConsumerCallbackFn = Callable[[Sequence[LogRecord]], None]


class Consumer(Component):
Expand Down
4 changes: 2 additions & 2 deletions chromadb/logservice/logservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
ConsumerCallbackFn,
)
from chromadb.proto.convert import to_proto_submit
from chromadb.proto.logservice_pb2 import PushLogsRequest, PullLogsRequest, RecordLog
from chromadb.proto.logservice_pb2 import PushLogsRequest, PullLogsRequest, LogRecord
from chromadb.proto.logservice_pb2_grpc import LogServiceStub
from chromadb.telemetry.opentelemetry.grpc import OtelInterceptor
from chromadb.types import (
Expand Down Expand Up @@ -149,7 +149,7 @@ def push_logs(self, collection_id: UUID, records: Sequence[OperationRecord]) ->

def pull_logs(
self, collection_id: UUID, start_id: int, batch_size: int
) -> Sequence[RecordLog]:
) -> Sequence[LogRecord]:
request = PullLogsRequest(
collection_id=str(collection_id),
start_from_id=start_id,
Expand Down
72 changes: 36 additions & 36 deletions chromadb/proto/chroma_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions chromadb/proto/chroma_pb2.pyi

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5f3f141

Please sign in to comment.