Skip to content

Commit

Permalink
add tests for sqlite store
Browse files Browse the repository at this point in the history
Signed-off-by: David Justice <[email protected]>
  • Loading branch information
devigned committed Oct 31, 2024
1 parent ba4f3e0 commit a0a132f
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 45 deletions.
57 changes: 42 additions & 15 deletions crates/factor-key-value/src/host.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use super::Cas;
use anyhow::{Context, Result};
use spin_core::{async_trait, wasmtime::component::Resource};
use spin_resource_table::Table;
use spin_world::v2::key_value;
use spin_world::wasi::keyvalue as wasi_keyvalue;
use std::{collections::HashSet, sync::Arc};
use tracing::{instrument, Level};
use super::Cas;

const DEFAULT_STORE_TABLE_CAPACITY: u32 = 256;

Expand All @@ -32,15 +32,14 @@ pub trait Store: Sync + Send {
async fn delete(&self, key: &str) -> Result<(), Error>;
async fn exists(&self, key: &str) -> Result<bool, Error>;
async fn get_keys(&self) -> Result<Vec<String>, Error>;
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<Option<(String, Vec<u8>)>>, Error>;
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error>;
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error>;
async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error>;
async fn increment(&self, key: String, delta: i64) -> Result<i64, Error>;
async fn new_compare_and_swap(&self, key: &str) -> Result<Arc<dyn Cas>, Error>;
async fn new_compare_and_swap(&self, bucket_rep: u32, key: &str)
-> Result<Arc<dyn Cas>, Error>;
}



pub struct KeyValueDispatch {
allowed_stores: HashSet<String>,
manager: Arc<dyn StoreManager>,
Expand Down Expand Up @@ -282,7 +281,7 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch {
&mut self,
bucket: Resource<wasi_keyvalue::batch::Bucket>,
keys: Vec<String>,
) -> std::result::Result<Vec<Option<(String, Vec<u8>)>>, wasi_keyvalue::store::Error> {
) -> std::result::Result<Vec<(String, Option<Vec<u8>>)>, wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(bucket)?;
store
.get_many(keys.iter().map(|k| k.to_string()).collect())
Expand Down Expand Up @@ -321,11 +320,20 @@ impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {
bucket: Resource<wasi_keyvalue::atomics::Bucket>,
key: String,
) -> Result<Resource<wasi_keyvalue::atomics::Cas>, wasi_keyvalue::store::Error> {
let bucket_rep = bucket.rep();
let bucket: Resource<Bucket> = Resource::new_own(bucket_rep);
let store = self.get_store_wasi(bucket)?;
let cas = store.new_compare_and_swap(&key).await.map_err(to_wasi_err)?;
let cas = store
.new_compare_and_swap(bucket_rep, &key)
.await
.map_err(to_wasi_err)?;
self.compare_and_swaps
.push(cas)
.map_err(|()| spin_world::wasi::keyvalue::store::Error::Other("too many compare_and_swaps opened".to_string()))
.map_err(|()| {
spin_world::wasi::keyvalue::store::Error::Other(
"too many compare_and_swaps opened".to_string(),
)
})
.map(Resource::new_own)
}

Expand Down Expand Up @@ -361,15 +369,32 @@ impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
#[instrument(name = "spin_key_value.swap", skip(self, cas_res, value), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn swap(
&mut self,
cas_res: Resource<wasi_keyvalue::atomics::Cas>,
cas_res: Resource<atomics::Cas>,
value: Vec<u8>,
) -> Result<std::result::Result<(), wasi_keyvalue::atomics::CasError>> {
) -> Result<std::result::Result<(), CasError>> {
let cas_rep = cas_res.rep();
let cas = self
.get_cas(cas_res)
.map_err(|e| wasi_keyvalue::atomics::CasError::StoreError(wasi_keyvalue::atomics::Error::Other(e.to_string())))?;
Ok(cas.swap(value)
.await
.map_err(|e| wasi_keyvalue::atomics::CasError::StoreError(wasi_keyvalue::atomics::Error::Other(e.to_string()))))
.get_cas(Resource::<Bucket>::new_own(cas_rep))
.map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;

match cas.swap(value).await {
Ok(cas) => Ok(Ok(())),
Err(err) => {
if err.to_string().contains("CAS_ERROR") {
let bucket = Resource::new_own(cas.bucket_rep().await);
let new_cas = self.new(bucket, cas.key().await).await?;
let new_cas_rep = new_cas.rep();
self.current(Resource::new_own(new_cas_rep)).await?;
Err(anyhow::Error::new(CasError::CasFailed(Resource::new_own(
new_cas_rep,
))))
} else {
Err(anyhow::Error::new(CasError::StoreError(
atomics::Error::Other(err.to_string()),
)))
}
}
}
}
}

