Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fixed binary for react native #288

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions libindy_vdr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ thiserror = "1.0"
time = { version = "=0.3.20", features = ["parsing"] }
url = "2.2.2"
zmq = "0.9"
async-trait = "0.1.77"
async-lock = "3.3.0"
sled = "0.34.7"

[dev-dependencies]
Expand Down
39 changes: 22 additions & 17 deletions libindy_vdr/src/pool/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use async_lock::RwLock;
use async_trait::async_trait;
use std::{fmt::Display, sync::Arc};
use std::{
fmt::Display,
sync::{Arc, RwLock},
};

pub mod storage;
pub mod strategy;

#[async_trait]
pub trait CacheStrategy<K, V>: Send + Sync + 'static {
async fn get(&self, key: &K) -> Option<V>;
fn get(&self, key: &K) -> Option<V>;

async fn remove(&mut self, key: &K) -> Option<V>;
fn remove(&self, key: &K) -> Option<V>;

async fn insert(&mut self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V>;
fn insert(&self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V>;
}

pub struct Cache<K: Display, V> {
Expand All @@ -34,23 +34,28 @@ impl<K: Display + 'static, V: 'static> Cache<K, V> {
}
}

pub async fn get(&self, key: &K) -> Option<V> {
pub fn get(&self, key: &K) -> Option<V> {
let full_key = self.full_key(key);
self.storage.read().await.get(&full_key).await
if let Ok(storage) = self.storage.read() {
return storage.get(&full_key);
}
None
}

pub async fn remove(&self, key: &K) -> Option<V> {
pub fn remove(&self, key: &K) -> Option<V> {
let full_key = self.full_key(key);
self.storage.write().await.remove(&full_key).await
if let Ok(storage) = self.storage.write() {
return storage.remove(&full_key);
}
None
}

pub async fn insert(&self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
pub fn insert(&self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
let full_key = self.full_key(&key);
self.storage
.write()
.await
.insert(full_key, value, custom_exp_offset)
.await
if let Ok(storage) = self.storage.write() {
return storage.insert(full_key, value, custom_exp_offset);
}
None
}
}

Expand Down
196 changes: 92 additions & 104 deletions libindy_vdr/src/pool/cache/strategy.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use super::storage::OrderedHashMap;
use super::CacheStrategy;
use async_lock::Mutex;
use async_trait::async_trait;
use std::{collections::BTreeMap, fmt::Debug, hash::Hash, sync::Arc, time::SystemTime};
use std::{
collections::BTreeMap,
fmt::Debug,
hash::Hash,
ops::Deref,
sync::{Arc, Mutex},
time::SystemTime,
};

/// A simple struct to hold a value and the expiry offset
/// needed because items can be inserted with custom ttl values
Expand Down Expand Up @@ -50,89 +55,94 @@ impl<K: Eq + Hash + Clone + Send + Sync + 'static, V: Clone + Send + Sync + 'sta
}
}

#[async_trait]
impl<K: Send + Sync + 'static, V: Send + Sync + 'static> CacheStrategy<K, V>
for Arc<dyn CacheStrategy<K, V>>
{
async fn get(&self, key: &K) -> Option<V> {
self.get(key).await
fn get(&self, key: &K) -> Option<V> {
self.deref().get(key)
}
async fn remove(&mut self, key: &K) -> Option<V> {
self.remove(key).await
fn remove(&self, key: &K) -> Option<V> {
self.deref().remove(key)
}
async fn insert(&mut self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
self.insert(key, value, custom_exp_offset).await
fn insert(&self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
self.deref().insert(key, value, custom_exp_offset)
}
}

#[async_trait]
impl<K: Hash + Eq + Send + Sync + 'static + Clone + Debug, V: Clone + Send + Sync + 'static>
CacheStrategy<K, V> for CacheStrategyTTL<K, V>
{
async fn get(&self, key: &K) -> Option<V> {
let mut store_lock = self.store.lock().await;
let current_time = SystemTime::now()
.duration_since(self.create_time)
.unwrap()
.as_millis();
let get_res = match store_lock.get(key) {
Some((ts, v)) => {
if current_time < *ts {
Some((*ts, v.clone()))
} else {
store_lock.remove(key);
None
fn get(&self, key: &K) -> Option<V> {
if let Some(mut store_lock) = self.store.lock().ok() {
let current_time = SystemTime::now()
.duration_since(self.create_time)
.unwrap()
.as_millis();
let get_res = match store_lock.get(key) {
Some((ts, v)) => {
if current_time < *ts {
Some((*ts, v.clone()))
} else {
store_lock.remove(key);
None
}
}
None => None,
};
// update the timestamp if the entry is still valid
if let Some((_, ref v)) = get_res {
store_lock.re_order(key, current_time + v.expire_offset);
}
None => None,
};
// update the timestamp if the entry is still valid
if let Some((_, ref v)) = get_res {
store_lock.re_order(key, current_time + v.expire_offset);
return get_res.map(|(_, v)| v.value);
}
get_res.map(|(_, v)| v.value)
None
}
async fn remove(&mut self, key: &K) -> Option<V> {
self.store.lock().await.remove(key).map(|(_, v)| v.value)
fn remove(&self, key: &K) -> Option<V> {
if let Some(mut store) = self.store.lock().ok() {
return store.remove(key).map(|(_, v)| v.value);
}
None
}

async fn insert(&mut self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
let mut store_lock = self.store.lock().await;
let current_ts = SystemTime::now()
.duration_since(self.create_time)
.unwrap()
.as_millis();

// remove expired entries
while store_lock.len() > 0
&& store_lock
.get_first_key_value()
.map(|(_, ts, _)| ts.clone() < current_ts)
.unwrap_or(false)
{
store_lock.remove_first();
}
fn insert(&self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
if let Some(mut store_lock) = self.store.lock().ok() {
let current_ts = SystemTime::now()
.duration_since(self.create_time)
.unwrap()
.as_millis();

// remove the oldest item if the cache is still full
if store_lock.len() >= self.capacity && store_lock.get(&key).is_none() {
// remove the oldest item
let removal_key = store_lock.get_first_key_value().map(|(k, _, _)| k.clone());
if let Some(removal_key) = removal_key {
store_lock.remove(&removal_key);
// remove expired entries
while store_lock.len() > 0
&& store_lock
.get_first_key_value()
.map(|(_, ts, _)| ts.clone() < current_ts)
.unwrap_or(false)
{
store_lock.remove_first();
}
};

let exp_offset = custom_exp_offset.unwrap_or(self.expire_after);
store_lock
.insert(
key,
TTLCacheItem {
value: value,
expire_offset: exp_offset,
},
current_ts + exp_offset,
)
.map(|v| v.value)
// remove the oldest item if the cache is still full
if store_lock.len() >= self.capacity && store_lock.get(&key).is_none() {
// remove the oldest item
let removal_key = store_lock.get_first_key_value().map(|(k, _, _)| k.clone());
if let Some(removal_key) = removal_key {
store_lock.remove(&removal_key);
}
};

let exp_offset = custom_exp_offset.unwrap_or(self.expire_after);
return store_lock
.insert(
key,
TTLCacheItem {
value: value,
expire_offset: exp_offset,
},
current_ts + exp_offset,
)
.map(|v| v.value);
}
None
}
}

Expand All @@ -158,53 +168,31 @@ mod tests {
let caches = vec![cache, fs_cache];
block_on(async {
for cache in caches {
cache
.insert("key".to_string(), "value".to_string(), None)
.await;
assert_eq!(
cache.get(&"key".to_string()).await,
Some("value".to_string())
);
cache
.insert("key1".to_string(), "value1".to_string(), None)
.await;
cache
.insert("key2".to_string(), "value2".to_string(), None)
.await;
assert_eq!(cache.get(&"key".to_string()).await, None);
cache
.insert("key3".to_string(), "value3".to_string(), None)
.await;
cache.get(&"key2".to_string()).await;
cache
.insert("key4".to_string(), "value4".to_string(), None)
.await;
cache.insert("key".to_string(), "value".to_string(), None);
assert_eq!(cache.get(&"key".to_string()), Some("value".to_string()));
cache.insert("key1".to_string(), "value1".to_string(), None);
cache.insert("key2".to_string(), "value2".to_string(), None);
assert_eq!(cache.get(&"key".to_string()), None);
cache.insert("key3".to_string(), "value3".to_string(), None);
cache.get(&"key2".to_string());
cache.insert("key4".to_string(), "value4".to_string(), None);
// key2 should not be evicted because of LRU
assert_eq!(
cache.remove(&"key2".to_string()).await,
cache.remove(&"key2".to_string()),
Some("value2".to_string())
);
// key3 should be evicted because it was bumped to back after key2 was accessed
assert_eq!(cache.get(&"key3".to_string()).await, None);
cache
.insert("key5".to_string(), "value5".to_string(), None)
.await;
assert_eq!(cache.get(&"key3".to_string()), None);
cache.insert("key5".to_string(), "value5".to_string(), None);
thread::sleep(std::time::Duration::from_millis(6));
assert_eq!(cache.get(&"key5".to_string()).await, None);
assert_eq!(cache.get(&"key5".to_string()), None);
// test ttl config
cache
.insert("key6".to_string(), "value6".to_string(), Some(1))
.await;
cache
.insert("key7".to_string(), "value7".to_string(), None)
.await;
cache.insert("key6".to_string(), "value6".to_string(), Some(1));
cache.insert("key7".to_string(), "value7".to_string(), None);
// wait until value6 expires
thread::sleep(std::time::Duration::from_millis(1));
assert_eq!(cache.get(&"key6".to_string()).await, None);
assert_eq!(
cache.get(&"key7".to_string()).await,
Some("value7".to_string())
);
assert_eq!(cache.get(&"key6".to_string()), None);
assert_eq!(cache.get(&"key7".to_string()), Some("value7".to_string()));
}
std::fs::remove_dir_all(cache_location).unwrap();
});
Expand Down
6 changes: 2 additions & 4 deletions libindy_vdr/src/pool/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ pub async fn perform_ledger_request<T: Pool>(

if is_read_req {
if let Some(cache) = cache_opt.clone() {
if let Some((response, meta)) = cache.get(&cache_key).await {
if let Some((response, meta)) = cache.get(&cache_key) {
return Ok((RequestResult::Reply(response), meta));
}
}
Expand All @@ -240,9 +240,7 @@ pub async fn perform_ledger_request<T: Pool>(
}
}
if let Some(cache) = cache_opt {
cache
.insert(cache_key, (response.to_string(), meta.clone()), None)
.await;
cache.insert(cache_key, (response.to_string(), meta.clone()), None);
}
}
}
Expand Down
Loading