Skip to content

Commit

Permalink
Merge pull request #2883 from ogghead/dynamo-key-value-store
Browse files Browse the repository at this point in the history
Implement AWS key value store
  • Loading branch information
itowlson authored Nov 18, 2024
2 parents 509d8d1 + 6d8570b commit 606c48a
Show file tree
Hide file tree
Showing 13 changed files with 1,216 additions and 63 deletions.
468 changes: 463 additions & 5 deletions Cargo.lock

Large diffs are not rendered by default.

48 changes: 30 additions & 18 deletions crates/factor-key-value/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,10 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch {
keys: Vec<String>,
) -> std::result::Result<Vec<(String, Option<Vec<u8>>)>, wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(bucket)?;
store
.get_many(keys.iter().map(|k| k.to_string()).collect())
.await
.map_err(to_wasi_err)
if keys.is_empty() {
return Ok(vec![]);
}
store.get_many(keys).await.map_err(to_wasi_err)
}

#[instrument(name = "spin_key_value.set_many", skip(self, bucket, key_values), err(level = Level::INFO), fields(otel.kind = "client"))]
Expand All @@ -296,6 +296,9 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch {
key_values: Vec<(String, Vec<u8>)>,
) -> std::result::Result<(), wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(bucket)?;
if key_values.is_empty() {
return Ok(());
}
store.set_many(key_values).await.map_err(to_wasi_err)
}

Expand All @@ -306,10 +309,10 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch {
keys: Vec<String>,
) -> std::result::Result<(), wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(bucket)?;
store
.delete_many(keys.iter().map(|k| k.to_string()).collect())
.await
.map_err(to_wasi_err)
if keys.is_empty() {
return Ok(());
}
store.delete_many(keys).await.map_err(to_wasi_err)
}
}

Expand Down Expand Up @@ -355,6 +358,13 @@ impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {

#[async_trait]
impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
fn convert_cas_error(
&mut self,
error: spin_world::wasi::keyvalue::atomics::CasError,
) -> std::result::Result<spin_world::wasi::keyvalue::atomics::CasError, anyhow::Error> {
Ok(error)
}

#[instrument(name = "spin_key_value.increment", skip(self, bucket, key, delta), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn increment(
&mut self,
Expand All @@ -371,27 +381,29 @@ impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
&mut self,
cas_res: Resource<atomics::Cas>,
value: Vec<u8>,
) -> Result<std::result::Result<(), CasError>> {
) -> Result<(), CasError> {
let cas_rep = cas_res.rep();
let cas = self
.get_cas(Resource::<Bucket>::new_own(cas_rep))
.map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;

match cas.swap(value).await {
Ok(_) => Ok(Ok(())),
Ok(_) => Ok(()),
Err(err) => match err {
SwapError::CasFailed(_) => {
let bucket = Resource::new_own(cas.bucket_rep().await);
let new_cas = self.new(bucket, cas.key().await).await?;
let new_cas = self
.new(bucket, cas.key().await)
.await
.map_err(CasError::StoreError)?;
let new_cas_rep = new_cas.rep();
self.current(Resource::new_own(new_cas_rep)).await?;
Err(anyhow::Error::new(CasError::CasFailed(Resource::new_own(
new_cas_rep,
))))
self.current(Resource::new_own(new_cas_rep))
.await
.map_err(CasError::StoreError)?;
let res = Resource::new_own(new_cas_rep);
Err(CasError::CasFailed(res))
}
SwapError::Other(msg) => Err(anyhow::Error::new(CasError::StoreError(
atomics::Error::Other(msg),
))),
SwapError::Other(msg) => Err(CasError::StoreError(atomics::Error::Other(msg))),
},
}
}
Expand Down
15 changes: 11 additions & 4 deletions crates/factor-key-value/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ impl Store for CachingStore {
keys: Vec<String>,
) -> anyhow::Result<Vec<(String, Option<Vec<u8>>)>, Error> {
let mut state = self.state.lock().await;

// Flush any outstanding writes first in case entries have been popped off the end of the LRU cache prior
// to their corresponding writes reaching the backing store.
state.flush().await?;

let mut found: Vec<(String, Option<Vec<u8>>)> = Vec::new();
let mut not_found: Vec<String> = Vec::new();
for key in keys {
Expand All @@ -260,10 +265,12 @@ impl Store for CachingStore {
}
}

let keys_and_values = self.inner.get_many(not_found).await?;
for (key, value) in keys_and_values {
found.push((key.clone(), value.clone()));
state.cache.put(key, value);
if !not_found.is_empty() {
let keys_and_values = self.inner.get_many(not_found).await?;
for (key, value) in keys_and_values {
found.push((key.clone(), value.clone()));
state.cache.put(key, value);
}
}

Ok(found)
Expand Down
22 changes: 22 additions & 0 deletions crates/key-value-aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "spin-key-value-aws"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
rust-version.workspace = true

[dependencies]
anyhow = { workspace = true }
async-once-cell = "0.5.4"
aws-config = "1.1.7"
aws-credential-types = "1.1.7"
aws-sdk-dynamodb = "1.49.0"
serde = { workspace = true }
spin-core = { path = "../core" }
spin-factor-key-value = { path = "../factor-key-value" }

[lints]
workspace = true
74 changes: 74 additions & 0 deletions crates/key-value-aws/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
mod store;

use serde::Deserialize;
use spin_factor_key_value::runtime_config::spin::MakeKeyValueStore;
use store::{
KeyValueAwsDynamo, KeyValueAwsDynamoAuthOptions, KeyValueAwsDynamoRuntimeConfigOptions,
};

/// A key-value store that uses AWS Dynamo as the backend.
#[derive(Default)]
pub struct AwsDynamoKeyValueStore {
_priv: (),
}

impl AwsDynamoKeyValueStore {
/// Creates a new `AwsKeyValueStore`.
pub fn new() -> Self {
Self::default()
}
}

/// Runtime configuration for the AWS Dynamo key-value store.
#[derive(Deserialize)]
pub struct AwsDynamoKeyValueRuntimeConfig {
/// The access key for the AWS Dynamo DB account role.
access_key: Option<String>,
/// The secret key for authorization on the AWS Dynamo DB account.
secret_key: Option<String>,
/// The token for authorization on the AWS Dynamo DB account.
token: Option<String>,
/// The AWS region where the database is located
region: String,
/// Boolean determining whether to use strongly consistent reads.
/// Defaults to `false` but can be set to `true` to improve atomicity
consistent_read: Option<bool>,
/// The AWS Dynamo DB table.
table: String,
}

impl MakeKeyValueStore for AwsDynamoKeyValueStore {
const RUNTIME_CONFIG_TYPE: &'static str = "aws_dynamo";

type RuntimeConfig = AwsDynamoKeyValueRuntimeConfig;

type StoreManager = KeyValueAwsDynamo;

fn make_store(
&self,
runtime_config: Self::RuntimeConfig,
) -> anyhow::Result<Self::StoreManager> {
let AwsDynamoKeyValueRuntimeConfig {
access_key,
secret_key,
token,
region,
consistent_read,
table,
} = runtime_config;
let auth_options = match (access_key, secret_key) {
(Some(access_key), Some(secret_key)) => {
KeyValueAwsDynamoAuthOptions::RuntimeConfigValues(
KeyValueAwsDynamoRuntimeConfigOptions::new(access_key, secret_key, token),
)
}
_ => KeyValueAwsDynamoAuthOptions::Environmental,
};
KeyValueAwsDynamo::new(
region,
consistent_read.unwrap_or(false),
table,
auth_options,
)
}
}
Loading

0 comments on commit 606c48a

Please sign in to comment.