Expand All @@ -379,6 +404,8 @@ pub fn log_error(err: impl std::fmt::Debug) -> Error {
}

use spin_world::v1::key_value::Error as LegacyError;
use spin_world::wasi::keyvalue::atomics;
use spin_world::wasi::keyvalue::atomics::{CasError, HostCas};

fn to_legacy_error(value: key_value::Error) -> LegacyError {
match value {
Expand Down
2 changes: 2 additions & 0 deletions crates/factor-key-value/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ impl AppState {
pub trait Cas: Sync + Send {
async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error>;
async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), Error>;
async fn bucket_rep(&self) -> u32;
async fn key(&self) -> String;
}

pub struct InstanceBuilder {
Expand Down
34 changes: 32 additions & 2 deletions crates/factor-key-value/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,33 @@ impl Store for CachingStore {
.collect())
}

async fn get_many(&self, keys: Vec<String>) -> anyhow::Result<Vec<Option<(String, Vec<u8>)>>, Error> {
async fn get_many(
&self,
keys: Vec<String>,
) -> anyhow::Result<Vec<(String, Option<Vec<u8>>)>, Error> {
// // Retrieve the specified value from the cache, lazily populating the cache as necessary.
// let mut state = self.state.lock().await;
//
// let mut keys_and_values: Vec<Option<(String, Vec<u8>)>> = Vec::new();
// let mut keys_not_found: Vec<String> = Vec::new();
// for key in keys {
// match state.cache.get(key.as_str()).cloned() {
// Some(value) => keys_and_values.push(Some((key, value))),
// None => keys_not_found.push(key),
// }
// }
//
// // guarantee the guest will read its own writes even if entries have been popped off the end of the LRU
// // cache prior to their corresponding writes reaching the backing store.
// state.flush().await?;
//
// let value = self.inner.get(key).await?;
//
// state.cache.put(key.to_owned(), value.clone());
//
// Ok(value)
//

todo!()
}

Expand All @@ -254,7 +280,11 @@ impl Store for CachingStore {
todo!()
}

async fn new_compare_and_swap(&self, key: &str) -> anyhow::Result<Arc<dyn Cas>, Error> {
async fn new_compare_and_swap(
&self,
bucket_rep: u32,
key: &str,
) -> anyhow::Result<Arc<dyn Cas>, Error> {
todo!()
}
}
8 changes: 6 additions & 2 deletions crates/key-value-azure/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl Store for AzureCosmosStore {
self.get_keys().await
}

async fn get_many(&self, keys: Vec<String>) -> Result<Vec<Option<(String, Vec<u8>)>>, Error> {
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error> {
todo!()
}

Expand All @@ -168,7 +168,11 @@ impl Store for AzureCosmosStore {
todo!()
}

async fn new_compare_and_swap(&self, key: &str) -> Result<Arc<dyn spin_factor_key_value::Cas>, Error> {
async fn new_compare_and_swap(
&self,
bucket_rep: u32,
key: &str,
) -> Result<Arc<dyn spin_factor_key_value::Cas>, Error> {
todo!()
}
}
Expand Down
8 changes: 6 additions & 2 deletions crates/key-value-redis/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl Store for RedisStore {
.map_err(log_error)
}

async fn get_many(&self, keys: Vec<String>) -> Result<Vec<Option<(String, Vec<u8>)>>, Error> {
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error> {
todo!()
}

Expand All @@ -115,7 +115,11 @@ impl Store for RedisStore {
todo!()
}

async fn new_compare_and_swap(&self, key: &str) -> Result<Arc<dyn spin_factor_key_value::Cas>, Error> {
async fn new_compare_and_swap(
&self,
bucket_rep: u32,
key: &str,
) -> Result<Arc<dyn spin_factor_key_value::Cas>, Error> {
todo!()
}
}
Loading

0 comments on commit a0a132f

Please sign in to comment.