Skip to content

Commit

Permalink
rename Telemetry to ProductTelemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
beggers committed Oct 9, 2023
1 parent e81cc9f commit bb3e297
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 159 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ index_data
# Default configuration for persist_directory in chromadb/config.py
# Currently it's located in "./chroma/"
chroma/
chroma_test_data/

.venv
venv
.env
.chroma
Expand Down
15 changes: 10 additions & 5 deletions chromadb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
"QueryResult",
"GetResult",
]
from chromadb.telemetry.events import ClientStartEvent
from chromadb.telemetry import Telemetry
from chromadb.telemetry.product.events import ClientStartEvent
from chromadb.telemetry.product import ProductTelemetryClient


logger = logging.getLogger(__name__)
Expand All @@ -56,12 +56,14 @@
is_client = False
try:
from chromadb.is_thin_client import is_thin_client # type: ignore

is_client = is_thin_client
except ImportError:
is_client = False

if not is_client:
import sqlite3

if sqlite3.sqlite_version_info < (3, 35, 0):
if IN_COLAB:
# In Colab, hotswap to pysqlite-binary if it's too old
Expand All @@ -75,8 +77,11 @@
sys.modules["sqlite3"] = sys.modules.pop("pysqlite3")
else:
raise RuntimeError(
"\033[91mYour system has an unsupported version of sqlite3. Chroma requires sqlite3 >= 3.35.0.\033[0m\n"
"\033[94mPlease visit https://docs.trychroma.com/troubleshooting#sqlite to learn how to upgrade.\033[0m"
"\033[91mYour system has an unsupported version of sqlite3. Chroma \
requires sqlite3 >= 3.35.0.\033[0m\n"
"\033[94mPlease visit \
https://docs.trychroma.com/troubleshooting#sqlite to learn how \
to upgrade.\033[0m"
)


Expand Down Expand Up @@ -147,7 +152,7 @@ def Client(settings: Settings = __settings) -> API:

system = System(settings)

telemetry_client = system.instance(Telemetry)
telemetry_client = system.instance(ProductTelemetryClient)
api = system.instance(API)

system.start()
Expand Down
7 changes: 4 additions & 3 deletions chromadb/api/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from chromadb.auth.providers import RequestsClientAuthProtocolAdapter
from chromadb.auth.registry import resolve_provider
from chromadb.config import Settings, System
from chromadb.telemetry import Telemetry
from chromadb.telemetry.product import ProductTelemetryClient
from urllib.parse import urlparse, urlunparse, quote

logger = logging.getLogger(__name__)
Expand All @@ -51,7 +51,8 @@ def _validate_host(host: str) -> None:
if "/" in host and (not host.startswith("http")):
raise ValueError(
"Invalid URL. "
"Seems that you are trying to pass URL as a host but without specifying the protocol. "
"Seems that you are trying to pass URL as a host but without \
specifying the protocol. "
"Please add http:// or https:// to the host."
)

Expand Down Expand Up @@ -92,7 +93,7 @@ def __init__(self, system: System):
system.settings.require("chroma_server_host")
system.settings.require("chroma_server_http_port")

self._telemetry_client = self.require(Telemetry)
self._product_telemetry_client = self.require(ProductTelemetryClient)
self._settings = system.settings

