-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[ENH] Add rust assignmenment policy and config management
- Loading branch information
Showing
5 changed files
with
185 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
use crate::config::{Configurable, WorkerConfig}; | ||
|
||
use super::{ | ||
config::{AssignmentPolicyConfig, HasherType}, | ||
rendezvous_hash::{assign, AssignmentError, Murmur3Hasher}, | ||
}; | ||
use uuid::Uuid; | ||
|
||
/* | ||
=========================================== | ||
Interfaces | ||
=========================================== | ||
*/ | ||
|
||
/// AssignmentPolicy is a trait that defines how to assign a collection to a topic. | ||
/// # Notes | ||
/// This trait mirrors the go and python versions of the assignment policy | ||
/// interface. | ||
/// # Methods | ||
/// - assign: Assign a collection to a topic. | ||
/// - get_topics: Get the topics that can be assigned to. | ||
/// # Notes | ||
/// An assignment policy is not responsible for creating the topics it assigns to. | ||
/// It is the responsibility of the caller to ensure that the topics exist. | ||
/// An assignment policy must be Send. | ||
pub(crate) trait AssignmentPolicy: Send { | ||
fn assign(&self, collection_id: Uuid) -> Result<String, AssignmentError>; | ||
fn get_topics(&self) -> Vec<String>; | ||
} | ||
|
||
/* | ||
=========================================== | ||
Implementation | ||
=========================================== | ||
*/ | ||
|
||
pub(crate) struct RendezvousHashingAssignmentPolicy { | ||
// The pulsar tenant and namespace being in this implementation of the assignment policy | ||
// is purely a temporary measure while the topic propagation is being worked on. | ||
// TODO: Remove pulsar_tenant and pulsar_namespace from this struct once topic propagation | ||
// is implemented. | ||
pulsar_tenant: String, | ||
pulsar_namespace: String, | ||
hasher: Murmur3Hasher, | ||
} | ||
|
||
impl RendezvousHashingAssignmentPolicy { | ||
// Rust beginners note | ||
// The reason we take String and not &str is because we need to put the strings into our | ||
// struct, and we can't do that with references so rather than clone the strings, we just | ||
// take ownership of them and put the responsibility on the caller to clone them if they | ||
// need to. This is the general pattern we should follow in rust - put the burden of cloning | ||
// on the caller, and if they don't need to clone, they can pass ownership. | ||
pub fn new( | ||
pulsar_tenant: String, | ||
pulsar_namespace: String, | ||
) -> RendezvousHashingAssignmentPolicy { | ||
return RendezvousHashingAssignmentPolicy { | ||
pulsar_tenant: pulsar_tenant, | ||
pulsar_namespace: pulsar_namespace, | ||
hasher: Murmur3Hasher {}, | ||
}; | ||
} | ||
} | ||
|
||
impl Configurable for RendezvousHashingAssignmentPolicy { | ||
fn from_config(config: WorkerConfig) -> Self { | ||
let assignment_policy_config = match config.assignment_policy { | ||
AssignmentPolicyConfig::RendezvousHashing(config) => config, | ||
}; | ||
let hasher = match assignment_policy_config.hasher { | ||
HasherType::Murmur3 => Murmur3Hasher {}, | ||
}; | ||
return RendezvousHashingAssignmentPolicy { | ||
pulsar_tenant: config.pulsar_tenant, | ||
pulsar_namespace: config.pulsar_namespace, | ||
hasher: hasher, | ||
}; | ||
} | ||
} | ||
|
||
impl AssignmentPolicy for RendezvousHashingAssignmentPolicy { | ||
fn assign(&self, collection_id: Uuid) -> Result<String, AssignmentError> { | ||
let collection_id = collection_id.to_string(); | ||
let topics = self.get_topics(); | ||
let topic = assign(&collection_id, topics, &self.hasher); | ||
return topic; | ||
} | ||
|
||
fn get_topics(&self) -> Vec<String> { | ||
// This mirrors the current python and go code, which assumes a fixed set of topics | ||
let mut topics = Vec::with_capacity(16); | ||
for i in 0..16 { | ||
let topic = format!( | ||
"persistent://{}/{}/chroma_log_{}", | ||
self.pulsar_tenant, self.pulsar_namespace, i | ||
); | ||
topics.push(topic); | ||
} | ||
return topics; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
use serde::Deserialize; | ||
|
||
#[derive(Deserialize)] | ||
/// The type of hasher to use. | ||
/// # Options | ||
/// - Murmur3: The murmur3 hasher. | ||
pub(crate) enum HasherType { | ||
Murmur3, | ||
} | ||
|
||
#[derive(Deserialize)] | ||
/// The configuration for the assignment policy. | ||
/// # Options | ||
/// - RendezvousHashing: The rendezvous hashing assignment policy. | ||
/// # Notes | ||
/// See config.rs in the root of the worker crate for an example of how to use | ||
/// config files to configure the worker. | ||
pub(crate) enum AssignmentPolicyConfig { | ||
RendezvousHashing(RendezvousHashingAssignmentPolicyConfig), | ||
} | ||
|
||
#[derive(Deserialize)] | ||
/// The configuration for the rendezvous hashing assignment policy. | ||
/// # Fields | ||
/// - hasher: The type of hasher to use. | ||
pub(crate) struct RendezvousHashingAssignmentPolicyConfig { | ||
pub(crate) hasher: HasherType, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,3 @@ | ||
mod assignment_policy; | ||
pub(crate) mod config; | ||
mod rendezvous_hash; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters