diff --git a/Cargo.lock b/Cargo.lock index d48e2162..0b0157fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -514,6 +514,61 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +dependencies = [ + "async-trait", + "axum-core", + "bytes 1.8.0", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.0", + "hyper-util", + "itoa 1.0.11", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite 0.2.15", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio 1.41.1", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes 1.8.0", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite 0.2.15", + "rustversion", + "sync_wrapper 1.0.1", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "azure_core" version = "0.21.0" @@ -1994,6 +2049,12 @@ dependencies = [ "hashbrown 0.15.1", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -2165,6 +2226,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi 0.3.9", +] + [[package]] name = "num-bigint" version = "0.4.6" 
@@ -2268,6 +2339,23 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opsml-server" +version = "0.1.0" +dependencies = [ + "axum", + "mockall", + "mockito", + "opsml-storage", + "rand 0.8.5", + "serde", + "serde_json", + "tokio 1.41.1", + "tower-http", + "tracing", + "tracing-subscriber", +] + [[package]] name = "opsml-storage" version = "0.1.0" @@ -2340,6 +2428,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "p256" version = "0.11.1" @@ -2650,7 +2744,7 @@ dependencies = [ "once_cell", "socket2 0.5.7", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2848,7 +2942,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.1", "tokio 1.41.1", "tokio-native-tls", "tokio-rustls 0.26.0", @@ -3052,6 +3146,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" + [[package]] name = "ryu" version = "1.0.18" @@ -3167,6 +3267,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa 1.0.11", + "serde", +] + [[package]] name = "serde_qs" version = "0.8.5" @@ -3212,6 +3322,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -3346,6 +3465,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "sync_wrapper" version = "1.0.1" @@ -3431,6 +3556,16 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if 1.0.0", + "once_cell", +] + [[package]] name = "time" version = "0.3.36" @@ -3603,6 +3738,44 @@ dependencies = [ "tokio 1.41.1", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite 0.2.15", + "sync_wrapper 0.1.2", + "tokio 1.41.1", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "bitflags 2.6.0", + "bytes 1.8.0", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "pin-project-lite 0.2.15", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -3639,6 +3812,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", ] [[package]] @@ -3651,6 +3825,45 @@ dependencies = [ "tracing", ] +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "nu-ansi-term", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "thread_local", + "time", + "tracing-core", + "tracing-log", + "tracing-serde", +] + [[package]] name = "try-lock" version = "0.2.5" @@ -3738,6 +3951,12 @@ dependencies = [ "serde", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 3742a011..a059c3dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ resolver = "2" members = [ "crates/opsml_storage", + "crates/opsml_server", "opsml_storage_s3", "opsml_storage_gcs", "opsml_storage_local", @@ -19,8 +20,8 @@ repository = "https://github.com/demml/opsml-core" [workspace.dependencies] -pyo3 = { version = "0.22", features = ["experimental-async"] } opsml-storage = { path = "crates/opsml_storage" } +opsml-server = { path = "crates/opsml_server" } [profile.release] lto = "fat" diff --git 
a/crates/opsml_server/Cargo.toml b/crates/opsml_server/Cargo.toml new file mode 100644 index 00000000..37b273f7 --- /dev/null +++ b/crates/opsml_server/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "opsml-server" +version = { workspace = true } +edition = { workspace = true } +repository = { workspace = true } + +authors = [ + "Steven Forrester " +] + +license = "MIT" +description = "Core rust library for the opsml project" + +[dependencies] +serde = { version = "1.*", features = ["derive"] } +tokio = { version = "1.*", features = ["rt", "rt-multi-thread", "macros"] } +serde_json = "1.*" +tower-http = { version = "0.5.0", features = ["cors"] } +tracing = "0.1.40" +tracing-subscriber = {version = "0.3.18", features = ["json", "time"]} +axum = "0.7.7" +opsml-storage = { workspace = true, features = ["google_storage"] } + + +[dev-dependencies] +mockall = "0.*" +mockito = "1.*" +rand = "0.8.5" + +[profile.release] +lto = "fat" +codegen-units = 1 +strip = true diff --git a/crates/opsml_server/LICENSE b/crates/opsml_server/LICENSE new file mode 100644 index 00000000..86d8025d --- /dev/null +++ b/crates/opsml_server/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Demml + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/crates/opsml_server/Makefile b/crates/opsml_server/Makefile new file mode 100644 index 00000000..b332fb44 --- /dev/null +++ b/crates/opsml_server/Makefile @@ -0,0 +1,20 @@ + +.PHONY: format +format: + cargo fmt + +.PHONY: lint +lint: + cargo clippy --workspace --all-targets --features "google_storage, aws_storage" -- -D warnings + +.PHONY: test.aws +test.aws: + cargo test aws --features "aws_storage" -- --nocapture + +.PHONY: test.gcs +test.gcs: + cargo test google --features "google_storage" -- --nocapture + +.PHONY: test.local +test.local: + cargo test local -- --nocapture \ No newline at end of file diff --git a/crates/opsml_server/src/main.rs b/crates/opsml_server/src/main.rs new file mode 100644 index 00000000..5adb5910 --- /dev/null +++ b/crates/opsml_server/src/main.rs @@ -0,0 +1,27 @@ +use axum::{response::Html, routing::get, Router}; +use opsml_storage::core::storage::base::FileSystem; +use opsml_storage::core::storage::google::google_storage::GCSFSStorageClient; + +#[tokio::main] +async fn main() { + // build our application with routes + let app = Router::new() + .route("/", get(handler)) + .route("/health", get(health_check)); + + // run it + let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") + .await + .unwrap(); + println!("listening on {}", listener.local_addr().unwrap()); + axum::serve(listener, app).await.unwrap(); +} + +async fn handler() -> Html<&'static str> { + Html("
<h1>Hello, World!</h1>
") +} + +async fn health_check() -> &'static str { + GCSFSStorageClient::new("opsml-storage-integration".to_string()); + "OK" +} diff --git a/crates/opsml_storage/Cargo.toml b/crates/opsml_storage/Cargo.toml index b3937dd1..abf031e0 100644 --- a/crates/opsml_storage/Cargo.toml +++ b/crates/opsml_storage/Cargo.toml @@ -12,7 +12,7 @@ license = "MIT" description = "Core rust library for the opsml project" [dependencies] -pyo3 = { workspace = true } +pyo3 = { version = "0.22", features = ["experimental-async"] } serde = { version = "1.*", features = ["derive"] } reqwest = { version = "0.*", features = ["json", "stream", "multipart", "rustls-tls", "rustls-tls-native-roots" ], default-features = false } tokio = { version = "1.*", features = ["rt", "rt-multi-thread", "macros"] } @@ -57,4 +57,4 @@ uuid = { version = "1.11.0", features = ["v4"] } [profile.release] lto = "fat" codegen-units = 1 -strip = true +strip = true \ No newline at end of file diff --git a/crates/opsml_storage/src/core/storage/base.rs b/crates/opsml_storage/src/core/storage/base.rs index c2dd689d..f55a000e 100644 --- a/crates/opsml_storage/src/core/storage/base.rs +++ b/crates/opsml_storage/src/core/storage/base.rs @@ -1,5 +1,6 @@ // create pyo3 async iterator use crate::core::utils::error::StorageError; +use futures::Future; use pyo3::prelude::*; use std::path::Path; use std::path::PathBuf; @@ -50,74 +51,84 @@ pub fn get_files(path: &Path) -> Result, StorageError> { } // Define the StorageClient trait with common methods -pub trait StorageClient { - fn bucket(&self) -> &str; - fn new(bucket: String) -> Self; - fn find(&self, path: &str) -> Result, StorageError>; - fn find_info(&self, path: &str) -> Result, StorageError>; - fn get_object(&self, local_path: &str, remote_path: &str) -> Result<(), StorageError>; - fn upload_file_in_chunks( +pub trait StorageClient: Sized { + async fn bucket(&self) -> &str; + async fn new(bucket: String) -> Result; + async fn find(&self, path: &str) -> Result, 
StorageError>; + async fn find_info(&self, path: &str) -> Result, StorageError>; + async fn get_object(&self, local_path: &str, remote_path: &str) -> Result<(), StorageError>; + async fn upload_file_in_chunks( &self, local_path: &Path, remote_path: &Path, chunk_size: Option, ) -> Result<(), StorageError>; - fn copy_objects(&self, src: &str, dest: &str) -> Result; - fn copy_object(&self, src: &str, dest: &str) -> Result; - fn delete_objects(&self, path: &str) -> Result; - fn delete_object(&self, path: &str) -> Result; - fn generate_presigned_url(&self, path: &str, expiration: u64) -> Result; + async fn copy_objects(&self, src: &str, dest: &str) -> Result; + async fn copy_object(&self, src: &str, dest: &str) -> Result; + async fn delete_objects(&self, path: &str) -> Result; + async fn delete_object(&self, path: &str) -> Result; + async fn generate_presigned_url( + &self, + path: &str, + expiration: u64, + ) -> Result; } pub trait FileSystem { fn client(&self) -> &T; - fn new(bucket: String) -> Self; + async fn new(bucket: String) -> Self; - fn find(&self, path: &Path) -> Result, StorageError> { - let stripped_path = path.strip_path(self.client().bucket()); - self.client().find(stripped_path.to_str().unwrap()) + async fn find(&self, path: &Path) -> Result, StorageError> { + let stripped_path = path.strip_path(self.client().bucket().await); + self.client().find(stripped_path.to_str().unwrap()).await } - fn find_info(&self, path: &Path) -> Result, StorageError> { - let stripped_path = path.strip_path(self.client().bucket()); - self.client().find_info(stripped_path.to_str().unwrap()) + async fn find_info(&self, path: &Path) -> Result, StorageError> { + let stripped_path = path.strip_path(self.client().bucket().await); + self.client() + .find_info(stripped_path.to_str().unwrap()) + .await } - fn get(&self, lpath: &Path, rpath: &Path, recursive: bool) -> Result<(), StorageError> { + async fn get(&self, lpath: &Path, rpath: &Path, recursive: bool) -> Result<(), 
StorageError> { // strip the paths - let stripped_rpath = rpath.strip_path(self.client().bucket()); - let stripped_lpath = lpath.strip_path(self.client().bucket()); + let stripped_rpath = rpath.strip_path(self.client().bucket().await); + let stripped_lpath = lpath.strip_path(self.client().bucket().await); if recursive { let stripped_lpath_clone = stripped_lpath.clone(); // list all objects in the path - let objects = self.client().find(stripped_rpath.to_str().unwrap())?; + let objects = self.client().find(stripped_rpath.to_str().unwrap()).await?; // iterate over each object and get it for obj in objects { let file_path = Path::new(obj.as_str()); - let stripped_path = file_path.strip_path(self.client().bucket()); + let stripped_path = file_path.strip_path(self.client().bucket().await); let relative_path = file_path.relative_path(&stripped_rpath)?; let local_path = stripped_lpath_clone.join(relative_path); - self.client().get_object( - local_path.to_str().unwrap(), - stripped_path.to_str().unwrap(), - )?; + self.client() + .get_object( + local_path.to_str().unwrap(), + stripped_path.to_str().unwrap(), + ) + .await?; } } else { - self.client().get_object( - stripped_lpath.to_str().unwrap(), - stripped_rpath.to_str().unwrap(), - )?; + self.client() + .get_object( + stripped_lpath.to_str().unwrap(), + stripped_rpath.to_str().unwrap(), + ) + .await?; } Ok(()) } - fn put(&self, lpath: &Path, rpath: &Path, recursive: bool) -> Result<(), StorageError> { - let stripped_lpath = lpath.strip_path(self.client().bucket()); - let stripped_rpath = rpath.strip_path(self.client().bucket()); + async fn put(&self, lpath: &Path, rpath: &Path, recursive: bool) -> Result<(), StorageError> { + let stripped_lpath = lpath.strip_path(self.client().bucket().await); + let stripped_rpath = rpath.strip_path(self.client().bucket().await); if recursive { if !stripped_lpath.is_dir() { @@ -131,64 +142,77 @@ pub trait FileSystem { for file in files { let stripped_lpath_clone = stripped_lpath.clone(); 
let stripped_rpath_clone = stripped_rpath.clone(); - let stripped_file_path = file.strip_path(self.client().bucket()); + let stripped_file_path = file.strip_path(self.client().bucket().await); let relative_path = file.relative_path(&stripped_lpath_clone)?; let remote_path = stripped_rpath_clone.join(relative_path); self.client() - .upload_file_in_chunks(&stripped_file_path, &remote_path, None)?; + .upload_file_in_chunks(&stripped_file_path, &remote_path, None) + .await?; } Ok(()) } else { self.client() - .upload_file_in_chunks(&stripped_lpath, &stripped_rpath, None)?; + .upload_file_in_chunks(&stripped_lpath, &stripped_rpath, None) + .await?; Ok(()) } } - fn copy(&self, src: &Path, dest: &Path, recursive: bool) -> Result<(), StorageError> { - let stripped_src = src.strip_path(self.client().bucket()); - let stripped_dest = dest.strip_path(self.client().bucket()); + async fn copy(&self, src: &Path, dest: &Path, recursive: bool) -> Result<(), StorageError> { + let stripped_src = src.strip_path(self.client().bucket().await); + let stripped_dest = dest.strip_path(self.client().bucket().await); if recursive { - self.client().copy_objects( - stripped_src.to_str().unwrap(), - stripped_dest.to_str().unwrap(), - )?; + self.client() + .copy_objects( + stripped_src.to_str().unwrap(), + stripped_dest.to_str().unwrap(), + ) + .await?; } else { - self.client().copy_object( - stripped_src.to_str().unwrap(), - stripped_dest.to_str().unwrap(), - )?; + self.client() + .copy_object( + stripped_src.to_str().unwrap(), + stripped_dest.to_str().unwrap(), + ) + .await?; } Ok(()) } - fn rm(&self, path: &Path, recursive: bool) -> Result<(), StorageError> { - let stripped_path = path.strip_path(self.client().bucket()); + async fn rm(&self, path: &Path, recursive: bool) -> Result<(), StorageError> { + let stripped_path = path.strip_path(self.client().bucket().await); if recursive { self.client() - .delete_objects(stripped_path.to_str().unwrap())?; + 
.delete_objects(stripped_path.to_str().unwrap()) + .await?; } else { self.client() - .delete_object(stripped_path.to_str().unwrap())?; + .delete_object(stripped_path.to_str().unwrap()) + .await?; } Ok(()) } - fn exists(&self, path: &Path) -> Result { - let stripped_path = path.strip_path(self.client().bucket()); - let objects = self.client().find(stripped_path.to_str().unwrap())?; + async fn exists(&self, path: &Path) -> Result { + let stripped_path = path.strip_path(self.client().bucket().await); + let objects = self.client().find(stripped_path.to_str().unwrap()).await?; Ok(!objects.is_empty()) } - fn generate_presigned_url(&self, path: &Path, expiration: u64) -> Result { - let stripped_path = path.strip_path(self.client().bucket()); + async fn generate_presigned_url( + &self, + path: &Path, + expiration: u64, + ) -> Result { + let stripped_path = path.strip_path(self.client().bucket().await); self.client() .generate_presigned_url(stripped_path.to_str().unwrap(), expiration) + .await } } diff --git a/crates/opsml_storage/src/core/storage/google.rs b/crates/opsml_storage/src/core/storage/google.rs index 11e568e8..39b16e28 100644 --- a/crates/opsml_storage/src/core/storage/google.rs +++ b/crates/opsml_storage/src/core/storage/google.rs @@ -34,7 +34,6 @@ pub mod google_storage { use std::io::Write; use std::path::Path; use std::path::PathBuf; - use tokio::runtime::Runtime; #[derive(Clone)] pub struct GcpCreds { @@ -163,56 +162,47 @@ pub mod google_storage { pub struct GoogleStorageClient { pub client: Client, pub bucket: String, - runtime: tokio::runtime::Runtime, } impl StorageClient for GoogleStorageClient { - fn bucket(&self) -> &str { + async fn bucket(&self) -> &str { &self.bucket } - fn new(bucket: String) -> Self { - let rt = Runtime::new().unwrap(); + async fn new(bucket: String) -> Result { + let creds = GcpCreds::new().await?; + // If no credentials, attempt to create a default client pulling from the environment + let client: Result = if 
creds.creds.is_none() { + let config = ClientConfig::default().with_auth().await; + + // if error, use ClientConfig::default().anonymous(); + let config = match config { + Ok(config) => config, + Err(_) => ClientConfig::default().anonymous(), + }; - let client: Result = rt.block_on(async { - let creds = GcpCreds::new().await?; - // If no credentials, attempt to create a default client pulling from the environment - if creds.creds.is_none() { - let config = ClientConfig::default().with_auth().await; + Ok(Client::new(config)) - // if error, use ClientConfig::default().anonymous(); - let config = match config { - Ok(config) => config, - Err(_) => ClientConfig::default().anonymous(), - }; + // if creds are set (base64 for JSON file) + } else { + // try with credentials + let config = ClientConfig::default() + .with_credentials(creds.creds.unwrap()) + .await + .map_err(|e| { + StorageError::Error(format!( + "Unable to create client with credentials: {}", + e + )) + })?; - Ok(Client::new(config)) + let client = Client::new(config); + Ok(client) + }; - // if creds are set (base64 for JSON file) - } else { - // try with credentials - let config = ClientConfig::default() - .with_credentials(creds.creds.unwrap()) - .await - .map_err(|e| { - StorageError::Error(format!( - "Unable to create client with credentials: {}", - e - )) - })?; - - let client = Client::new(config); - Ok(client) - } - }); - - match client { - Ok(client) => GoogleStorageClient { - client, - bucket, - runtime: rt, - }, - Err(e) => panic!("Unable to create GoogleStorageClient: {}", e), - } + Ok(GoogleStorageClient { + client: client?, + bucket, + }) } /// Download a remote object as a stream to a local file @@ -222,8 +212,8 @@ pub mod google_storage { /// * `lpath` - The path to the local file /// * `rpath` - The path to the remote file /// - fn get_object(&self, lpath: &str, rpath: &str) -> Result<(), StorageError> { - let mut stream = self.get_object_stream(rpath)?; + async fn get_object(&self, lpath: 
&str, rpath: &str) -> Result<(), StorageError> { + let mut stream = self.get_object_stream(rpath).await?; // create and open lpath file let prefix = Path::new(lpath).parent().unwrap(); @@ -239,18 +229,13 @@ pub mod google_storage { let mut file = File::create(lpath) .map_err(|e| StorageError::Error(format!("Unable to create file: {}", e)))?; - self.runtime.block_on(async { - // iterate over the stream and write to the file - while let Some(v) = stream.next().await { - let chunk = - v.map_err(|e| StorageError::Error(format!("Stream error: {}", e)))?; - file.write_all(&chunk).map_err(|e| { - StorageError::Error(format!("Unable to write to file: {}", e)) - })?; - } + while let Some(v) = stream.next().await { + let chunk = v.map_err(|e| StorageError::Error(format!("Stream error: {}", e)))?; + file.write_all(&chunk) + .map_err(|e| StorageError::Error(format!("Unable to write to file: {}", e)))?; + } - Ok(()) - }) + Ok(()) } /// Generate a presigned url for an object in the storage bucket @@ -264,33 +249,31 @@ pub mod google_storage { /// /// A Result with the presigned url if successful /// - fn generate_presigned_url( + async fn generate_presigned_url( &self, path: &str, expiration: u64, ) -> Result { - self.runtime.block_on(async { - let presigned_url = self - .client - .signed_url( - &self.bucket.clone(), - path, - None, - None, - SignedURLOptions { - method: SignedURLMethod::GET, - start_time: None, - expires: std::time::Duration::from_secs(expiration), - ..Default::default() - }, - ) - .await - .map_err(|e| { - StorageError::Error(format!("Unable to generate presigned url: {}", e)) - })?; + let presigned_url = self + .client + .signed_url( + &self.bucket.clone(), + path, + None, + None, + SignedURLOptions { + method: SignedURLMethod::GET, + start_time: None, + expires: std::time::Duration::from_secs(expiration), + ..Default::default() + }, + ) + .await + .map_err(|e| { + StorageError::Error(format!("Unable to generate presigned url: {}", e)) + })?; - 
Ok(presigned_url) - }) + Ok(presigned_url) } /// Upload file in chunks. This method will take a file path, open the file, read it in chunks and upload each chunk to the object @@ -305,7 +288,7 @@ pub mod google_storage { /// /// A Result with the object name if successful /// - fn upload_file_in_chunks( + async fn upload_file_in_chunks( &self, lpath: &Path, rpath: &Path, @@ -336,7 +319,9 @@ pub mod google_storage { chunk_count -= 1; } - let mut uploader = self.create_multipart_upload(rpath.to_str().unwrap())?; + let mut uploader = self + .create_multipart_upload(rpath.to_str().unwrap()) + .await?; let mut status = UploadStatus::NotStarted; for chunk_index in 0..chunk_count { @@ -376,26 +361,24 @@ pub mod google_storage { /// # Returns /// /// A list of objects in the path - fn find(&self, path: &str) -> Result, StorageError> { - self.runtime.block_on(async { - let result = self - .client - .list_objects(&ListObjectsRequest { - bucket: self.bucket.clone(), - prefix: Some(path.to_string()), - ..Default::default() - }) - .await - .map_err(|e| StorageError::Error(format!("Unable to list objects: {}", e)))?; - - // return a list of object names if results.items is not None, Else return empty list - Ok(result - .items - .unwrap_or_else(Vec::new) - .iter() - .map(|o| o.name.clone()) - .collect()) - }) + async fn find(&self, path: &str) -> Result, StorageError> { + let result = self + .client + .list_objects(&ListObjectsRequest { + bucket: self.bucket.clone(), + prefix: Some(path.to_string()), + ..Default::default() + }) + .await + .map_err(|e| StorageError::Error(format!("Unable to list objects: {}", e)))?; + + // return a list of object names if results.items is not None, Else return empty list + Ok(result + .items + .unwrap_or_else(Vec::new) + .iter() + .map(|o| o.name.clone()) + .collect()) } /// Find object information. 
Runs the same operation as find but returns more information about each object @@ -406,21 +389,18 @@ pub mod google_storage { /// /// # Returns /// - fn find_info(&self, path: &str) -> Result, StorageError> { - let objects = self.runtime.block_on(async { - let result = self - .client - .list_objects(&ListObjectsRequest { - bucket: self.bucket.clone(), - prefix: Some(path.to_string()), - ..Default::default() - }) - .await - .map_err(|e| StorageError::Error(format!("Unable to list objects: {}", e)))?; - Ok(result) - })?; + async fn find_info(&self, path: &str) -> Result, StorageError> { + let result = self + .client + .list_objects(&ListObjectsRequest { + bucket: self.bucket.clone(), + prefix: Some(path.to_string()), + ..Default::default() + }) + .await + .map_err(|e| StorageError::Error(format!("Unable to list objects: {}", e)))?; - Ok(objects + Ok(result .items .unwrap_or_else(Vec::new) .iter() @@ -447,23 +427,21 @@ pub mod google_storage { /// # Returns /// /// A Result with the object name if successful - fn copy_object(&self, src: &str, dest: &str) -> Result { - self.runtime.block_on(async { - self.client - .copy_object( - &google_cloud_storage::http::objects::copy::CopyObjectRequest { - source_bucket: self.bucket.clone(), - source_object: src.to_string(), - destination_bucket: self.bucket.clone(), - destination_object: dest.to_string(), - ..Default::default() - }, - ) - .await - .map_err(|e| StorageError::Error(format!("Unable to copy object: {}", e)))?; + async fn copy_object(&self, src: &str, dest: &str) -> Result { + self.client + .copy_object( + &google_cloud_storage::http::objects::copy::CopyObjectRequest { + source_bucket: self.bucket.clone(), + source_object: src.to_string(), + destination_bucket: self.bucket.clone(), + destination_object: dest.to_string(), + ..Default::default() + }, + ) + .await + .map_err(|e| StorageError::Error(format!("Unable to copy object: {}", e)))?; - Ok(true) - }) + Ok(true) } /// Copy objects from one bucket to another without 
deleting the source objects @@ -473,8 +451,8 @@ pub mod google_storage { /// * `src` - The path to the source object /// * `dest` - The path to the destination object /// - fn copy_objects(&self, src: &str, dest: &str) -> Result { - let objects = self.find(src)?; + async fn copy_objects(&self, src: &str, dest: &str) -> Result { + let objects = self.find(src).await?; let dest = Path::new(dest); let src = PathBuf::from(src); @@ -483,7 +461,8 @@ pub mod google_storage { let relative_path = file_path.relative_path(&src)?; let remote_path = dest.join(relative_path); - self.copy_object(file_path.to_str().unwrap(), remote_path.to_str().unwrap())?; + self.copy_object(file_path.to_str().unwrap(), remote_path.to_str().unwrap()) + .await?; } Ok(true) @@ -495,21 +474,19 @@ pub mod google_storage { /// /// * `path` - The path to the object in the bucket /// - fn delete_object(&self, path: &str) -> Result { + async fn delete_object(&self, path: &str) -> Result { let request = DeleteObjectRequest { bucket: self.bucket.clone(), object: path.to_string(), ..Default::default() }; - self.runtime.block_on(async { - self.client - .delete_object(&request) - .await - .map_err(|e| StorageError::Error(format!("Unable to delete object: {}", e)))?; + self.client + .delete_object(&request) + .await + .map_err(|e| StorageError::Error(format!("Unable to delete object: {}", e)))?; - Ok(true) - }) + Ok(true) } /// Delete an object from the storage bucket @@ -518,11 +495,11 @@ pub mod google_storage { /// /// * `path` - The path to the object in the bucket /// - fn delete_objects(&self, path: &str) -> Result { - let objects = self.find(path)?; + async fn delete_objects(&self, path: &str) -> Result { + let objects = self.find(path).await?; for obj in objects { - self.delete_object(obj.as_str())?; + self.delete_object(obj.as_str()).await?; } Ok(true) @@ -537,7 +514,7 @@ pub mod google_storage { /// * `path` - The path to the object in the bucket /// /// - pub fn create_multipart_upload( + pub async 
fn create_multipart_upload( &self, path: &str, ) -> Result { @@ -549,25 +526,23 @@ pub mod google_storage { ..Default::default() }; - self.runtime.block_on(async { - let result = self - .client - .prepare_resumable_upload( - &UploadObjectRequest { - bucket: self.bucket.clone(), - ..Default::default() - }, - &UploadType::Multipart(Box::new(metadata)), - ) - .await - .map_err(|e| { - StorageError::Error(format!("Unable to create resumable session: {}", e)) - })?; + let result = self + .client + .prepare_resumable_upload( + &UploadObjectRequest { + bucket: self.bucket.clone(), + ..Default::default() + }, + &UploadType::Multipart(Box::new(metadata)), + ) + .await + .map_err(|e| { + StorageError::Error(format!("Unable to create resumable session: {}", e)) + })?; - Ok(GoogleMultipartUploadClient { - client: result, - handle: tokio::runtime::Handle::current(), - }) + Ok(GoogleMultipartUploadClient { + client: result, + handle: tokio::runtime::Handle::current(), }) } @@ -622,31 +597,27 @@ pub mod google_storage { /// # Returns /// /// A stream of bytes - pub fn get_object_stream( + pub async fn get_object_stream( &self, rpath: &str, ) -> Result< impl Stream>, StorageError, > { - self.runtime.block_on(async { - // open a bucket and blob and return the stream - let result = self - .client - .download_streamed_object( - &GetObjectRequest { - bucket: self.bucket.clone(), - object: rpath.to_string(), - ..Default::default() - }, - &Range::default(), - ) - .await - .map_err(|e| { - StorageError::Error(format!("Unable to download object: {}", e)) - })?; - Ok(result) - }) + // open a bucket and blob and return the stream + let result = self + .client + .download_streamed_object( + &GetObjectRequest { + bucket: self.bucket.clone(), + object: rpath.to_string(), + ..Default::default() + }, + &Range::default(), + ) + .await + .map_err(|e| StorageError::Error(format!("Unable to download object: {}", e)))?; + Ok(result) } } @@ -658,9 +629,9 @@ pub mod google_storage { fn client(&self) -> 
&GoogleStorageClient { &self.client } - fn new(bucket: String) -> Self { + async fn new(bucket: String) -> Self { GCSFSStorageClient { - client: GoogleStorageClient::new(bucket), + client: GoogleStorageClient::new(bucket).await.unwrap(), } } } @@ -668,24 +639,36 @@ pub mod google_storage { #[pyclass] pub struct PyGCSFSStorageClient { client: GoogleStorageClient, + runtime: tokio::runtime::Runtime, } #[pymethods] impl PyGCSFSStorageClient { #[new] fn new(bucket: String) -> Self { - let client = GoogleStorageClient::new(bucket); - Self { client } + let rt = tokio::runtime::Runtime::new().unwrap(); + + let client = rt.block_on(async { GoogleStorageClient::new(bucket).await.unwrap() }); + + Self { + client, + runtime: rt, + } } fn find_info(&self, path: PathBuf) -> Result, StorageError> { - self.client.find_info(path.to_str().unwrap()) + let stripped_path = path.strip_path(&self.client.bucket); + + self.runtime + .block_on(async { self.client.find_info(stripped_path.to_str().unwrap()).await }) } #[pyo3(signature = (path=PathBuf::new()))] fn find(&self, path: PathBuf) -> Result, StorageError> { let stripped_path = path.strip_path(&self.client.bucket); - self.client.find(stripped_path.to_str().unwrap()) + + self.runtime + .block_on(async { self.client.find(stripped_path.to_str().unwrap()).await }) } #[pyo3(signature = (lpath, rpath, recursive = false))] @@ -694,32 +677,39 @@ pub mod google_storage { let stripped_rpath = rpath.strip_path(&self.client.bucket); let stripped_lpath = lpath.strip_path(&self.client.bucket); - if recursive { - let stripped_lpath_clone = stripped_lpath.clone(); + self.runtime.block_on(async { + if recursive { + let stripped_lpath_clone = stripped_lpath.clone(); + + // list all objects in the path - // list all objects in the path - let objects = self.client.find(stripped_rpath.to_str().unwrap())?; + let objects = self.client.find(stripped_rpath.to_str().unwrap()).await?; - // iterate over each object and get it - for obj in objects { - let 
file_path = Path::new(obj.as_str()); - let stripped_path = file_path.strip_path(&self.client.bucket); - let relative_path = file_path.relative_path(&stripped_rpath)?; - let local_path = stripped_lpath_clone.join(relative_path); + // iterate over each object and get it + for obj in objects { + let file_path = Path::new(obj.as_str()); + let stripped_path = file_path.strip_path(&self.client.bucket); + let relative_path = file_path.relative_path(&stripped_rpath)?; + let local_path = stripped_lpath_clone.join(relative_path); - self.client.get_object( - local_path.to_str().unwrap(), - stripped_path.to_str().unwrap(), - )?; + self.client + .get_object( + local_path.to_str().unwrap(), + stripped_path.to_str().unwrap(), + ) + .await?; + } + } else { + self.client + .get_object( + stripped_lpath.to_str().unwrap(), + stripped_rpath.to_str().unwrap(), + ) + .await?; } - } else { - self.client.get_object( - stripped_lpath.to_str().unwrap(), - stripped_rpath.to_str().unwrap(), - )?; - } - Ok(()) + Ok(()) + }) } #[pyo3(signature = (lpath, rpath, recursive = false))] @@ -727,33 +717,37 @@ pub mod google_storage { let stripped_lpath = lpath.strip_path(&self.client.bucket); let stripped_rpath = rpath.strip_path(&self.client.bucket); - if recursive { - if !stripped_lpath.is_dir() { - return Err(StorageError::Error( - "Local path must be a directory for recursive put".to_string(), - )); - } + self.runtime.block_on(async { + if recursive { + if !stripped_lpath.is_dir() { + return Err(StorageError::Error( + "Local path must be a directory for recursive put".to_string(), + )); + } - let files: Vec = get_files(&stripped_lpath)?; + let files: Vec = get_files(&stripped_lpath)?; - for file in files { - let stripped_lpath_clone = stripped_lpath.clone(); - let stripped_rpath_clone = stripped_rpath.clone(); - let stripped_file_path = file.strip_path(&self.client.bucket); + for file in files { + let stripped_lpath_clone = stripped_lpath.clone(); + let stripped_rpath_clone = 
stripped_rpath.clone(); + let stripped_file_path = file.strip_path(&self.client.bucket); + + let relative_path = file.relative_path(&stripped_lpath_clone)?; + let remote_path = stripped_rpath_clone.join(relative_path); - let relative_path = file.relative_path(&stripped_lpath_clone)?; - let remote_path = stripped_rpath_clone.join(relative_path); + self.client + .upload_file_in_chunks(&stripped_file_path, &remote_path, None) + .await?; + } + Ok(()) + } else { self.client - .upload_file_in_chunks(&stripped_file_path, &remote_path, None)?; + .upload_file_in_chunks(&stripped_lpath, &stripped_rpath, None) + .await?; + Ok(()) } - - Ok(()) - } else { - self.client - .upload_file_in_chunks(&stripped_lpath, &stripped_rpath, None)?; - Ok(()) - } + }) } #[pyo3(signature = (src, dest, recursive = false))] @@ -761,38 +755,52 @@ pub mod google_storage { let stripped_src = src.strip_path(&self.client.bucket); let stripped_dest = dest.strip_path(&self.client.bucket); - if recursive { - self.client.copy_objects( - stripped_src.to_str().unwrap(), - stripped_dest.to_str().unwrap(), - )?; - } else { - self.client.copy_object( - stripped_src.to_str().unwrap(), - stripped_dest.to_str().unwrap(), - )?; - } + self.runtime.block_on(async { + if recursive { + self.client + .copy_objects( + stripped_src.to_str().unwrap(), + stripped_dest.to_str().unwrap(), + ) + .await?; + } else { + self.client + .copy_object( + stripped_src.to_str().unwrap(), + stripped_dest.to_str().unwrap(), + ) + .await?; + } - Ok(()) + Ok(()) + }) } #[pyo3(signature = (path, recursive = false))] fn rm(&self, path: PathBuf, recursive: bool) -> Result<(), StorageError> { let stripped_path = path.strip_path(&self.client.bucket); - if recursive { - self.client - .delete_objects(stripped_path.to_str().unwrap())?; - } else { - self.client.delete_object(stripped_path.to_str().unwrap())?; - } + self.runtime.block_on(async { + if recursive { + self.client + .delete_objects(stripped_path.to_str().unwrap()) + .await?; + } else { + 
self.client + .delete_object(stripped_path.to_str().unwrap()) + .await?; + } - Ok(()) + Ok(()) + }) } fn exists(&self, path: PathBuf) -> Result { let stripped_path = path.strip_path(&self.client.bucket); - let objects = self.client.find(stripped_path.to_str().unwrap())?; + + let objects = self + .runtime + .block_on(async { self.client.find(stripped_path.to_str().unwrap()).await })?; Ok(!objects.is_empty()) } @@ -804,8 +812,12 @@ pub mod google_storage { expiration: u64, ) -> Result { let stripped_path = path.strip_path(&self.client.bucket); - self.client - .generate_presigned_url(stripped_path.to_str().unwrap(), expiration) + + self.runtime.block_on(async { + self.client + .generate_presigned_url(stripped_path.to_str().unwrap(), expiration) + .await + }) } } @@ -902,20 +914,20 @@ pub mod google_storage { let _client = GoogleStorageClient::new(bucket); } - #[test] - fn test_google_storage_client_get_object() { + #[tokio::test] + async fn test_google_storage_client_get_object() { let bucket = get_bucket(); - let client = GoogleStorageClient::new(bucket); + let client = GoogleStorageClient::new(bucket).await.unwrap(); // should fail since there are no suffixes - let result = client.get_object("local_path", "remote_path"); + let result = client.get_object("local_path", "remote_path").await; assert!(result.is_err()); // Assuming the object does not exist } - #[test] - fn test_google_storage_client_put() { + #[tokio::test] + async fn test_google_storage_client_put() { let bucket = get_bucket(); - let client = GCSFSStorageClient::new(bucket); + let client = GCSFSStorageClient::new(bucket).await; // let dirname = create_nested_data(&CHUNK_SIZE); @@ -924,25 +936,25 @@ pub mod google_storage { let rpath = Path::new(&dirname); // put the file - client.put(lpath, rpath, true).unwrap(); + client.put(lpath, rpath, true).await.unwrap(); // check if the file exists - let exists = client.exists(rpath).unwrap(); + let exists = client.exists(rpath).await.unwrap(); assert!(exists); 
// list all files - let files = client.find(rpath).unwrap(); + let files = client.find(rpath).await.unwrap(); assert_eq!(files.len(), 2); // list files with info - let files = client.find_info(rpath).unwrap(); + let files = client.find_info(rpath).await.unwrap(); assert_eq!(files.len(), 2); // download the files let new_path = uuid::Uuid::new_v4().to_string(); let new_path = Path::new(&new_path); - client.get(new_path, rpath, true).unwrap(); + client.get(new_path, rpath, true).await.unwrap(); // check if the files are the same let files = get_files(rpath).unwrap(); @@ -954,47 +966,47 @@ pub mod google_storage { // create a new path let copy_path = uuid::Uuid::new_v4().to_string(); let copy_path = Path::new(©_path); - client.copy(rpath, copy_path, true).unwrap(); - let files = client.find(copy_path).unwrap(); + client.copy(rpath, copy_path, true).await.unwrap(); + let files = client.find(copy_path).await.unwrap(); assert_eq!(files.len(), 2); // cleanup std::fs::remove_dir_all(&dirname).unwrap(); std::fs::remove_dir_all(new_path).unwrap(); - client.rm(rpath, true).unwrap(); - client.rm(copy_path, true).unwrap(); + client.rm(rpath, true).await.unwrap(); + client.rm(copy_path, true).await.unwrap(); // check if the file exists - let exists = client.exists(rpath).unwrap(); + let exists = client.exists(rpath).await.unwrap(); assert!(!exists); } - #[test] - fn test_google_storage_client_generate_presigned_url() { + #[tokio::test] + async fn test_google_storage_client_generate_presigned_url() { let bucket = get_bucket(); - let client = GCSFSStorageClient::new(bucket); + let client = GCSFSStorageClient::new(bucket).await; // create file let key = create_single_file(&CHUNK_SIZE); let path = Path::new(&key); // put the file - client.put(path, path, false).unwrap(); + client.put(path, path, false).await.unwrap(); // generate presigned url - let url = client.generate_presigned_url(path, 3600).unwrap(); + let url = client.generate_presigned_url(path, 3600).await.unwrap(); 
assert!(!url.is_empty()); // cleanup - client.rm(path, false).unwrap(); + client.rm(path, false).await.unwrap(); std::fs::remove_dir_all(path.parent().unwrap()).unwrap(); } - #[test] - fn test_google_large_file_upload() { + #[tokio::test] + async fn test_google_large_file_upload() { let bucket = get_bucket(); - let client = GCSFSStorageClient::new(bucket); + let client = GCSFSStorageClient::new(bucket).await; // create file let chunk_size = 1024 * 1024 * 5; // 5MB @@ -1002,10 +1014,10 @@ pub mod google_storage { let path = Path::new(&key); // put the file - client.put(path, path, false).unwrap(); + client.put(path, path, false).await.unwrap(); // get the file info - let info = client.find_info(path).unwrap(); + let info = client.find_info(path).await.unwrap(); assert_eq!(info.len(), 1); // get item and assert it's at least the size of the file @@ -1013,7 +1025,7 @@ pub mod google_storage { assert!(item.size >= 1024 * 1024 * 10); // cleanup - client.rm(path, false).unwrap(); + client.rm(path, false).await.unwrap(); std::fs::remove_dir_all(path.parent().unwrap()).unwrap(); } }