From b8dfc5b0f35a20f5049b1981d2885da2bae58242 Mon Sep 17 00:00:00 2001 From: Hammad Bashir Date: Tue, 26 Mar 2024 15:44:39 -0700 Subject: [PATCH] [CLN] Remove pulsar from python codebase (#1932) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - Removes pulsar from the python codebase - New functionality - None ## Test plan *How are these changes tested?* - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes None --- chromadb/config.py | 4 - chromadb/ingest/impl/pulsar.py | 317 ------------------ chromadb/ingest/impl/pulsar_admin.py | 81 ----- chromadb/ingest/impl/utils.py | 4 - chromadb/segment/impl/distributed/server.py | 11 - chromadb/test/db/test_system.py | 10 +- .../test/ingest/test_producer_consumer.py | 22 +- chromadb/test/utils/test_messagid.py | 86 +---- chromadb/types.py | 6 +- chromadb/utils/messageid.py | 72 ---- pyproject.toml | 1 - requirements.txt | 3 +- 12 files changed, 17 insertions(+), 600 deletions(-) delete mode 100644 chromadb/ingest/impl/pulsar.py delete mode 100644 chromadb/ingest/impl/pulsar_admin.py diff --git a/chromadb/config.py b/chromadb/config.py index d0e6e45a00f..597a338c814 100644 --- a/chromadb/config.py +++ b/chromadb/config.py @@ -146,10 +146,6 @@ def empty_str_to_none(cls, v: str) -> Optional[str]: chroma_server_nofile: Optional[int] = None - pulsar_broker_url: Optional[str] = None - pulsar_admin_port: Optional[int] = 8080 - pulsar_broker_port: Optional[int] = 6650 - chroma_server_auth_provider: Optional[str] = None @validator("chroma_server_auth_provider", pre=True, always=True, allow_reuse=True) diff --git a/chromadb/ingest/impl/pulsar.py b/chromadb/ingest/impl/pulsar.py deleted file mode 100644 index d84cadfa01e..00000000000 --- a/chromadb/ingest/impl/pulsar.py +++ /dev/null @@ -1,317 +0,0 @@ -from __future__ import annotations -from collections import defaultdict -from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple -import uuid -from chromadb.config import Settings, System -from chromadb.ingest import Consumer, ConsumerCallbackFn, Producer -from overrides import overrides, EnforceOverrides -from uuid import UUID -from chromadb.ingest.impl.pulsar_admin import PulsarAdmin -from chromadb.ingest.impl.utils import create_pulsar_connection_str -from chromadb.proto.convert import from_proto_submit, to_proto_submit -import chromadb.proto.chroma_pb2 as proto -from chromadb.telemetry.opentelemetry import ( - OpenTelemetryClient, - OpenTelemetryGranularity, - trace_method, -) -from chromadb.types import SeqId, SubmitEmbeddingRecord -import pulsar -from concurrent.futures import wait, Future - -from chromadb.utils.messageid import int_to_pulsar, pulsar_to_int - - -class PulsarProducer(Producer, EnforceOverrides): - # TODO: ensure trace context propagates - _connection_str: str - _topic_to_producer: Dict[str, pulsar.Producer] - _opentelemetry_client: OpenTelemetryClient - _client: pulsar.Client - _admin: PulsarAdmin - _settings: Settings - - def __init__(self, system: System) -> None: - pulsar_host = system.settings.require("pulsar_broker_url") - pulsar_port = system.settings.require("pulsar_broker_port") - self._connection_str = create_pulsar_connection_str(pulsar_host, pulsar_port) - self._topic_to_producer = {} - self._settings = system.settings - self._admin = PulsarAdmin(system) - self._opentelemetry_client = system.require(OpenTelemetryClient) - super().__init__(system) - - @overrides - def start(self) -> None: - self._client = pulsar.Client(self._connection_str) - super().start() - - @overrides - def stop(self) -> None: - self._client.close() - super().stop() - - @overrides - def create_topic(self, topic_name: str) -> None: - self._admin.create_topic(topic_name) - - @overrides - def delete_topic(self, topic_name: str) -> None: - self._admin.delete_topic(topic_name) - - @trace_method("PulsarProducer.submit_embedding", OpenTelemetryGranularity.ALL) - @overrides - def submit_embedding( - self, topic_name: str, embedding: SubmitEmbeddingRecord - ) -> SeqId: - """Add an embedding record to the given topic. Returns the SeqID of the record.""" - producer = self._get_or_create_producer(topic_name) - proto_submit: proto.SubmitEmbeddingRecord = to_proto_submit(embedding) - # TODO: batch performance / async - msg_id: pulsar.MessageId = producer.send(proto_submit.SerializeToString()) - return pulsar_to_int(msg_id) - - @trace_method("PulsarProducer.submit_embeddings", OpenTelemetryGranularity.ALL) - @overrides - def submit_embeddings( - self, topic_name: str, embeddings: Sequence[SubmitEmbeddingRecord] - ) -> Sequence[SeqId]: - if not self._running: - raise RuntimeError("Component not running") - - if len(embeddings) == 0: - return [] - - if len(embeddings) > self.max_batch_size: - raise ValueError( - f""" - Cannot submit more than {self.max_batch_size:,} embeddings at once. - Please submit your embeddings in batches of size - {self.max_batch_size:,} or less. - """ - ) - - producer = self._get_or_create_producer(topic_name) - protos_to_submit = [to_proto_submit(embedding) for embedding in embeddings] - - def create_producer_callback( - future: Future[int], - ) -> Callable[[Any, pulsar.MessageId], None]: - def producer_callback(res: Any, msg_id: pulsar.MessageId) -> None: - if msg_id: - future.set_result(pulsar_to_int(msg_id)) - else: - future.set_exception( - Exception( - "Unknown error while submitting embedding in producer_callback" - ) - ) - - return producer_callback - - futures = [] - for proto_to_submit in protos_to_submit: - future: Future[int] = Future() - producer.send_async( - proto_to_submit.SerializeToString(), - callback=create_producer_callback(future), - ) - futures.append(future) - - wait(futures) - - results: List[SeqId] = [] - for future in futures: - exception = future.exception() - if exception is not None: - raise exception - results.append(future.result()) - - return results - - @property - @overrides - def max_batch_size(self) -> int: - # For now, we use 1,000 - # TODO: tune this to a reasonable value by default - return 1000 - - def _get_or_create_producer(self, topic_name: str) -> pulsar.Producer: - if topic_name not in self._topic_to_producer: - producer = self._client.create_producer(topic_name) - self._topic_to_producer[topic_name] = producer - return self._topic_to_producer[topic_name] - - @overrides - def reset_state(self) -> None: - if not self._settings.require("allow_reset"): - raise ValueError( - "Resetting the database is not allowed. Set `allow_reset` to true in the config in tests or other non-production environments where reset should be permitted." - ) - for topic_name in self._topic_to_producer: - self._admin.delete_topic(topic_name) - self._topic_to_producer = {} - super().reset_state() - - -class PulsarConsumer(Consumer, EnforceOverrides): - class PulsarSubscription: - id: UUID - topic_name: str - start: int - end: int - callback: ConsumerCallbackFn - consumer: pulsar.Consumer - - def __init__( - self, - id: UUID, - topic_name: str, - start: int, - end: int, - callback: ConsumerCallbackFn, - consumer: pulsar.Consumer, - ): - self.id = id - self.topic_name = topic_name - self.start = start - self.end = end - self.callback = callback - self.consumer = consumer - - _connection_str: str - _client: pulsar.Client - _opentelemetry_client: OpenTelemetryClient - _subscriptions: Dict[str, Set[PulsarSubscription]] - _settings: Settings - - def __init__(self, system: System) -> None: - pulsar_host = system.settings.require("pulsar_broker_url") - pulsar_port = system.settings.require("pulsar_broker_port") - self._connection_str = create_pulsar_connection_str(pulsar_host, pulsar_port) - self._subscriptions = defaultdict(set) - self._settings = system.settings - self._opentelemetry_client = system.require(OpenTelemetryClient) - super().__init__(system) - - @overrides - def start(self) -> None: - self._client = pulsar.Client(self._connection_str) - super().start() - - @overrides - def stop(self) -> None: - self._client.close() - super().stop() - - @trace_method("PulsarConsumer.subscribe", OpenTelemetryGranularity.ALL) - @overrides - def subscribe( - self, - topic_name: str, - consume_fn: ConsumerCallbackFn, - start: Optional[SeqId] = None, - end: Optional[SeqId] = None, - id: Optional[UUID] = None, - ) -> UUID: - """Register a function that will be called to recieve embeddings for a given - topic. The given function may be called any number of times, with any number of - records, and may be called concurrently. - - Only records between start (exclusive) and end (inclusive) SeqIDs will be - returned. If start is None, the first record returned will be the next record - generated, not including those generated before creating the subscription. If - end is None, the consumer will consume indefinitely, otherwise it will - automatically be unsubscribed when the end SeqID is reached. - - If the function throws an exception, the function may be called again with the - same or different records. - - Takes an optional UUID as a unique subscription ID. If no ID is provided, a new - ID will be generated and returned.""" - if not self._running: - raise RuntimeError("Consumer must be started before subscribing") - - subscription_id = ( - id or uuid.uuid4() - ) # TODO: this should really be created by the coordinator and stored in sysdb - - start, end = self._validate_range(start, end) - - def wrap_callback(consumer: pulsar.Consumer, message: pulsar.Message) -> None: - msg_data = message.data() - msg_id = pulsar_to_int(message.message_id()) - submit_embedding_record = proto.SubmitEmbeddingRecord() - proto.SubmitEmbeddingRecord.ParseFromString( - submit_embedding_record, msg_data - ) - embedding_record = from_proto_submit(submit_embedding_record, msg_id) - consume_fn([embedding_record]) - consumer.acknowledge(message) - if msg_id == end: - self.unsubscribe(subscription_id) - - consumer = self._client.subscribe( - topic_name, - subscription_id.hex, - message_listener=wrap_callback, - ) - - subscription = self.PulsarSubscription( - subscription_id, topic_name, start, end, consume_fn, consumer - ) - self._subscriptions[topic_name].add(subscription) - - # NOTE: For some reason the seek() method expects a shadowed MessageId type - # which resides in _msg_id. - consumer.seek(int_to_pulsar(start)._msg_id) - - return subscription_id - - def _validate_range( - self, start: Optional[SeqId], end: Optional[SeqId] - ) -> Tuple[int, int]: - """Validate and normalize the start and end SeqIDs for a subscription using this - impl.""" - start = start or pulsar_to_int(pulsar.MessageId.latest) - end = end or self.max_seqid() - if not isinstance(start, int) or not isinstance(end, int): - raise TypeError("SeqIDs must be integers") - if start >= end: - raise ValueError(f"Invalid SeqID range: {start} to {end}") - return start, end - - @overrides - def unsubscribe(self, subscription_id: UUID) -> None: - """Unregister a subscription. The consume function will no longer be invoked, - and resources associated with the subscription will be released.""" - for topic_name, subscriptions in self._subscriptions.items(): - for subscription in subscriptions: - if subscription.id == subscription_id: - subscription.consumer.close() - subscriptions.remove(subscription) - if len(subscriptions) == 0: - del self._subscriptions[topic_name] - return - - @overrides - def min_seqid(self) -> SeqId: - """Return the minimum possible SeqID in this implementation.""" - return pulsar_to_int(pulsar.MessageId.earliest) - - @overrides - def max_seqid(self) -> SeqId: - """Return the maximum possible SeqID in this implementation.""" - return 2**192 - 1 - - @overrides - def reset_state(self) -> None: - if not self._settings.require("allow_reset"): - raise ValueError( - "Resetting the database is not allowed. Set `allow_reset` to true in the config in tests or other non-production environments where reset should be permitted." - ) - for topic_name, subscriptions in self._subscriptions.items(): - for subscription in subscriptions: - subscription.consumer.close() - self._subscriptions = defaultdict(set) - super().reset_state() diff --git a/chromadb/ingest/impl/pulsar_admin.py b/chromadb/ingest/impl/pulsar_admin.py deleted file mode 100644 index e031e4a238b..00000000000 --- a/chromadb/ingest/impl/pulsar_admin.py +++ /dev/null @@ -1,81 +0,0 @@ -# A thin wrapper around the pulsar admin api -import requests -from chromadb.config import System -from chromadb.ingest.impl.utils import parse_topic_name - - -class PulsarAdmin: - """A thin wrapper around the pulsar admin api, only used for interim development towards distributed chroma. - This functionality will be moved to the chroma coordinator.""" - - _connection_str: str - - def __init__(self, system: System): - pulsar_host = system.settings.require("pulsar_broker_url") - pulsar_port = system.settings.require("pulsar_admin_port") - self._connection_str = f"http://{pulsar_host}:{pulsar_port}" - - # Create the default tenant and namespace - # This is a temporary workaround until we have a proper tenant/namespace management system - self.create_tenant("default") - self.create_namespace("default", "default") - - def create_tenant(self, tenant: str) -> None: - """Make a PUT request to the admin api to create the tenant""" - - path = f"/admin/v2/tenants/{tenant}" - url = self._connection_str + path - response = requests.put( - url, json={"allowedClusters": ["standalone"], "adminRoles": []} - ) # TODO: how to manage clusters? - - if response.status_code != 204 and response.status_code != 409: - raise RuntimeError(f"Failed to create tenant {tenant}") - - def create_namespace(self, tenant: str, namespace: str) -> None: - """Make a PUT request to the admin api to create the namespace""" - - path = f"/admin/v2/namespaces/{tenant}/{namespace}" - url = self._connection_str + path - response = requests.put(url) - - if response.status_code != 204 and response.status_code != 409: - raise RuntimeError(f"Failed to create namespace {namespace}") - - def create_topic(self, topic: str) -> None: - # TODO: support non-persistent topics? - tenant, namespace, topic_name = parse_topic_name(topic) - - if tenant != "default": - raise ValueError(f"Only the default tenant is supported, got {tenant}") - if namespace != "default": - raise ValueError( - f"Only the default namespace is supported, got {namespace}" - ) - - # Make a PUT request to the admin api to create the topic - path = f"/admin/v2/persistent/{tenant}/{namespace}/{topic_name}" - url = self._connection_str + path - response = requests.put(url) - - if response.status_code != 204 and response.status_code != 409: - raise RuntimeError(f"Failed to create topic {topic_name}") - - def delete_topic(self, topic: str) -> None: - tenant, namespace, topic_name = parse_topic_name(topic) - - if tenant != "default": - raise ValueError(f"Only the default tenant is supported, got {tenant}") - if namespace != "default": - raise ValueError( - f"Only the default namespace is supported, got {namespace}" - ) - - # Make a PUT request to the admin api to delete the topic - path = f"/admin/v2/persistent/{tenant}/{namespace}/{topic_name}" - # Force delete the topic - path += "?force=true" - url = self._connection_str + path - response = requests.delete(url) - if response.status_code != 204 and response.status_code != 409: - raise RuntimeError(f"Failed to delete topic {topic_name}") diff --git a/chromadb/ingest/impl/utils.py b/chromadb/ingest/impl/utils.py index 144384d75db..34b46d3899a 100644 --- a/chromadb/ingest/impl/utils.py +++ b/chromadb/ingest/impl/utils.py @@ -12,9 +12,5 @@ def parse_topic_name(topic_name: str) -> Tuple[str, str, str]: return match.group("tenant"), match.group("namespace"), match.group("topic") -def create_pulsar_connection_str(host: str, port: str) -> str: - return f"pulsar://{host}:{port}" - - def create_topic_name(tenant: str, namespace: str, topic: str) -> str: return f"persistent://{tenant}/{namespace}/{topic}" diff --git a/chromadb/segment/impl/distributed/server.py b/chromadb/segment/impl/distributed/server.py index 32bd1f67cfd..7b08e1e5d66 100644 --- a/chromadb/segment/impl/distributed/server.py +++ b/chromadb/segment/impl/distributed/server.py @@ -16,7 +16,6 @@ from chromadb.types import EmbeddingRecord from chromadb.segment.distributed import MemberlistProvider, Memberlist from chromadb.utils.rendezvous_hash import assign, murmur3hasher -from chromadb.ingest.impl.pulsar_admin import PulsarAdmin import logging import os @@ -51,7 +50,6 @@ def __init__(self, system: System) -> None: self._memberlist_provider = system.require(MemberlistProvider) self._memberlist_provider.set_memberlist_name("query-service-memberlist") self._assignment_policy = system.require(CollectionAssignmentPolicy) - self._create_pulsar_topics() self._consumer = system.require(Consumer) # Init data @@ -113,15 +111,6 @@ def _on_message(self, embedding_records: Sequence[EmbeddingRecord]) -> None: ) return None - def _create_pulsar_topics(self) -> None: - """This creates the pulsar topics used by the system. - HACK: THIS IS COMPLETELY A HACK AND WILL BE REPLACED - BY A PROPER TOPIC MANAGEMENT SYSTEM IN THE COORDINATOR""" - topics = self._assignment_policy.get_topics() - admin = PulsarAdmin(self._system) - for topic in topics: - admin.create_topic(topic) - def QueryVectors( self, request: proto.QueryVectorsRequest, context: Any ) -> proto.QueryVectorsResponse: diff --git a/chromadb/test/db/test_system.py b/chromadb/test/db/test_system.py index e899ac0b204..e3a8a966bb0 100644 --- a/chromadb/test/db/test_system.py +++ b/chromadb/test/db/test_system.py @@ -20,8 +20,8 @@ from pytest import FixtureRequest import uuid -PULSAR_TENANT = "default" -PULSAR_NAMESPACE = "default" +TENANT = "default" +NAMESPACE = "default" # These are the sample collections that are used in the tests below. Tests can override # the fields as needed. @@ -35,7 +35,7 @@ Collection( id=uuid.UUID(int=1), name="test_collection_1", - topic=f"persistent://{PULSAR_TENANT}/{PULSAR_NAMESPACE}/chroma_log_1", + topic=f"persistent://{TENANT}/{NAMESPACE}/chroma_log_1", metadata={"test_str": "str1", "test_int": 1, "test_float": 1.3}, dimension=128, database=DEFAULT_DATABASE, @@ -44,7 +44,7 @@ Collection( id=uuid.UUID(int=2), name="test_collection_2", - topic=f"persistent://{PULSAR_TENANT}/{PULSAR_NAMESPACE}/chroma_log_14", + topic=f"persistent://{TENANT}/{NAMESPACE}/chroma_log_14", metadata={"test_str": "str2", "test_int": 2, "test_float": 2.3}, dimension=None, database=DEFAULT_DATABASE, @@ -53,7 +53,7 @@ Collection( id=uuid.UUID(int=3), name="test_collection_3", - topic=f"persistent://{PULSAR_TENANT}/{PULSAR_NAMESPACE}/chroma_log_14", + topic=f"persistent://{TENANT}/{NAMESPACE}/chroma_log_14", metadata={"test_str": "str3", "test_int": 3, "test_float": 3.3}, dimension=None, database=DEFAULT_DATABASE, diff --git a/chromadb/test/ingest/test_producer_consumer.py b/chromadb/test/ingest/test_producer_consumer.py index 199afde60de..31450cb7dfe 100644 --- a/chromadb/test/ingest/test_producer_consumer.py +++ b/chromadb/test/ingest/test_producer_consumer.py @@ -54,29 +54,11 @@ def sqlite_persistent() -> Generator[Tuple[Producer, Consumer], None, None]: shutil.rmtree(save_path) -def pulsar() -> Generator[Tuple[Producer, Consumer], None, None]: - """Fixture generator for pulsar Producer + Consumer. This fixture requires a running - pulsar cluster. You can use bin/cluster-test.sh to start a standalone pulsar and run this test. - Assumes pulsar_broker_url etc is set from the environment variables like PULSAR_BROKER_URL. - """ - system = System( - Settings( - allow_reset=True, - chroma_producer_impl="chromadb.ingest.impl.pulsar.PulsarProducer", - chroma_consumer_impl="chromadb.ingest.impl.pulsar.PulsarConsumer", - ) - ) - producer = system.require(Producer) - consumer = system.require(Consumer) - system.start() - yield producer, consumer - system.stop() - - def fixtures() -> List[Callable[[], Generator[Tuple[Producer, Consumer], None, None]]]: fixtures = [sqlite, sqlite_persistent] if "CHROMA_CLUSTER_TEST_ONLY" in os.environ: - fixtures = [pulsar] + # TODO: We should add the new log service here + fixtures = [] return fixtures diff --git a/chromadb/test/utils/test_messagid.py b/chromadb/test/utils/test_messagid.py index eff20a1b6fe..64d80e9b6b0 100644 --- a/chromadb/test/utils/test_messagid.py +++ b/chromadb/test/utils/test_messagid.py @@ -1,93 +1,19 @@ import chromadb.utils.messageid as mid -import pulsar import hypothesis.strategies as st -from hypothesis import given, settings, note -from typing import Any, Tuple +from hypothesis import given, settings @st.composite -def message_id(draw: st.DrawFn) -> pulsar.MessageId: - ledger_id = draw(st.integers(min_value=0, max_value=2**63 - 1)) - entry_id = draw(st.integers(min_value=0, max_value=2**63 - 1)) - batch_index = draw(st.integers(min_value=(2**31 - 1) * -1, max_value=2**31 - 1)) - partition = draw(st.integers(min_value=(2**31 - 1) * -1, max_value=2**31 - 1)) - return pulsar.MessageId(partition, ledger_id, entry_id, batch_index) +def message_id(draw: st.DrawFn) -> int: + offset_id = draw(st.integers(min_value=0, max_value=2**63 - 1)) + return offset_id @given(message_id=message_id()) @settings(max_examples=10000) # these are very fast and we want good coverage -def test_roundtrip_formats(message_id: pulsar.MessageId) -> None: - int1 = mid.pulsar_to_int(message_id) - - # Roundtrip int->string and back - str1 = mid.int_to_str(int1) - assert int1 == mid.str_to_int(str1) +def test_roundtrip_formats(message_id: int) -> None: + int1 = message_id # Roundtrip int->bytes and back b1 = mid.int_to_bytes(int1) assert int1 == mid.bytes_to_int(b1) - - # Roundtrip int -> MessageId and back - message_id_result = mid.int_to_pulsar(int1) - assert message_id_result.partition() == message_id.partition() - assert message_id_result.ledger_id() == message_id.ledger_id() - assert message_id_result.entry_id() == message_id.entry_id() - assert message_id_result.batch_index() == message_id.batch_index() - - -def assert_compare(pair1: Tuple[Any, Any], pair2: Tuple[Any, Any]) -> None: - """Helper function: assert that the two pairs of values always compare in the same - way across all comparisons and orderings.""" - - a, b = pair1 - c, d = pair2 - - try: - assert (a > b) == (c > d) - assert (a >= b) == (c >= d) - assert (a < b) == (c < d) - assert (a <= b) == (c <= d) - assert (a == b) == (c == d) - except AssertionError: - note(f"Failed to compare {a} and {b} with {c} and {d}") - note(f"type: {type(a)}") - raise - - -@given(m1=message_id(), m2=message_id()) -@settings(max_examples=10000) # these are very fast and we want good coverage -def test_messageid_comparison(m1: pulsar.MessageId, m2: pulsar.MessageId) -> None: - # MessageID comparison is broken in the Pulsar Python & CPP libraries: - # The partition field is not taken into account, and two MessageIDs with different - # partitions will compare inconsistently (m1 > m2 AND m2 > m1) - # To avoid this, we zero-out the partition field before testing. - m1 = pulsar.MessageId(0, m1.ledger_id(), m1.entry_id(), m1.batch_index()) - m2 = pulsar.MessageId(0, m2.ledger_id(), m2.entry_id(), m2.batch_index()) - - i1 = mid.pulsar_to_int(m1) - i2 = mid.pulsar_to_int(m2) - - # In python, MessageId objects are not comparable directory, but the - # internal generated native object is. - internal1 = m1._msg_id - internal2 = m2._msg_id - - s1 = mid.int_to_str(i1) - s2 = mid.int_to_str(i2) - - # assert that all strings, all ints, and all native objects compare the same - assert_compare((internal1, internal2), (i1, i2)) - assert_compare((internal1, internal2), (s1, s2)) - - -def test_max_values() -> None: - pulsar.MessageId(2**31 - 1, 2**63 - 1, 2**63 - 1, 2**31 - 1) - - -@given( - i1=st.integers(min_value=0, max_value=2**192 - 1), - i2=st.integers(min_value=0, max_value=2**192 - 1), -) -@settings(max_examples=10000) # these are very fast and we want good coverage -def test_string_comparison(i1: int, i2: int) -> None: - assert_compare((i1, i2), (mid.int_to_str(i1), mid.int_to_str(i2))) diff --git a/chromadb/types.py b/chromadb/types.py index fd66f12af6c..96597e18033 100644 --- a/chromadb/types.py +++ b/chromadb/types.py @@ -57,9 +57,9 @@ class Segment(TypedDict): # SeqID can be one of three types of value in our current and future plans: -# 1. A Pulsar MessageID encoded as a 192-bit integer -# 2. A Pulsar MessageIndex (a 64-bit integer) -# 3. A SQL RowID (a 64-bit integer) +# 1. A Pulsar MessageID encoded as a 192-bit integer - This is no longer used as we removed pulsar +# 2. A Pulsar MessageIndex (a 64-bit integer) - This is no longer used as we removed pulsar +# 3. A SQL RowID (a 64-bit integer) - This is used by both sqlite and the new log-service # All three of these types can be expressed as a Python int, so that is the type we # use in the internal Python API. However, care should be taken that the larger 192-bit diff --git a/chromadb/utils/messageid.py b/chromadb/utils/messageid.py index 9501f36c759..2583a7b420c 100644 --- a/chromadb/utils/messageid.py +++ b/chromadb/utils/messageid.py @@ -1,36 +1,3 @@ -import pulsar - - -def pulsar_to_int(message_id: pulsar.MessageId) -> int: - ledger_id: int = message_id.ledger_id() - entry_id: int = message_id.entry_id() - batch_index: int = message_id.batch_index() - partition: int = message_id.partition() - - # Convert to offset binary encoding to preserve ordering semantics when encoded - # see https://en.wikipedia.org/wiki/Offset_binary - ledger_id = ledger_id + 2**63 - entry_id = entry_id + 2**63 - batch_index = batch_index + 2**31 - partition = partition + 2**31 - - return ledger_id << 128 | entry_id << 64 | batch_index << 32 | partition - - -def int_to_pulsar(message_id: int) -> pulsar.MessageId: - partition = message_id & 0xFFFFFFFF - batch_index = message_id >> 32 & 0xFFFFFFFF - entry_id = message_id >> 64 & 0xFFFFFFFFFFFFFFFF - ledger_id = message_id >> 128 & 0xFFFFFFFFFFFFFFFF - - partition = partition - 2**31 - batch_index = batch_index - 2**31 - entry_id = entry_id - 2**63 - ledger_id = ledger_id - 2**63 - - return pulsar.MessageId(partition, ledger_id, entry_id, batch_index) - - def int_to_bytes(int: int) -> bytes: """Convert int to a 24 byte big endian byte string""" return int.to_bytes(24, "big") @@ -39,42 +6,3 @@ def int_to_bytes(int: int) -> bytes: def bytes_to_int(bytes: bytes) -> int: """Convert a 24 byte big endian byte string to an int""" return int.from_bytes(bytes, "big") - - -# Sorted in lexographic order -base85 = ( - "!#$%&()*+-0123456789;<=>?@ABCDEFGHIJKLMNOP" - + "QRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz{|}~" -) - - -# not the most efficient way to do this, see benchmark function below -def _int_to_str(n: int) -> str: - if n < 85: - return base85[n] - else: - return _int_to_str(n // 85) + base85[n % 85] - - -def int_to_str(n: int) -> str: - return _int_to_str(n).rjust(36, "!") # left pad with '!' to 36 chars - - -def str_to_int(s: str) -> int: - return sum(base85.index(c) * 85**i for i, c in enumerate(s[::-1])) - - -# 1m in 5 seconds on a M1 Pro -# Not fast, but not likely to be a bottleneck either -def _benchmark() -> None: - import random - import time - - t0 = time.time() - for i in range(1000000): - x = random.randint(0, 2**192 - 1) - s = int_to_str(x) - if s == "!": # prevent compiler from optimizing out - print("oops") - t1 = time.time() - print(t1 - t0) diff --git a/pyproject.toml b/pyproject.toml index d425e77952d..8e5c29527e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,6 @@ dependencies = [ 'numpy >= 1.22.5', 'posthog >= 2.4.0', 'typing_extensions >= 4.5.0', - 'pulsar-client>=3.1.0', 'onnxruntime >= 1.14.1', 'opentelemetry-api>=1.2.0', 'opentelemetry-exporter-otlp-proto-grpc>=1.2.0', diff --git a/requirements.txt b/requirements.txt index 0ed94e5033b..02e7c2a62bb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,9 +12,9 @@ opentelemetry-api>=1.2.0 opentelemetry-exporter-otlp-proto-grpc>=1.2.0 opentelemetry-instrumentation-fastapi>=0.41b0 opentelemetry-sdk>=1.2.0 +orjson>=3.9.12 overrides>=7.3.1 posthog>=2.4.0 -pulsar-client>=3.1.0 pydantic>=1.9 pypika>=0.48.9 PyYAML>=6.0.0 @@ -25,4 +25,3 @@ tqdm>=4.65.0 typer>=0.9.0 typing_extensions>=4.5.0 uvicorn[standard]>=0.18.3 -orjson>=3.9.12