Skip to content

Commit

Permalink
[ENH] add rust pulsar and topic management
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Dec 16, 2023
1 parent 98a83d4 commit 5cdba43
Show file tree
Hide file tree
Showing 16 changed files with 2,108 additions and 84 deletions.
1,649 changes: 1,610 additions & 39 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions rust/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ name = "worker"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "worker"
path = "src/bin/worker.rs"

[dependencies]
tonic = "0.10"
prost = "0.12"
Expand All @@ -18,13 +22,15 @@ serde = { version = "1.0.193", features = ["derive"] }
serde_json = "1.0.108"
futures = "0.3"
num_cpus = "1.16.0"
pulsar = "6.1.0"
murmur3 = "0.5.2"
thiserror = "1.0.50"
num-bigint = "0.4.4"
tempfile = "3.8.1"
schemars = "0.8.16"
kube = { version = "0.87.1", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.20.0", features = ["latest"] }
bytes = "1.5.0"

[build-dependencies]
tonic-build = "0.10"
Expand Down
21 changes: 21 additions & 0 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Default configuration for the Chroma worker.
# In the long term, every service should have an entry in this file,
# and it can become the global configuration file for Chroma.
# For now, it is nested in the worker directory.

worker:
my_ip: "10.244.0.85"
num_indexing_threads: 4
pulsar_url: "pulsar://127.0.0.1:6650"
pulsar_tenant: "public"
pulsar_namespace: "default"
kube_namespace: "chroma"
assignment_policy:
RendezvousHashing:
hasher: Murmur3
memberlist_provider:
CustomResource:
memberlist_name: "worker-memberlist"
queue_size: 100
ingest:
queue_size: 100
58 changes: 26 additions & 32 deletions rust/worker/src/assignment/assignment_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,29 @@ use super::{
rendezvous_hash::{assign, AssignmentError, Murmur3Hasher},
};
use async_trait::async_trait;
use uuid::Uuid;

/*
===========================================
Interfaces
===========================================
*/

/// AssignmentPolicy is a trait that defines how to assign a collection to a topic.
/// AssignmentPolicy is a trait that defines how to assign a key to a set of members.
/// # Notes
/// This trait mirrors the go and python versions of the assignment policy
/// interface.
/// # Methods
/// - assign: Assign a collection to a topic.
/// - get_topics: Get the topics that can be assigned to.
/// - assign: Assign a key to one of the members.
/// - get_members: Get the members that can be assigned to.
/// - set_members: Set the members that can be assigned to.
/// # Notes
/// An assignment policy is not responsible for creating the topics it assigns to.
/// It is the responsibility of the caller to ensure that the topics exist.
/// An assignment policy must be Send.
pub(crate) trait AssignmentPolicy: Send {
fn assign(&self, collection_id: Uuid) -> Result<String, AssignmentError>;
fn get_topics(&self) -> Vec<String>;
fn assign(&self, key: &str) -> Result<String, AssignmentError>;
fn get_members(&self) -> Vec<String>;
fn set_members(&mut self, members: Vec<String>);
}

/*
Expand All @@ -39,13 +40,8 @@ Implementation
*/

pub(crate) struct RendezvousHashingAssignmentPolicy {
// The pulsar tenant and namespace being in this implementation of the assignment policy
// is purely a temporary measure while the topic propagation is being worked on.
// TODO: Remove pulsar_tenant and pulsar_namespace from this struct once topic propagation
// is implemented.
pulsar_tenant: String,
pulsar_namespace: String,
hasher: Murmur3Hasher,
members: Vec<String>,
}

impl RendezvousHashingAssignmentPolicy {
Expand All @@ -55,16 +51,19 @@ impl RendezvousHashingAssignmentPolicy {
// take ownership of them and put the responsibility on the caller to clone them if they
// need to. This is the general pattern we should follow in rust - put the burden of cloning
// on the caller, and if they don't need to clone, they can pass ownership.
pub fn new(
pub(crate) fn new(
pulsar_tenant: String,
pulsar_namespace: String,
) -> RendezvousHashingAssignmentPolicy {
return RendezvousHashingAssignmentPolicy {
pulsar_tenant: pulsar_tenant,
pulsar_namespace: pulsar_namespace,
hasher: Murmur3Hasher {},
members: vec![],
};
}

pub(crate) fn set_members(&mut self, members: Vec<String>) {
self.members = members;
}
}

#[async_trait]
Expand All @@ -77,31 +76,26 @@ impl Configurable for RendezvousHashingAssignmentPolicy {
HasherType::Murmur3 => Murmur3Hasher {},
};
return Ok(RendezvousHashingAssignmentPolicy {
pulsar_tenant: worker_config.pulsar_tenant.clone(),
pulsar_namespace: worker_config.pulsar_namespace.clone(),
hasher: hasher,
members: vec![],
});
}
}

impl AssignmentPolicy for RendezvousHashingAssignmentPolicy {
fn assign(&self, collection_id: Uuid) -> Result<String, AssignmentError> {
let collection_id = collection_id.to_string();
let topics = self.get_topics();
let topic = assign(&collection_id, topics, &self.hasher);
fn assign(&self, key: &str) -> Result<String, AssignmentError> {
let topics = self.get_members();
let topic = assign(key, topics, &self.hasher);
return topic;
}

fn get_topics(&self) -> Vec<String> {
// This mirrors the current python and go code, which assumes a fixed set of topics
let mut topics = Vec::with_capacity(16);
for i in 0..16 {
let topic = format!(
"persistent://{}/{}/chroma_log_{}",
self.pulsar_tenant, self.pulsar_namespace, i
);
topics.push(topic);
}
return topics;
fn get_members(&self) -> Vec<String> {
// This is not designed to be used frequently for now, nor is the number of members
// expected to be large, so we can just clone the members
return self.members.clone();
}

fn set_members(&mut self, members: Vec<String>) {
self.members = members;
}
}
2 changes: 1 addition & 1 deletion rust/worker/src/assignment/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod assignment_policy;
pub(crate) mod assignment_policy;
pub(crate) mod config;
mod rendezvous_hash;
6 changes: 6 additions & 0 deletions rust/worker/src/bin/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use worker::worker_entrypoint;

/// Entry point of the `worker` binary.
///
/// `#[tokio::main]` wraps this async `main` in a Tokio runtime; the body simply
/// awaits `worker_entrypoint` (imported from the `worker` library crate) until it
/// completes, so all service logic lives in the library rather than in this file.
#[tokio::main]
async fn main() {
worker_entrypoint().await;
}
22 changes: 18 additions & 4 deletions rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ const ENV_PREFIX: &str = "CHROMA_";
/// variables take precedence over values in the YAML file.
/// By default, it is read from the current working directory,
/// with the filename chroma_config.yaml.
struct RootConfig {
pub(crate) struct RootConfig {
// The root config object wraps the worker config object so that
// we can share the same config file between multiple services.
worker: WorkerConfig,
pub worker: WorkerConfig,
}

impl RootConfig {
Expand All @@ -37,7 +37,7 @@ impl RootConfig {
/// The default location is the current working directory, with the filename chroma_config.yaml.
/// The environment variables are prefixed with CHROMA_ and are uppercase.
/// Values in the environment variables take precedence over values in the YAML file.
pub fn load() -> Self {
pub(crate) fn load() -> Self {
return Self::load_from_path(DEFAULT_CONFIG_PATH);
}

Expand All @@ -56,7 +56,7 @@ impl RootConfig {
/// # Notes
/// The environment variables are prefixed with CHROMA_ and are uppercase.
/// Values in the environment variables take precedence over values in the YAML file.
pub fn load_from_path(path: &str) -> Self {
pub(crate) fn load_from_path(path: &str) -> Self {
// Unfortunately, figment doesn't support environment variables with underscores. So we have to map and replace them.
// Excluding our own environment variables, which are prefixed with CHROMA_.
let mut f = figment::Figment::from(Env::prefixed("CHROMA_").map(|k| match k {
Expand Down Expand Up @@ -100,9 +100,11 @@ pub(crate) struct WorkerConfig {
pub(crate) num_indexing_threads: u32,
pub(crate) pulsar_tenant: String,
pub(crate) pulsar_namespace: String,
pub(crate) pulsar_url: String,
pub(crate) kube_namespace: String,
pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig,
pub(crate) memberlist_provider: crate::memberlist::config::MemberlistProviderConfig,
pub(crate) ingest: crate::ingest::config::IngestConfig,
}

/// # Description
Expand Down Expand Up @@ -133,6 +135,7 @@ mod tests {
num_indexing_threads: 4
pulsar_tenant: "public"
pulsar_namespace: "default"
pulsar_url: "pulsar://localhost:6650"
kube_namespace: "chroma"
assignment_policy:
RendezvousHashing:
Expand All @@ -141,6 +144,8 @@ mod tests {
CustomResource:
memberlist_name: "worker-memberlist"
queue_size: 100
ingest:
queue_size: 100
"#,
);
let config = RootConfig::load();
Expand All @@ -164,6 +169,7 @@ mod tests {
num_indexing_threads: 4
pulsar_tenant: "public"
pulsar_namespace: "default"
pulsar_url: "pulsar://localhost:6650"
kube_namespace: "chroma"
assignment_policy:
RendezvousHashing:
Expand All @@ -172,6 +178,8 @@ mod tests {
CustomResource:
memberlist_name: "worker-memberlist"
queue_size: 100
ingest:
queue_size: 100
"#,
);
Expand Down Expand Up @@ -212,13 +220,16 @@ mod tests {
pulsar_tenant: "public"
pulsar_namespace: "default"
kube_namespace: "chroma"
pulsar_url: "pulsar://localhost:6650"
assignment_policy:
RendezvousHashing:
hasher: Murmur3
memberlist_provider:
CustomResource:
memberlist_name: "worker-memberlist"
queue_size: 100
ingest:
queue_size: 100
"#,
);
Expand All @@ -236,6 +247,7 @@ mod tests {
let _ = jail.set_env("CHROMA_WORKER__PULSAR_TENANT", "A");
let _ = jail.set_env("CHROMA_WORKER__PULSAR_NAMESPACE", "B");
let _ = jail.set_env("CHROMA_WORKER__KUBE_NAMESPACE", "C");
let _ = jail.set_env("CHROMA_WORKER__PULSAR_URL", "pulsar://localhost:6650");
let _ = jail.create_file(
"chroma_config.yaml",
r#"
Expand All @@ -247,6 +259,8 @@ mod tests {
CustomResource:
memberlist_name: "worker-memberlist"
queue_size: 100
ingest:
queue_size: 100
"#,
);
let config = RootConfig::load();
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ pub(crate) enum ErrorCodes {
DataLoss = 15,
}

pub(crate) trait ChromaError: Error {
pub trait ChromaError: Error {
fn code(&self) -> ErrorCodes;
}
6 changes: 6 additions & 0 deletions rust/worker/src/ingest/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use serde::Deserialize;

/// Configuration for the worker's ingest component.
///
/// Deserialized with serde as the `ingest` field of `WorkerConfig`, i.e. the
/// `ingest:` section of the worker configuration (for example
/// `ingest: { queue_size: 100 }` in `chroma_config.yaml`).
#[derive(Deserialize)]
pub(crate) struct IngestConfig {
/// Size setting for the ingest queue.
/// NOTE(review): presumably the capacity (number of buffered items) of the
/// ingest component's internal queue/channel; confirm against the ingest module.
pub(crate) queue_size: usize,
}
Loading

0 comments on commit 5cdba43

Please sign in to comment.