Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Grpc Endpoint Caching #42

Merged
merged 14 commits into from
May 18, 2022
7 changes: 6 additions & 1 deletion ocular/src/chain/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![warn(unused_qualifications)]

use crate::{
chain::{config::ChainClientConfig, registry::get_chain},
chain::{client::cache::Cache, config::ChainClientConfig, registry::get_chain},
error::{ChainClientError, RpcError},
keyring::Keyring,
};
Expand All @@ -12,6 +12,7 @@ use super::ChainName;

pub mod authz;
pub mod automated_tx_handler;
pub mod cache;
pub mod query;
pub mod tx;

Expand All @@ -21,6 +22,7 @@ pub struct ChainClient {
pub config: ChainClientConfig,
pub keyring: Keyring,
pub rpc_client: RpcHttpClient,
pub cache: Option<Cache>,
// light_provider: ?
// input:
// output:
Expand All @@ -39,11 +41,14 @@ fn get_client(chain_name: &str) -> Result<ChainClient, ChainClientError> {
let config = chain.get_chain_config()?;
let keyring = Keyring::new_file_store(None)?;
let rpc_client = new_rpc_http_client(config.rpc_address.as_str())?;
// Default to in memory cache
let cache = Cache::create_memory_cache(None)?;

Ok(ChainClient {
config,
keyring,
rpc_client,
cache: Some(cache),
})
}

Expand Down
20 changes: 8 additions & 12 deletions ocular/src/chain/client/automated_tx_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ mod tests {
keyring: keyring,
rpc_client: rpc::HttpClient::new("http://localhost:8080")
.expect("Could not create RPC"),
cache: None,
};

// Assert error if no toml exists
Expand Down Expand Up @@ -521,13 +522,10 @@ mod tests {
// Execute on toml; expect tx error, but ONLY tx error, everything else should work fine. Tx fails b/c this is unit test so no network connectivity
let err = chain_client
.execute_delegated_transacton_toml(toml_save_path, false)
.await
.err()
.unwrap()
.to_string();
.await;

// Expect Tx error b/c unit test has no network connectivity; do string matching b/c exact error type matching is messy
assert_eq!(&err[..35], "chain client error: transport error");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to clean up these tests here; unrelated to caching PR, they were just in a broken state, error messages aren't always consistent, hence the change.

// Expect Tx error b/c unit test has no network connectivity
assert!(err.is_err());

// Clean up dir + toml
std::fs::remove_dir_all(test_dir)
Expand Down Expand Up @@ -574,6 +572,7 @@ mod tests {
keyring: keyring,
rpc_client: rpc::HttpClient::new("http://localhost:8080")
.expect("Could not create RPC"),
cache: None,
};

// Assert error if no toml exists
Expand Down Expand Up @@ -647,13 +646,10 @@ mod tests {
// Execute on toml; expect tx error, but ONLY tx error, everything else should work fine. Tx fails b/c this is unit test so no network connectivity
let err = chain_client
.execute_batch_transactions(toml_save_path)
.await
.err()
.unwrap()
.to_string();
.await;

// Expect Tx error b/c unit test has no network connectivity; do string matching b/c exact error type matching is messy
assert_eq!(&err[..35], "chain client error: transport error");
// Expect Tx error b/c unit test has no network connectivity
assert!(err.is_err());

// Clean up dir + toml
std::fs::remove_dir_all(test_dir)
Expand Down
279 changes: 279 additions & 0 deletions ocular/src/chain/client/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
use crate::error::CacheError;

use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fs::File;
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;

// Constants
pub const DEFAULT_FILE_CACHE_DIR: &str = ".ocular/cache";
pub const DEFAULT_FILE_CACHE_NAME: &str = "grpc_endpoints.toml";
/// Unix permissions for dir
const FILE_CACHE_DIR_PERMISSIONS: u32 = 0o700;

// Toml structs
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct GrpcEndpointToml {
pub endpoints: Vec<GrpcEndpoint>,
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct GrpcEndpoint {
pub address: String,
}

/// Broad cache object that can mange all ocular cache initialization
pub struct Cache {
pub grpc_endpoint_cache: Box<dyn GrpcCache>,
}

/// Cache accessor defintions
pub trait GrpcCache {
/// Check if cache has been initialized
fn is_initialized(&self) -> bool;
/// Add item to cache
fn add_item(&mut self, item: String) -> Result<(), CacheError>;
/// Remove item from cache
fn remove_item(&mut self, item: String) -> Result<(), CacheError>;
/// Retrieves a copy of all items from cache
fn get_all_items(&self) -> Result<HashSet<String>, CacheError>;
}

/// Cache initialization definitions
impl Cache {
/// Constructor for file cache path. Must include filename with ".toml" suffix for file type.
///
/// If override_if_exists is true it will wipe the previous file found if it exists.
/// Otherwise if set to false, it will use what is found at the file or create a new one if not found.
///
/// Toml will be required to be structured as so:
/// [[endpoints]]
/// address = "35.230.37.28:9090"
pub fn create_file_cache(
file_path: Option<&str>,
override_if_exists: bool,
) -> Result<Cache, CacheError> {
// If none, create at default: (e.g. ~/.ocular/grpc_endpoints.toml)
let path: PathBuf = match file_path {
Some(path) => PathBuf::from(path),
None => {
let mut p = dirs::home_dir().unwrap();

p.push(DEFAULT_FILE_CACHE_DIR);
p.push(DEFAULT_FILE_CACHE_NAME);

p
}
};

dbg!(format!("Attempting to use path {:#?}", path));

// Verify path formatting
if path.is_dir() {
return Err(CacheError::Initialization(String::from(
"Path is a dir; must be a file.",
)));
} else if path.extension().unwrap().to_str().unwrap() != "toml" {
return Err(CacheError::Initialization(String::from(
"Only files with extension .toml are supported.",
)));
}

// Create files/dirs based on override settings
// First just create dirs safely regardless of override settings
let save_path = path.parent().unwrap();

// Create dir if doesn't exist
if !save_path.exists() {
match std::fs::create_dir_all(save_path) {
Ok(_res) => _res,
Err(err) => return Err(CacheError::FileIO(err.to_string())),
};
}

#[cfg(unix)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes me wonder if we have to worry about testing on Windows. maybe for now we should just explicitly state we aren't supporting Windows?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps. I can't imagine most libs support windows explicitly

match std::fs::set_permissions(
save_path,
std::fs::Permissions::from_mode(FILE_CACHE_DIR_PERMISSIONS),
) {
Ok(_res) => _res,
Err(err) => return Err(CacheError::FileIO(err.to_string())),
};

let mut endpoints = HashSet::new();

// Load endpoints if they exist
if path.exists() {
let content = match std::fs::read_to_string(&path) {
Ok(result) => result,
Err(err) => {
return Err(CacheError::FileIO(err.to_string()));
}
};

// Possible contents is empty, check to avoid parsing errors
if !content.is_empty() {
let toml: GrpcEndpointToml = match toml::from_str(&content) {
Ok(result) => result,
Err(err) => {
return Err(CacheError::Toml(err.to_string()));
}
};

dbg!(&toml);

for endpt in &toml.endpoints {
endpoints.insert(endpt.address.to_string());
}
}
}

// Finally we can manipulate the actual file after checking the override settings
if override_if_exists || !path.exists() {
// Note this creates a new file or truncates the existing one
if let Err(err) = File::create(&path) {
return Err(CacheError::FileIO(err.to_string()));
}
}

Ok(Cache {
grpc_endpoint_cache: Box::new(FileCache { path, endpoints }),
})
}

/// Constructor for in memory cache.
pub fn create_memory_cache(endpoints: Option<HashSet<String>>) -> Result<Cache, CacheError> {
let cache = match endpoints {
Some(endpoints) => MemoryCache { endpoints },
None => MemoryCache {
endpoints: HashSet::new(),
},
};

Ok(Cache {
grpc_endpoint_cache: Box::new(cache),
})
}
}

/// File based cache
pub struct FileCache {
path: PathBuf,
endpoints: HashSet<String>,
}

impl GrpcCache for FileCache {
fn is_initialized(&self) -> bool {
self.path.capacity() != 0
}

fn add_item(&mut self, item: String) -> Result<(), CacheError> {
self.endpoints.insert(item.clone());

let content = match std::fs::read_to_string(&self.path) {
Ok(result) => result,
Err(err) => {
return Err(CacheError::FileIO(err.to_string()));
}
};

let mut toml: GrpcEndpointToml = GrpcEndpointToml::default();

// Possible contents is empty, check to avoid parsing errors
if !content.is_empty() {
toml = match toml::from_str(&content) {
Ok(result) => result,
Err(err) => {
return Err(CacheError::Toml(err.to_string()));
}
};

dbg!(&toml);
philipjames44 marked this conversation as resolved.
Show resolved Hide resolved
}

// Add new item
toml.endpoints.push(GrpcEndpoint { address: item });

let toml_string = toml::to_string(&toml).expect("Could not encode toml value.");

dbg!(&toml_string);
philipjames44 marked this conversation as resolved.
Show resolved Hide resolved

// Rewrite file
match std::fs::write(&self.path, toml_string) {
Ok(_) => Ok(()),
Err(err) => Err(CacheError::FileIO(err.to_string())),
}
}

fn remove_item(&mut self, item: String) -> Result<(), CacheError> {
self.endpoints.remove(&item);

let mut toml: GrpcEndpointToml = GrpcEndpointToml::default();

let content = match std::fs::read_to_string(&self.path) {
Ok(result) => result,
Err(err) => {
return Err(CacheError::FileIO(err.to_string()));
}
};

// If we were able to get to this point, contents should never be empty, so no need to check.
let old_toml: GrpcEndpointToml = match toml::from_str(&content) {
Ok(result) => result,
Err(err) => {
return Err(CacheError::Toml(err.to_string()));
}
};

dbg!(&old_toml);

// Remove item by virtue of excludng it from new toml
toml.endpoints = old_toml
.endpoints
.into_iter()
.filter(|ep| ep.address.trim() != item.as_str())
.collect();

let toml_string = toml::to_string(&toml).expect("Could not encode toml value.");

dbg!(&toml_string);

// Rewrite file
match std::fs::write(&self.path, toml_string) {
Ok(_) => Ok(()),
Err(err) => Err(CacheError::FileIO(err.to_string())),
}
}

fn get_all_items(&self) -> Result<HashSet<String>, CacheError> {
Ok(self.endpoints.clone())
}
Comment on lines +248 to +250
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about how this fits into the health check: is the idea that we would randomly select one of these and then remove the unhealthy ones as we go?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, that's what I was thinking.

}

/// Memory based cache
pub struct MemoryCache {
endpoints: HashSet<String>,
}

impl GrpcCache for MemoryCache {
fn is_initialized(&self) -> bool {
// No special intialization process so it can always be considered initialized for now.
true
}

fn add_item(&mut self, item: String) -> Result<(), CacheError> {
self.endpoints.insert(item);

Ok(())
}

fn remove_item(&mut self, item: String) -> Result<(), CacheError> {
self.endpoints.remove(&item);

Ok(())
}

fn get_all_items(&self) -> Result<HashSet<String>, CacheError> {
Ok(self.endpoints.clone())
}
}
12 changes: 12 additions & 0 deletions ocular/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub enum ChainClientError {
TxError(#[from] TxError),
#[error("{0}")]
AutomatedTxHandlerError(#[from] AutomatedTxHandlerError),
#[error("{0}")]
CacheError(#[from] CacheError),
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -125,3 +127,13 @@ pub enum AutomatedTxHandlerError {
#[error("no valid unexpired authorization grants found for msg type: {0}")]
Authorization(String),
}

#[derive(Debug, Error)]
pub enum CacheError {
#[error("error processing file: {0}")]
FileIO(String),
#[error("error intializing cache: {0}")]
Initialization(String),
#[error("error parsing toml: {0}")]
Toml(String),
}
Loading