Skip to content

Commit

Permalink
[ENH] Add s3 storage for rust worker (#1643)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - Update bin/cluster-test to build rust worker
- Parameterize the Dockerfile so the docker image can be built either with or
without integration-test support
	 - Add build.rs step for integration tests
	 - Make memberlist and storage tests run only when the integration flag (`CHROMA_KUBERNETES_INTEGRATION`) is set
 - New functionality
      - Adds a basic storage system that gets/puts to s3

## Test plan
*How are these changes tested?*
New tests were added for basic use of storage.
- [x] Tests pass locally with `cargo test`
  • Loading branch information
HammadB authored Jan 17, 2024
1 parent d5b4a64 commit 7aaf36f
Show file tree
Hide file tree
Showing 11 changed files with 933 additions and 43 deletions.
674 changes: 642 additions & 32 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions rust/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ kube = { version = "0.87.1", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.20.0", features = ["latest"] }
bytes = "1.5.0"
parking_lot = "0.12.1"
aws-sdk-s3 = "1.5.0"
aws-smithy-types = "1.1.0"
aws-config = { version = "1.1.2", features = ["behavior-version-latest"] }

[build-dependencies]
tonic-build = "0.10"
Expand Down
2 changes: 2 additions & 0 deletions rust/worker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
FROM rust:1.74.1 as builder
ARG CHROMA_KUBERNETES_INTEGRATION=0
ENV CHROMA_KUBERNETES_INTEGRATION $CHROMA_KUBERNETES_INTEGRATION

WORKDIR /
RUN git clone https://github.com/chroma-core/hnswlib.git
Expand Down
13 changes: 13 additions & 0 deletions rust/worker/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,18 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.flag("-ftree-vectorize")
.compile("bindings");

// Set a compile flag based on an environment variable that tells us if we should
// run the cluster tests
let run_cluster_tests_env_var = std::env::var("CHROMA_KUBERNETES_INTEGRATION");
match run_cluster_tests_env_var {
Ok(val) => {
let lowered = val.to_lowercase();
if lowered == "true" || lowered == "1" {
println!("cargo:rustc-cfg=CHROMA_KUBERNETES_INTEGRATION");
}
}
Err(_) => {}
}

Ok(())
}
3 changes: 3 additions & 0 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ worker:
port: 50051
segment_manager:
storage_path: "./tmp/segment_manager/"
storage:
S3:
bucket: "chroma-storage"
14 changes: 13 additions & 1 deletion rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub(crate) struct WorkerConfig {
pub(crate) ingest: crate::ingest::config::IngestConfig,
pub(crate) sysdb: crate::sysdb::config::SysDbConfig,
pub(crate) segment_manager: crate::segment::config::SegmentManagerConfig,
pub(crate) storage: crate::storage::config::StorageConfig,
}

/// # Description
Expand Down Expand Up @@ -156,7 +157,9 @@ mod tests {
port: 50051
segment_manager:
storage_path: "/tmp"
storage:
S3:
bucket: "chroma"
"#,
);
let config = RootConfig::load();
Expand Down Expand Up @@ -198,6 +201,9 @@ mod tests {
port: 50051
segment_manager:
storage_path: "/tmp"
storage:
S3:
bucket: "chroma"
"#,
);
Expand Down Expand Up @@ -255,6 +261,9 @@ mod tests {
port: 50051
segment_manager:
storage_path: "/tmp"
storage:
S3:
bucket: "chroma"
"#,
);
Expand Down Expand Up @@ -293,6 +302,9 @@ mod tests {
port: 50051
segment_manager:
storage_path: "/tmp"
storage:
S3:
bucket: "chroma"
"#,
);
let config = RootConfig::load();
Expand Down
1 change: 1 addition & 0 deletions rust/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod ingest;
mod memberlist;
mod segment;
mod server;
mod storage;
mod sysdb;
mod system;
mod types;
Expand Down
21 changes: 11 additions & 10 deletions rust/worker/src/memberlist/memberlist_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,20 +246,21 @@ mod tests {
use super::*;

#[tokio::test]
#[cfg(CHROMA_KUBERNETES_INTEGRATION)]
async fn it_can_work() {
// TODO: This only works if you have a kubernetes cluster running locally with a memberlist
// We need to implement a test harness for this. For now, it will silently do nothing
// if you don't have a kubernetes cluster running locally and only serve as a reminder
// and demonstration of how to use the memberlist provider.
// let kube_ns = "chroma".to_string();
// let kube_client = Client::try_default().await.unwrap();
// let memberlist_provider = CustomResourceMemberlistProvider::new(
// "worker-memberlist".to_string(),
// kube_client.clone(),
// kube_ns.clone(),
// 10,
// );
// let mut system = System::new();
// let handle = system.start_component(memberlist_provider);
let kube_ns = "chroma".to_string();
let kube_client = Client::try_default().await.unwrap();
let memberlist_provider = CustomResourceMemberlistProvider::new(
"worker-memberlist".to_string(),
kube_client.clone(),
kube_ns.clone(),
10,
);
let mut system = System::new();
let handle = system.start_component(memberlist_provider);
}
}
20 changes: 20 additions & 0 deletions rust/worker/src/storage/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use serde::Deserialize;

#[derive(Deserialize, Debug)]
/// The configuration for the chosen storage.
/// # Options
/// - S3: The configuration for the s3 storage.
/// # Notes
/// See config.rs in the root of the worker crate for an example of how to use
/// config files to configure the worker.
pub(crate) enum StorageConfig {
    S3(S3StorageConfig),
}

#[derive(Deserialize, Debug)]
/// The configuration for the s3 storage type
/// # Fields
/// - bucket: The name of the s3 bucket to use.
pub(crate) struct S3StorageConfig {
    pub(crate) bucket: String,
}
9 changes: 9 additions & 0 deletions rust/worker/src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use async_trait::async_trait;
pub(crate) mod config;
pub(crate) mod s3;

/// A simple key/value storage abstraction over a backend such as s3.
/// Both operations transfer whole objects between the backend and a file
/// on local disk; errors are reported as stringified messages.
#[async_trait]
trait Storage {
    // Fetch the object stored under `key` and write it to the local file `path`.
    async fn get(&self, key: &str, path: &str) -> Result<(), String>;
    // Upload the local file at `path` to the backend under `key`.
    async fn put(&self, key: &str, path: &str) -> Result<(), String>;
}
216 changes: 216 additions & 0 deletions rust/worker/src/storage/s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// Presents an interface to a storage backend such as s3 or local disk.
// The interface is a simple key-value store, which maps to s3 well.
// For now the interface fetches a file and stores it at a specific
// location on disk. This is not ideal for s3, but it is a start.

// Ideally we would support streaming the file from s3 to the index
// but the current implementation of hnswlib makes this complicated.
// Once we move to our own implementation of hnswlib we can support
// streaming from s3.

use super::{config::StorageConfig, Storage};
use crate::config::{Configurable, WorkerConfig};
use crate::errors::ChromaError;
use async_trait::async_trait;
use aws_sdk_s3;
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::create_bucket::CreateBucketError;
use aws_smithy_types::byte_stream::ByteStream;
use std::clone::Clone;
use std::io::Write;

#[derive(Clone)]
/// Storage backed by an s3 bucket. Cloning is cheap: the underlying
/// aws sdk client is internally reference-counted.
struct S3Storage {
    // Name of the bucket all gets/puts operate on.
    bucket: String,
    // Configured aws s3 client (credentials/region come from the environment).
    client: aws_sdk_s3::Client,
}

impl S3Storage {
    /// Constructs an `S3Storage` over the given bucket with the provided client.
    fn new(bucket: &str, client: aws_sdk_s3::Client) -> S3Storage {
        S3Storage {
            bucket: bucket.to_string(),
            client,
        }
    }

    /// Creates the bucket with default settings in the client's region.
    ///
    /// Idempotent: "bucket already exists" and "bucket already owned by you"
    /// are treated as success. This should only be used for testing; in
    /// production the bucket should be provisioned ahead of time.
    async fn create_bucket(&self) -> Result<(), String> {
        let res = self
            .client
            .create_bucket()
            .bucket(self.bucket.clone())
            .send()
            .await;
        match res {
            Ok(_) => {
                println!("created bucket {}", self.bucket);
                Ok(())
            }
            // Unwrap the service-level error to detect the two benign
            // "bucket already there" cases.
            Err(SdkError::ServiceError(err)) => match err.into_err() {
                CreateBucketError::BucketAlreadyExists(msg) => {
                    println!("bucket already exists: {}", msg);
                    Ok(())
                }
                CreateBucketError::BucketAlreadyOwnedByYou(msg) => {
                    println!("bucket already owned by you: {}", msg);
                    Ok(())
                }
                e => {
                    println!("error: {}", e.to_string());
                    Err(e.to_string())
                }
            },
            // Transport-level failures (dispatch, timeout, ...).
            Err(e) => {
                println!("error: {}", e);
                Err(e.to_string())
            }
        }
    }
}

#[async_trait]
impl Configurable for S3Storage {
    /// Builds an `S3Storage` from the worker config. Credentials and region
    /// for the aws client are loaded from the environment.
    async fn try_from_config(config: &WorkerConfig) -> Result<Self, Box<dyn ChromaError>> {
        match &config.storage {
            StorageConfig::S3(s3_config) => {
                // Distinct name so the aws sdk config doesn't shadow the
                // worker config parameter.
                let sdk_config = aws_config::load_from_env().await;
                let client = aws_sdk_s3::Client::new(&sdk_config);
                Ok(S3Storage::new(&s3_config.bucket, client))
            }
        }
    }
}

#[async_trait]
impl Storage for S3Storage {
    /// Downloads the object stored under `key` and writes it to the local
    /// file at `path`, streaming the body chunk by chunk.
    ///
    /// Returns a stringified error on any s3 or local I/O failure.
    async fn get(&self, key: &str, path: &str) -> Result<(), String> {
        // Issue the s3 request before creating the local file so a failed
        // request (missing key, unreachable endpoint) doesn't leave a stray
        // empty file on disk.
        let mut res = self
            .client
            .get_object()
            .bucket(self.bucket.clone())
            .key(key)
            .send()
            .await
            .map_err(|e| {
                println!("error: {}", e);
                e.to_string()
            })?;
        let mut file = std::fs::File::create(path).map_err(|e| {
            println!("error: {}", e);
            e.to_string()
        })?;
        // Stream the response body to disk; propagate (rather than panic on)
        // both stream and write errors.
        while let Some(bytes) = res.body.next().await {
            let bytes = bytes.map_err(|e| {
                println!("error: {}", e);
                e.to_string()
            })?;
            file.write_all(&bytes).map_err(|e| {
                println!("error: {}", e);
                e.to_string()
            })?;
        }
        Ok(())
    }

    /// Uploads the local file at `path` to the bucket under `key`.
    ///
    /// Returns a stringified error if the file cannot be read or the upload
    /// fails.
    async fn put(&self, key: &str, path: &str) -> Result<(), String> {
        // Puts from a file on disk to s3.
        let bytestream = ByteStream::from_path(path).await.map_err(|e| {
            println!("error: {}", e);
            e.to_string()
        })?;
        let res = self
            .client
            .put_object()
            .bucket(self.bucket.clone())
            .key(key)
            .body(bytestream)
            .send()
            .await;
        match res {
            Ok(_) => {
                println!("put object {} to bucket {}", key, self.bucket);
                Ok(())
            }
            Err(e) => {
                println!("error: {}", e);
                Err(e.to_string())
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Round-trip test: put a local file to s3, get it back, and compare
    // contents. Requires a local minio instance on 127.0.0.1:9000, so it only
    // runs when the CHROMA_KUBERNETES_INTEGRATION cfg flag is set (see
    // build.rs).
    #[tokio::test]
    #[cfg(CHROMA_KUBERNETES_INTEGRATION)]
    async fn test_get() {
        // Set up credentials assuming minio is running locally
        let cred = aws_sdk_s3::config::Credentials::new(
            "minio",
            "minio123",
            None,
            None,
            "loaded-from-env",
        );

        // Set up s3 client
        let config = aws_sdk_s3::config::Builder::new()
            .endpoint_url("http://127.0.0.1:9000".to_string())
            .credentials_provider(cred)
            .behavior_version_latest()
            .region(aws_sdk_s3::config::Region::new("us-east-1"))
            // Path-style addressing is required by minio (no virtual hosts).
            .force_path_style(true)
            .build();
        let client = aws_sdk_s3::Client::from_conf(config);

        let storage = S3Storage {
            bucket: "test".to_string(),
            client: client,
        };
        // create_bucket is idempotent, so re-runs against the same minio are fine.
        storage.create_bucket().await.unwrap();

        // Write some data to a test file, put it in s3, get it back and verify its contents
        let tmp_dir = tempdir().unwrap();
        let persist_path = tmp_dir.path().to_str().unwrap().to_string();

        let test_data = "test data";
        let test_file_in = format!("{}/test_file_in", persist_path);
        let test_file_out = format!("{}/test_file_out", persist_path);
        std::fs::write(&test_file_in, test_data).unwrap();
        storage.put("test", &test_file_in).await.unwrap();
        storage.get("test", &test_file_out).await.unwrap();

        let contents = std::fs::read_to_string(test_file_out).unwrap();
        assert_eq!(contents, test_data);
    }
}

0 comments on commit 7aaf36f

Please sign in to comment.