Skip to content

Commit

Permalink
Merge pull request #2895 from devigned/wasi-kv
Browse files Browse the repository at this point in the history
WASI Key Value 0.2.0-draft2 support
  • Loading branch information
vdice authored Nov 4, 2024
2 parents c021968 + 50f22a1 commit d833301
Show file tree
Hide file tree
Showing 23 changed files with 1,436 additions and 17 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/factor-key-value/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ spin-world = { path = "../world" }
tokio = { workspace = true, features = ["macros", "sync", "rt"] }
toml = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }

[dev-dependencies]
spin-factors-test = { path = "../factors-test" }
Expand Down
265 changes: 264 additions & 1 deletion crates/factor-key-value/src/host.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use super::{Cas, SwapError};
use anyhow::{Context, Result};
use spin_core::{async_trait, wasmtime::component::Resource};
use spin_resource_table::Table;
use spin_world::v2::key_value;
use spin_world::wasi::keyvalue as wasi_keyvalue;
use std::{collections::HashSet, sync::Arc};
use tracing::{instrument, Level};

Expand Down Expand Up @@ -30,12 +32,19 @@ pub trait Store: Sync + Send {
async fn delete(&self, key: &str) -> Result<(), Error>;
async fn exists(&self, key: &str) -> Result<bool, Error>;
async fn get_keys(&self) -> Result<Vec<String>, Error>;
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error>;
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error>;
async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error>;
async fn increment(&self, key: String, delta: i64) -> Result<i64, Error>;
async fn new_compare_and_swap(&self, bucket_rep: u32, key: &str)
-> Result<Arc<dyn Cas>, Error>;
}

pub struct KeyValueDispatch {
allowed_stores: HashSet<String>,
manager: Arc<dyn StoreManager>,
stores: Table<Arc<dyn Store>>,
compare_and_swaps: Table<Arc<dyn Cas>>,
}

impl KeyValueDispatch {
Expand All @@ -52,16 +61,43 @@ impl KeyValueDispatch {
allowed_stores,
manager,
stores: Table::new(capacity),
compare_and_swaps: Table::new(capacity),
}
}

pub fn get_store(&self, store: Resource<key_value::Store>) -> anyhow::Result<&Arc<dyn Store>> {
pub fn get_store<T: 'static>(&self, store: Resource<T>) -> anyhow::Result<&Arc<dyn Store>> {
self.stores.get(store.rep()).context("invalid store")
}

pub fn get_cas<T: 'static>(&self, cas: Resource<T>) -> Result<&Arc<dyn Cas>> {
self.compare_and_swaps
.get(cas.rep())
.context("invalid compare and swap")
}

pub fn allowed_stores(&self) -> &HashSet<String> {
&self.allowed_stores
}

pub fn get_store_wasi<T: 'static>(
&self,
store: Resource<T>,
) -> Result<&Arc<dyn Store>, wasi_keyvalue::store::Error> {
self.stores
.get(store.rep())
.ok_or(wasi_keyvalue::store::Error::NoSuchStore)
}

pub fn get_cas_wasi<T: 'static>(
&self,
cas: Resource<T>,
) -> Result<&Arc<dyn Cas>, wasi_keyvalue::atomics::Error> {
self.compare_and_swaps
.get(cas.rep())
.ok_or(wasi_keyvalue::atomics::Error::Other(
"compare and swap not found".to_string(),
))
}
}

#[async_trait]
Expand Down Expand Up @@ -141,12 +177,239 @@ impl key_value::HostStore for KeyValueDispatch {
}
}

fn to_wasi_err(e: Error) -> wasi_keyvalue::store::Error {
match e {
Error::AccessDenied => wasi_keyvalue::store::Error::AccessDenied,
Error::NoSuchStore => wasi_keyvalue::store::Error::NoSuchStore,
Error::StoreTableFull => wasi_keyvalue::store::Error::Other("store table full".to_string()),
Error::Other(msg) => wasi_keyvalue::store::Error::Other(msg),
}
}

#[async_trait]
impl wasi_keyvalue::store::Host for KeyValueDispatch {
async fn open(
&mut self,
identifier: String,
) -> Result<Resource<wasi_keyvalue::store::Bucket>, wasi_keyvalue::store::Error> {
if self.allowed_stores.contains(&identifier) {
let store = self
.stores
.push(self.manager.get(&identifier).await.map_err(to_wasi_err)?)
.map_err(|()| wasi_keyvalue::store::Error::Other("store table full".to_string()))?;
Ok(Resource::new_own(store))
} else {
Err(wasi_keyvalue::store::Error::AccessDenied)
}
}

fn convert_error(
&mut self,
error: spin_world::wasi::keyvalue::store::Error,
) -> std::result::Result<spin_world::wasi::keyvalue::store::Error, anyhow::Error> {
Ok(error)
}
}

