Skip to content

Commit

Permalink
Merge pull request #2833 from calebschoepp/unify-sqlite
Browse files Browse the repository at this point in the history
  • Loading branch information
michelleN authored Sep 16, 2024
2 parents 4fbf872 + f5c911b commit 49694dd
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 97 deletions.
13 changes: 7 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/factor-sqlite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ spin-world = { path = "../world" }
table = { path = "../table" }
tokio = "1"
toml = "0.8"
tracing = { workspace = true }

[dev-dependencies]
spin-factors-test = { path = "../factors-test" }
Expand Down
12 changes: 12 additions & 0 deletions crates/factor-sqlite/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use spin_factors::wasmtime::component::Resource;
use spin_factors::{anyhow, SelfInstanceBuilder};
use spin_world::v1::sqlite as v1;
use spin_world::v2::sqlite as v2;
use tracing::field::Empty;
use tracing::{instrument, Level};

use crate::{Connection, ConnectionCreator};

Expand Down Expand Up @@ -62,6 +64,7 @@ impl v2::Host for InstanceState {

#[async_trait]
impl v2::HostConnection for InstanceState {
#[instrument(name = "spin_sqlite.open", skip(self), err(level = Level::INFO), fields(otel.kind = "client", db.system = "sqlite", sqlite.backend = Empty))]
async fn open(&mut self, database: String) -> Result<Resource<v2::Connection>, v2::Error> {
if !self.allowed_databases.contains(&database) {
return Err(v2::Error::AccessDenied);
Expand All @@ -70,12 +73,17 @@ impl v2::HostConnection for InstanceState {
.ok_or(v2::Error::NoSuchDatabase)?
.create_connection(&database)
.await?;
tracing::Span::current().record(
"sqlite.backend",
conn.summary().as_deref().unwrap_or("unknown"),
);
self.connections
.push(conn)
.map_err(|()| v2::Error::Io("too many connections opened".to_string()))
.map(Resource::new_own)
}

#[instrument(name = "spin_sqlite.execute", skip(self, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "sqlite", otel.name = query, sqlite.backend = Empty))]
async fn execute(
&mut self,
connection: Resource<v2::Connection>,
Expand All @@ -86,6 +94,10 @@ impl v2::HostConnection for InstanceState {
Ok(c) => c,
Err(err) => return Err(err),
};
tracing::Span::current().record(
"sqlite.backend",
conn.summary().as_deref().unwrap_or("unknown"),
);
conn.query(&query, parameters).await
}

Expand Down
1 change: 0 additions & 1 deletion crates/sqlite-inproc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ rusqlite = { version = "0.29.0", features = ["bundled"] }
spin-factor-sqlite = { path = "../factor-sqlite" }
spin-world = { path = "../world" }
tokio = "1"
tracing = { workspace = true }

[lints]
workspace = true
32 changes: 10 additions & 22 deletions crates/sqlite-inproc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@ use async_trait::async_trait;
use once_cell::sync::OnceCell;
use spin_factor_sqlite::Connection;
use spin_world::v2::sqlite;
use tracing::{instrument, Level};

/// The location of an in-process sqlite database.
#[derive(Debug, Clone)]
pub enum InProcDatabaseLocation {
/// An in-memory sqlite database.
InMemory,
/// The path to the sqlite database.
Path(PathBuf),
}

impl InProcDatabaseLocation {
/// Convert an optional path to a database location.
///
/// Ensures that the parent directory of the database exists.
/// Ensures that the parent directory of the database exists. If path is None, then an in memory
/// database will be used.
pub fn from_path(path: Option<PathBuf>) -> anyhow::Result<Self> {
match path {
Some(path) => {
Expand Down Expand Up @@ -68,9 +71,9 @@ impl InProcConnection {
}
}

impl InProcConnection {
#[instrument(name = "spin_sqlite_inproc.query", skip(self), err(level = Level::INFO), fields(otel.kind = "client", db.system = "sqlite", otel.name = query))]
pub async fn query(
#[async_trait]
impl Connection for InProcConnection {
async fn query(
&self,
query: &str,
parameters: Vec<sqlite::Value>,
Expand All @@ -84,8 +87,7 @@ impl InProcConnection {
.map_err(|e| sqlite::Error::Io(e.to_string()))?
}

#[instrument(name = "spin_sqlite_inproc.execute_batch", skip(self), err(level = Level::INFO), fields(otel.kind = "client", db.system = "sqlite", db.statements = statements))]
pub async fn execute_batch(&self, statements: &str) -> anyhow::Result<()> {
async fn execute_batch(&self, statements: &str) -> anyhow::Result<()> {
let connection = self.db_connection()?;
let statements = statements.to_owned();
tokio::task::spawn_blocking(move || {
Expand All @@ -97,21 +99,6 @@ impl InProcConnection {
.context("failed to spawn blocking task")?;
Ok(())
}
}

#[async_trait]
impl Connection for InProcConnection {
async fn query(
&self,
query: &str,
parameters: Vec<sqlite::Value>,
) -> Result<sqlite::QueryResult, sqlite::Error> {
self.query(query, parameters).await
}

async fn execute_batch(&self, statements: &str) -> anyhow::Result<()> {
self.execute_batch(statements).await
}

fn summary(&self) -> Option<String> {
Some(match &self.location {
Expand All @@ -121,6 +108,7 @@ impl Connection for InProcConnection {
}
}

// This function lives outside the query function to make it more readable.
fn execute_query(
connection: &Mutex<rusqlite::Connection>,
query: &str,
Expand Down
2 changes: 1 addition & 1 deletion crates/sqlite-libsql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ async-trait = "0.1.68"
# libsqlite3-sys as used by spin-sqlite-inproc.
libsql = { version = "0.3.2", features = ["remote"], default-features = false }
rusqlite = { version = "0.29.0", features = ["bundled"] }
spin-factor-sqlite = { path = "../factor-sqlite" }
spin-world = { path = "../world" }
sqlparser = "0.34"
tokio = { version = "1", features = ["full"] }
tracing = { workspace = true }

[lints]
workspace = true
68 changes: 61 additions & 7 deletions crates/sqlite-libsql/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,77 @@
use anyhow::Context;
use async_trait::async_trait;
use spin_factor_sqlite::Connection;
use spin_world::v2::sqlite as v2;
use spin_world::v2::sqlite::{self, RowResult};
use tracing::{instrument, Level};
use tokio::sync::OnceCell;

/// A lazy wrapper around a [`LibSqlConnection`] that implements the [`Connection`] trait.
pub struct LazyLibSqlConnection {
url: String,
token: String,
// Since the libSQL client can only be created asynchronously, we wait until
// we're in the `Connection` implementation to create. Since we only want to do
// this once, we use a `OnceCell` to store it.
inner: OnceCell<LibSqlConnection>,
}

impl LazyLibSqlConnection {
pub fn new(url: String, token: String) -> Self {
Self {
url,
token,
inner: OnceCell::new(),
}
}

pub async fn get_or_create_connection(&self) -> Result<&LibSqlConnection, v2::Error> {
self.inner
.get_or_try_init(|| async {
LibSqlConnection::create(self.url.clone(), self.token.clone())
.await
.context("failed to create SQLite client")
})
.await
.map_err(|_| v2::Error::InvalidConnection)
}
}

#[async_trait]
impl Connection for LazyLibSqlConnection {
async fn query(
&self,
query: &str,
parameters: Vec<v2::Value>,
) -> Result<v2::QueryResult, v2::Error> {
let client = self.get_or_create_connection().await?;
client.query(query, parameters).await
}

async fn execute_batch(&self, statements: &str) -> anyhow::Result<()> {
let client = self.get_or_create_connection().await?;
client.execute_batch(statements).await
}

fn summary(&self) -> Option<String> {
Some(format!("libSQL at {}", self.url))
}
}

/// An open connection to a libSQL server.
#[derive(Clone)]
pub struct LibsqlClient {
pub struct LibSqlConnection {
inner: libsql::Connection,
}

impl LibsqlClient {
#[instrument(name = "spin_sqlite_libsql.create_connection", skip(token), err(level = Level::INFO), fields(otel.kind = "client", db.system = "sqlite"))]
impl LibSqlConnection {
pub async fn create(url: String, token: String) -> anyhow::Result<Self> {
let db = libsql::Builder::new_remote(url, token).build().await?;
let inner = db.connect()?;
Ok(Self { inner })
}
}

impl LibsqlClient {
#[instrument(name = "spin_sqlite_libsql.query", skip(self), err(level = Level::INFO), fields(otel.kind = "client", db.system = "sqlite", otel.name = query))]
impl LibSqlConnection {
pub async fn query(
&self,
query: &str,
Expand All @@ -36,7 +91,6 @@ impl LibsqlClient {
})
}

#[instrument(name = "spin_sqlite_libsql.execute_batch", skip(self), err(level = Level::INFO), fields(otel.kind = "client", db.system = "sqlite", db.statements = statements))]
pub async fn execute_batch(&self, statements: &str) -> anyhow::Result<()> {
self.inner.execute_batch(statements).await?;

Expand Down
Loading

0 comments on commit 49694dd

Please sign in to comment.