Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] add Rust pulsar and topic management #1528

Merged
merged 3 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,649 changes: 1,610 additions & 39 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions rust/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ name = "worker"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "worker"
path = "src/bin/worker.rs"

[dependencies]
tonic = "0.10"
prost = "0.12"
Expand All @@ -18,13 +22,15 @@ serde = { version = "1.0.193", features = ["derive"] }
serde_json = "1.0.108"
futures = "0.3"
num_cpus = "1.16.0"
pulsar = "6.1.0"
murmur3 = "0.5.2"
thiserror = "1.0.50"
num-bigint = "0.4.4"
tempfile = "3.8.1"
schemars = "0.8.16"
kube = { version = "0.87.1", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.20.0", features = ["latest"] }
bytes = "1.5.0"

[build-dependencies]
tonic-build = "0.10"
Expand Down
21 changes: 21 additions & 0 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Default configuration for Chroma worker
# In the long term, every service should have an entry in this file
# and this can become the global configuration file for Chroma
# for now we nest it in the worker directory

worker:
my_ip: "10.244.0.85"
num_indexing_threads: 4
pulsar_url: "pulsar://127.0.0.1:6650"
pulsar_tenant: "public"
pulsar_namespace: "default"
kube_namespace: "chroma"
assignment_policy:
RendezvousHashing:
hasher: Murmur3
memberlist_provider:
CustomResource:
memberlist_name: "worker-memberlist"
queue_size: 100
ingest:
queue_size: 100
58 changes: 26 additions & 32 deletions rust/worker/src/assignment/assignment_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,29 @@ use super::{
rendezvous_hash::{assign, AssignmentError, Murmur3Hasher},
};
use async_trait::async_trait;
use uuid::Uuid;

/*
===========================================
Interfaces
===========================================
*/

/// AssignmentPolicy is a trait that defines how to assign a collection to a topic.
/// AssignmentPolicy is a trait that defines how to assign a key to a set of members.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generalizing this for any assignment

/// # Notes
/// This trait mirrors the go and python versions of the assignment policy
/// interface.
/// # Methods
/// - assign: Assign a collection to a topic.
/// - get_topics: Get the topics that can be assigned to.
/// - assign: Assign a key to a member.
/// - get_members: Get the members that can be assigned to.
/// - set_members: Set the members that can be assigned to.
/// # Notes
/// An assignment policy is not responsible for creating the topics it assigns to.
/// It is the responsibility of the caller to ensure that the topics exist.
/// An assignment policy must be Send.
pub(crate) trait AssignmentPolicy: Send {
fn assign(&self, collection_id: Uuid) -> Result<String, AssignmentError>;
fn get_topics(&self) -> Vec<String>;
fn assign(&self, key: &str) -> Result<String, AssignmentError>;
fn get_members(&self) -> Vec<String>;
fn set_members(&mut self, members: Vec<String>);
}

/*
Expand All @@ -39,13 +40,8 @@ Implementation
*/

pub(crate) struct RendezvousHashingAssignmentPolicy {
// The pulsar tenant and namespace being in this implementation of the assignment policy
// is purely a temporary measure while the topic propagation is being worked on.
// TODO: Remove pulsar_tenant and pulsar_namespace from this struct once topic propagation
// is implemented.
pulsar_tenant: String,
pulsar_namespace: String,
hasher: Murmur3Hasher,
members: Vec<String>,
}

impl RendezvousHashingAssignmentPolicy {
Expand All @@ -55,16 +51,19 @@ impl RendezvousHashingAssignmentPolicy {
// take ownership of them and put the responsibility on the caller to clone them if they
// need to. This is the general pattern we should follow in rust - put the burden of cloning
// on the caller, and if they don't need to clone, they can pass ownership.
pub fn new(
pub(crate) fn new(
pulsar_tenant: String,
pulsar_namespace: String,
) -> RendezvousHashingAssignmentPolicy {
return RendezvousHashingAssignmentPolicy {
pulsar_tenant: pulsar_tenant,
pulsar_namespace: pulsar_namespace,
hasher: Murmur3Hasher {},
members: vec![],
};
}

/// Replaces the policy's current member list with `members`.
///
/// Takes ownership of the vector, so the caller clones only if it still
/// needs its own copy afterwards.
pub(crate) fn set_members(&mut self, members: Vec<String>) {
self.members = members;
}
}

#[async_trait]
Expand All @@ -77,31 +76,26 @@ impl Configurable for RendezvousHashingAssignmentPolicy {
HasherType::Murmur3 => Murmur3Hasher {},
};
return Ok(RendezvousHashingAssignmentPolicy {
pulsar_tenant: worker_config.pulsar_tenant.clone(),
pulsar_namespace: worker_config.pulsar_namespace.clone(),
hasher: hasher,
members: vec![],
});
}
}

impl AssignmentPolicy for RendezvousHashingAssignmentPolicy {
fn assign(&self, collection_id: Uuid) -> Result<String, AssignmentError> {
let collection_id = collection_id.to_string();
let topics = self.get_topics();
let topic = assign(&collection_id, topics, &self.hasher);
fn assign(&self, key: &str) -> Result<String, AssignmentError> {
let topics = self.get_members();
let topic = assign(key, topics, &self.hasher);
return topic;
}

fn get_topics(&self) -> Vec<String> {
// This mirrors the current python and go code, which assumes a fixed set of topics
let mut topics = Vec::with_capacity(16);
for i in 0..16 {
let topic = format!(
"persistent://{}/{}/chroma_log_{}",
self.pulsar_tenant, self.pulsar_namespace, i
);
topics.push(topic);
}
return topics;
/// Returns an owned copy of the members currently held by this policy.
fn get_members(&self) -> Vec<String> {
// This is not designed to be used frequently for now, nor is the number of members
// expected to be large, so we can just clone the members
return self.members.clone();
}

/// Replaces the members this policy can assign keys to with `members`,
/// taking ownership of the vector.
fn set_members(&mut self, members: Vec<String>) {
self.members = members;
}
}
2 changes: 1 addition & 1 deletion rust/worker/src/assignment/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod assignment_policy;
pub(crate) mod assignment_policy;
pub(crate) mod config;
mod rendezvous_hash;
6 changes: 6 additions & 0 deletions rust/worker/src/bin/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use worker::worker_entrypoint;

/// Entry point of the `worker` binary.
///
/// All of the work is delegated to `worker_entrypoint` from the `worker`
/// library crate; this function only awaits it inside the async runtime
/// that `#[tokio::main]` sets up.
#[tokio::main]
async fn main() {
worker_entrypoint().await;
}
22 changes: 18 additions & 4 deletions rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ const ENV_PREFIX: &str = "CHROMA_";
/// variables take precedence over values in the YAML file.
/// By default, it is read from the current working directory,
/// with the filename chroma_config.yaml.
struct RootConfig {
pub(crate) struct RootConfig {
// The root config object wraps the worker config object so that
// we can share the same config file between multiple services.
worker: WorkerConfig,
pub worker: WorkerConfig,
}

impl RootConfig {
Expand All @@ -37,7 +37,7 @@ impl RootConfig {
/// The default location is the current working directory, with the filename chroma_config.yaml.
/// The environment variables are prefixed with CHROMA_ and are uppercase.
/// Values in the environment variables take precedence over values in the YAML file.
pub fn load() -> Self {
pub(crate) fn load() -> Self {
return Self::load_from_path(DEFAULT_CONFIG_PATH);
}

Expand All @@ -56,7 +56,7 @@ impl RootConfig {
/// # Notes
/// The environment variables are prefixed with CHROMA_ and are uppercase.
/// Values in the environment variables take precedence over values in the YAML file.
pub fn load_from_path(path: &str) -> Self {
pub(crate) fn load_from_path(path: &str) -> Self {
// Unfortunately, figment doesn't support environment variables with underscores. So we have to map and replace them.
// Excluding our own environment variables, which are prefixed with CHROMA_.
let mut f = figment::Figment::from(Env::prefixed("CHROMA_").map(|k| match k {
Expand Down Expand Up @@ -100,9 +100,11 @@ pub(crate) struct WorkerConfig {
pub(crate) num_indexing_threads: u32,
pub(crate) pulsar_tenant: String,
pub(crate) pulsar_namespace: String,
pub(crate) pulsar_url: String,
pub(crate) kube_namespace: String,
pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig,
pub(crate) memberlist_provider: crate::memberlist::config::MemberlistProviderConfig,
pub(crate) ingest: crate::ingest::config::IngestConfig,
}

/// # Description
Expand Down Expand Up @@ -133,6 +135,7 @@ mod tests {
num_indexing_threads: 4
pulsar_tenant: "public"
pulsar_namespace: "default"
pulsar_url: "pulsar://localhost:6650"
kube_namespace: "chroma"
assignment_policy:
RendezvousHashing:
Expand All @@ -141,6 +144,8 @@ mod tests {
CustomResource:
memberlist_name: "worker-memberlist"
queue_size: 100
ingest:
queue_size: 100
"#,
);
let config = RootConfig::load();
Expand All @@ -164,6 +169,7 @@ mod tests {
num_indexing_threads: 4
pulsar_tenant: "public"
pulsar_namespace: "default"
pulsar_url: "pulsar://localhost:6650"
kube_namespace: "chroma"
assignment_policy:
RendezvousHashing:
Expand All @@ -172,6 +178,8 @@ mod tests {
CustomResource:
memberlist_name: "worker-memberlist"
queue_size: 100
ingest:
queue_size: 100

"#,
);
Expand Down Expand Up @@ -212,13 +220,16 @@ mod tests {
pulsar_tenant: "public"
pulsar_namespace: "default"
kube_namespace: "chroma"
pulsar_url: "pulsar://localhost:6650"
assignment_policy:
RendezvousHashing:
hasher: Murmur3
memberlist_provider:
CustomResource:
memberlist_name: "worker-memberlist"
queue_size: 100
ingest:
queue_size: 100

"#,
);
Expand All @@ -236,6 +247,7 @@ mod tests {
let _ = jail.set_env("CHROMA_WORKER__PULSAR_TENANT", "A");
let _ = jail.set_env("CHROMA_WORKER__PULSAR_NAMESPACE", "B");
let _ = jail.set_env("CHROMA_WORKER__KUBE_NAMESPACE", "C");
let _ = jail.set_env("CHROMA_WORKER__PULSAR_URL", "pulsar://localhost:6650");
let _ = jail.create_file(
"chroma_config.yaml",
r#"
Expand All @@ -247,6 +259,8 @@ mod tests {
CustomResource:
memberlist_name: "worker-memberlist"
queue_size: 100
ingest:
queue_size: 100
"#,
);
let config = RootConfig::load();
Expand Down
6 changes: 6 additions & 0 deletions rust/worker/src/ingest/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use serde::Deserialize;

/// Configuration for the ingest component.
///
/// Deserialized with serde; it is embedded in the worker configuration as the
/// `ingest` field, i.e. the `worker.ingest` section of `chroma_config.yaml`.
#[derive(Deserialize)]
pub(crate) struct IngestConfig {
// Queue size for ingest (set to 100 in the default config).
// NOTE(review): which queue/channel this bounds is not visible here — confirm against the consumer.
pub(crate) queue_size: usize,
}
Loading