use wasi_keyvalue::store::Bucket;
#[async_trait]
impl wasi_keyvalue::store::HostBucket for KeyValueDispatch {
async fn get(
&mut self,
self_: Resource<Bucket>,
key: String,
) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(self_)?;
store.get(&key).await.map_err(to_wasi_err)
}

async fn set(
&mut self,
self_: Resource<Bucket>,
key: String,
value: Vec<u8>,
) -> Result<(), wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(self_)?;
store.set(&key, &value).await.map_err(to_wasi_err)
}

async fn delete(
&mut self,
self_: Resource<Bucket>,
key: String,
) -> Result<(), wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(self_)?;
store.delete(&key).await.map_err(to_wasi_err)
}

async fn exists(
&mut self,
self_: Resource<Bucket>,
key: String,
) -> Result<bool, wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(self_)?;
store.exists(&key).await.map_err(to_wasi_err)
}

async fn list_keys(
&mut self,
self_: Resource<Bucket>,
cursor: Option<String>,
) -> Result<wasi_keyvalue::store::KeyResponse, wasi_keyvalue::store::Error> {
match cursor {
Some(_) => Err(wasi_keyvalue::store::Error::Other(
"list_keys: cursor not supported".to_owned(),
)),
None => {
let store = self.get_store_wasi(self_)?;
let keys = store.get_keys().await.map_err(to_wasi_err)?;
Ok(wasi_keyvalue::store::KeyResponse { keys, cursor: None })
}
}
}

async fn drop(&mut self, rep: Resource<Bucket>) -> anyhow::Result<()> {
self.stores.remove(rep.rep());
Ok(())
}
}

#[async_trait]
impl wasi_keyvalue::batch::Host for KeyValueDispatch {
#[instrument(name = "spin_key_value.get_many", skip(self, bucket, keys), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn get_many(
&mut self,
bucket: Resource<wasi_keyvalue::batch::Bucket>,
keys: Vec<String>,
) -> std::result::Result<Vec<(String, Option<Vec<u8>>)>, wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(bucket)?;
store
.get_many(keys.iter().map(|k| k.to_string()).collect())
.await
.map_err(to_wasi_err)
}

#[instrument(name = "spin_key_value.set_many", skip(self, bucket, key_values), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn set_many(
&mut self,
bucket: Resource<wasi_keyvalue::batch::Bucket>,
key_values: Vec<(String, Vec<u8>)>,
) -> std::result::Result<(), wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(bucket)?;
store.set_many(key_values).await.map_err(to_wasi_err)
}

#[instrument(name = "spin_key_value.get_many", skip(self, bucket, keys), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn delete_many(
&mut self,
bucket: Resource<wasi_keyvalue::batch::Bucket>,
keys: Vec<String>,
) -> std::result::Result<(), wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(bucket)?;
store
.delete_many(keys.iter().map(|k| k.to_string()).collect())
.await
.map_err(to_wasi_err)
}
}

#[async_trait]
impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {
async fn new(
&mut self,
bucket: Resource<wasi_keyvalue::atomics::Bucket>,
key: String,
) -> Result<Resource<wasi_keyvalue::atomics::Cas>, wasi_keyvalue::store::Error> {
let bucket_rep = bucket.rep();
let bucket: Resource<Bucket> = Resource::new_own(bucket_rep);
let store = self.get_store_wasi(bucket)?;
let cas = store
.new_compare_and_swap(bucket_rep, &key)
.await
.map_err(to_wasi_err)?;
self.compare_and_swaps
.push(cas)
.map_err(|()| {
spin_world::wasi::keyvalue::store::Error::Other(
"too many compare_and_swaps opened".to_string(),
)
})
.map(Resource::new_own)
}

async fn current(
&mut self,
cas: Resource<wasi_keyvalue::atomics::Cas>,
) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
let cas = self
.get_cas(cas)
.map_err(|e| wasi_keyvalue::store::Error::Other(e.to_string()))?;
cas.current().await.map_err(to_wasi_err)
}

async fn drop(&mut self, rep: Resource<wasi_keyvalue::atomics::Cas>) -> Result<()> {
self.compare_and_swaps.remove(rep.rep());
Ok(())
}
}