self._api_url = FastAPI.resolve_url(
Expand Down
20 changes: 10 additions & 10 deletions chromadb/api/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from chromadb.db.system import SysDB
from chromadb.ingest.impl.utils import create_topic_name
from chromadb.segment import SegmentManager, MetadataReader, VectorReader
from chromadb.telemetry import Telemetry
from chromadb.telemetry.product import ProductTelemetryClient
from chromadb.ingest import Producer
from chromadb.api.models.Collection import Collection
from chromadb import __version__
Expand All @@ -29,7 +29,7 @@
validate_where_document,
validate_batch,
)
from chromadb.telemetry.events import (
from chromadb.telemetry.product.events import (
CollectionAddEvent,
CollectionDeleteEvent,
CollectionGetEvent,
Expand Down Expand Up @@ -80,7 +80,7 @@ class SegmentAPI(API):
_manager: SegmentManager
_producer: Producer
# TODO: fire telemetry events
_telemetry_client: Telemetry
_product_telemetry_client: ProductTelemetryClient
_tenant_id: str
_topic_ns: str
_collection_cache: Dict[UUID, t.Collection]
Expand All @@ -90,7 +90,7 @@ def __init__(self, system: System):
self._settings = system.settings
self._sysdb = self.require(SysDB)
self._manager = self.require(SegmentManager)
self._telemetry_client = self.require(Telemetry)
self._product_telemetry_client = self.require(ProductTelemetryClient)
self._producer = self.require(Producer)
self._tenant_id = system.settings.tenant_id
self._topic_ns = system.settings.topic_namespace
Expand Down Expand Up @@ -147,7 +147,7 @@ def create_collection(
for segment in segments:
self._sysdb.create_segment(segment)

self._telemetry_client.capture(
self._product_telemetry_client.capture(
ClientCreateCollectionEvent(
collection_uuid=str(id),
embedding_function=embedding_function.__class__.__name__,
Expand Down Expand Up @@ -277,7 +277,7 @@ def _add(
records_to_submit.append(r)
self._producer.submit_embeddings(coll["topic"], records_to_submit)

self._telemetry_client.capture(
self._product_telemetry_client.capture(
CollectionAddEvent(
collection_uuid=str(collection_id),
add_amount=len(ids),
Expand Down Expand Up @@ -314,7 +314,7 @@ def _update(
records_to_submit.append(r)
self._producer.submit_embeddings(coll["topic"], records_to_submit)

self._telemetry_client.capture(
self._product_telemetry_client.capture(
CollectionUpdateEvent(
collection_uuid=str(collection_id),
update_amount=len(ids),
Expand Down Expand Up @@ -408,7 +408,7 @@ def _get(
if "documents" in include:
documents = [_doc(m) for m in metadatas]

self._telemetry_client.capture(
self._product_telemetry_client.capture(
CollectionGetEvent(
collection_uuid=str(collection_id),
ids_count=len(ids) if ids else 0,
Expand Down Expand Up @@ -481,7 +481,7 @@ def _delete(
records_to_submit.append(r)
self._producer.submit_embeddings(coll["topic"], records_to_submit)

self._telemetry_client.capture(
self._product_telemetry_client.capture(
CollectionDeleteEvent(
collection_uuid=str(collection_id), delete_amount=len(ids_to_delete)
)
Expand Down Expand Up @@ -571,7 +571,7 @@ def _query(
doc_list = [_doc(m) for m in metadata_list]
documents.append(doc_list) # type: ignore

self._telemetry_client.capture(
self._product_telemetry_client.capture(
CollectionQueryEvent(
collection_uuid=str(collection_id),
query_amount=len(query_embeddings),
Expand Down
4 changes: 2 additions & 2 deletions chromadb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
# TODO: Don't use concrete types here to avoid circular deps. Strings are fine for right here!
_abstract_type_keys: Dict[str, str] = {
"chromadb.api.API": "chroma_api_impl",
"chromadb.telemetry.Telemetry": "chroma_telemetry_impl",
"chromadb.telemetry.product.ProductTelemetryClient": "chroma_product_telemetry_impl",
"chromadb.ingest.Producer": "chroma_producer_impl",
"chromadb.ingest.Consumer": "chroma_consumer_impl",
"chromadb.db.system.SysDB": "chroma_sysdb_impl",
Expand All @@ -80,7 +80,7 @@ class Settings(BaseSettings): # type: ignore
chroma_db_impl: Optional[str] = None

chroma_api_impl: str = "chromadb.api.segment.SegmentAPI" # Can be "chromadb.api.segment.SegmentAPI" or "chromadb.api.fastapi.FastAPI"
chroma_telemetry_impl: str = "chromadb.telemetry.posthog.Posthog"
chroma_product_telemetry_impl: str = "chromadb.telemetry.product.posthog.Posthog"

# New architecture components
chroma_sysdb_impl: str = "chromadb.db.impl.sqlite.SqliteDB"
Expand Down
4 changes: 2 additions & 2 deletions chromadb/server/fastapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from starlette.requests import Request

import logging
from chromadb.telemetry import ServerContext, Telemetry
from chromadb.telemetry.product import ServerContext, ProductTelemetryClient

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -102,7 +102,7 @@ def include_in_schema(path: str) -> bool:
class FastAPI(chromadb.server.Server):
def __init__(self, settings: Settings):
super().__init__(settings)
Telemetry.SERVER_CONTEXT = ServerContext.FASTAPI
ProductTelemetryClient.SERVER_CONTEXT = ServerContext.FASTAPI
self._app = fastapi.FastAPI(debug=True)
self._api: chromadb.api.API = chromadb.Client(settings)

Expand Down
10 changes: 10 additions & 0 deletions chromadb/telemetry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Telemetry

This directory holds all the telemetry for Chroma.

- `product/` contains anonymized product telemetry which we, Chroma, collect so we can
understand usage patterns. For more information, see https://docs.trychroma.com/product-telemetry.
- `opentelemetry/` contains all of the config for Chroma's [OpenTelemetry](https://opentelemetry.io/docs/instrumentation/python/getting-started/)
setup. These metrics are *not* sent back to Chroma -- anyone operating a Chroma instance
can use the OpenTelemetry metrics and traces to understand how their instance of Chroma
is behaving.
122 changes: 0 additions & 122 deletions chromadb/telemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -1,122 +0,0 @@
from abc import abstractmethod
import os
from typing import Callable, ClassVar, Dict, Any
import uuid
import time
from threading import Event, Thread
import chromadb
from chromadb.config import Component
from pathlib import Path
from enum import Enum

TELEMETRY_WHITELISTED_SETTINGS = [
"chroma_api_impl",
"is_persistent",
"chroma_server_ssl_enabled",
]


class ServerContext(Enum):
NONE = "None"
FASTAPI = "FastAPI"


class TelemetryEvent:
max_batch_size: ClassVar[int] = 1
batch_size: int

def __init__(self, batch_size: int = 1):
self.batch_size = batch_size

@property
def properties(self) -> Dict[str, Any]:
return self.__dict__

@property
def name(self) -> str:
return self.__class__.__name__

# A batch key is used to determine whether two events can be batched together.
# If a TelemetryEvent's max_batch_size > 1, batch_key() and batch() MUST be implemented.
# Otherwise they are ignored.
@property
def batch_key(self) -> str:
return self.name

def batch(self, other: "TelemetryEvent") -> "TelemetryEvent":
raise NotImplementedError


class RepeatedTelemetry:
def __init__(self, interval: int, function: Callable[[], None]):
self.interval = interval
self.function = function
self.start = time.time()
self.event = Event()
self.thread = Thread(target=self._target)
self.thread.daemon = True
self.thread.start()

def _target(self) -> None:
while not self.event.wait(self._time):
self.function()

@property
def _time(self) -> float:
return self.interval - ((time.time() - self.start) % self.interval)

def stop(self) -> None:
self.event.set()
self.thread.join()


class Telemetry(Component):
USER_ID_PATH = str(Path.home() / ".cache" / "chroma" / "telemetry_user_id")
UNKNOWN_USER_ID = "UNKNOWN"
SERVER_CONTEXT: ServerContext = ServerContext.NONE
_curr_user_id = None

@abstractmethod
def capture(self, event: TelemetryEvent) -> None:
pass

# Schedule a function that creates a TelemetryEvent to be called every `every_seconds` seconds.
def schedule_event_function(
self, event_function: Callable[..., TelemetryEvent], every_seconds: int
) -> None:
RepeatedTelemetry(every_seconds, lambda: self.capture(event_function()))

@property
def context(self) -> Dict[str, Any]:
chroma_version = chromadb.__version__
settings = chromadb.get_settings()
telemetry_settings = {}
for whitelisted in TELEMETRY_WHITELISTED_SETTINGS:
telemetry_settings[whitelisted] = settings[whitelisted]

self._context = {
"chroma_version": chroma_version,
"server_context": self.SERVER_CONTEXT.value,
**telemetry_settings,
}
return self._context

@property
def user_id(self) -> str:
if self._curr_user_id:
return self._curr_user_id

# File access may fail due to permissions or other reasons. We don't want to crash so we catch all exceptions.
try:
if not os.path.exists(self.USER_ID_PATH):
os.makedirs(os.path.dirname(self.USER_ID_PATH), exist_ok=True)
with open(self.USER_ID_PATH, "w") as f:
new_user_id = str(uuid.uuid4())
f.write(new_user_id)
self._curr_user_id = new_user_id
else:
with open(self.USER_ID_PATH, "r") as f:
self._curr_user_id = f.read()
except Exception:
self._curr_user_id = self.UNKNOWN_USER_ID
return self._curr_user_id
Loading

0 comments on commit bb3e297

Please sign in to comment.