#[async_trait]
impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
#[instrument(name = "spin_key_value.increment", skip(self, bucket, key, delta), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn increment(
&mut self,
bucket: Resource<wasi_keyvalue::atomics::Bucket>,
key: String,
delta: i64,
) -> Result<i64, wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(bucket)?;
store.increment(key, delta).await.map_err(to_wasi_err)
}

#[instrument(name = "spin_key_value.swap", skip(self, cas_res, value), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn swap(
&mut self,
cas_res: Resource<atomics::Cas>,
value: Vec<u8>,
) -> Result<std::result::Result<(), CasError>> {
let cas_rep = cas_res.rep();
let cas = self
.get_cas(Resource::<Bucket>::new_own(cas_rep))
.map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;

match cas.swap(value).await {
Ok(_) => Ok(Ok(())),
Err(err) => match err {
SwapError::CasFailed(_) => {
let bucket = Resource::new_own(cas.bucket_rep().await);
let new_cas = self.new(bucket, cas.key().await).await?;
let new_cas_rep = new_cas.rep();
self.current(Resource::new_own(new_cas_rep)).await?;
Err(anyhow::Error::new(CasError::CasFailed(Resource::new_own(
new_cas_rep,
))))
}
SwapError::Other(msg) => Err(anyhow::Error::new(CasError::StoreError(
atomics::Error::Other(msg),
))),
},
}
}
}

pub fn log_error(err: impl std::fmt::Debug) -> Error {
tracing::warn!("key-value error: {err:?}");
Error::Other(format!("{err:?}"))
}

pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError {
tracing::warn!("key-value error: {err:?}");
SwapError::Other(format!("{err:?}"))
}

use spin_world::v1::key_value::Error as LegacyError;
use spin_world::wasi::keyvalue::atomics;
use spin_world::wasi::keyvalue::atomics::{CasError, HostCas};

fn to_legacy_error(value: key_value::Error) -> LegacyError {
match value {
Expand Down
37 changes: 36 additions & 1 deletion crates/factor-key-value/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use spin_locked_app::MetadataKey;

/// Metadata key for key-value stores.
pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");
pub use host::{log_error, Error, KeyValueDispatch, Store, StoreManager};
pub use host::{log_cas_error, log_error, Error, KeyValueDispatch, Store, StoreManager};
pub use runtime_config::RuntimeConfig;
use spin_core::async_trait;
pub use util::{CachingStoreManager, DelegatingStoreManager};

/// A factor that provides key-value storage.
Expand All @@ -40,6 +41,9 @@ impl Factor for KeyValueFactor {
fn init<T: Send + 'static>(&mut self, mut ctx: InitContext<T, Self>) -> anyhow::Result<()> {
ctx.link_bindings(spin_world::v1::key_value::add_to_linker)?;
ctx.link_bindings(spin_world::v2::key_value::add_to_linker)?;
ctx.link_bindings(spin_world::wasi::keyvalue::store::add_to_linker)?;
ctx.link_bindings(spin_world::wasi::keyvalue::batch::add_to_linker)?;
ctx.link_bindings(spin_world::wasi::keyvalue::atomics::add_to_linker)?;
Ok(())
}

Expand Down Expand Up @@ -131,6 +135,37 @@ impl AppState {
}
}

/// `SwapError` are errors that occur during compare and swap operations
#[derive(Debug, thiserror::Error)]
pub enum SwapError {
#[error("{0}")]
CasFailed(String),

#[error("{0}")]
Other(String),
}

/// `Cas` trait describes the interface a key value compare and swap implementor must fulfill.
///
/// `current` is expected to get the current value for the key associated with the CAS operation
/// while also starting what is needed to ensure the value to be replaced will not have mutated
/// between the time of calling `current` and `swap`. For example, a get from a backend store
/// may provide the caller with an etag (a version stamp), which can be used with an if-match
/// header to ensure the version updated is the version that was read (optimistic concurrency).
/// Rather than an etag, one could start a transaction, if supported by the backing store, which
/// would provide atomicity.
///
/// `swap` is expected to replace the old value with the new value respecting the atomicity of the
/// operation. If there was no key / value with the given key in the store, the `swap` operation
/// should **insert** the key and value, disallowing an update.
#[async_trait]
pub trait Cas: Sync + Send {
async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error>;
async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError>;
async fn bucket_rep(&self) -> u32;
async fn key(&self) -> String;
}

pub struct InstanceBuilder {
/// The store manager for the app.
///
Expand Down
Loading

0 comments on commit d833301

Please sign in to comment.