diff --git a/Cargo.lock b/Cargo.lock index a969ac0780..4feb5d6292 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -304,6 +304,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atty" version = "0.2.14" @@ -614,7 +620,7 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "http-body 1.0.0", @@ -862,7 +868,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 1.1.0", "shlex", "syn 2.0.70", "which", @@ -885,7 +891,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 1.1.0", "shlex", "syn 2.0.70", "which", @@ -1022,9 +1028,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" dependencies = [ "serde", ] @@ -2122,6 +2128,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -2396,6 +2417,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.4.1" @@ -2630,7 +2670,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -2653,6 +2693,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", + "h2 0.4.6", "http 1.1.0", "http-body 1.0.0", "httparse", @@ -2661,6 +2702,7 @@ dependencies = [ "pin-project-lite", "smallvec", "tokio", + "want", ] [[package]] @@ -2712,6 +2754,24 @@ dependencies = [ "webpki-roots 0.26.3", ] +[[package]] +name = "hyper-rustls" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper 1.4.1", + "hyper-util", + "rustls 0.23.11", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.0", + "tower-service", + "webpki-roots 0.26.3", +] + [[package]] name = "hyper-timeout" version = "0.4.1" @@ -2724,6 +2784,22 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.4.1", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-tungstenite" version = "0.11.1" @@ -2737,6 +2813,25 @@ dependencies = [ "tungstenite", ] +[[package]] +name = "hyper-util" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.4.1", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -3097,6 +3192,7 @@ dependencies = [ "parking_lot", "pprof", "rand", + "reqwest 0.12.9", "serde", "serde_json", "tempfile", @@ -3126,7 +3222,7 @@ dependencies = [ "async-trait", "base64 0.21.7", "num-traits", - "reqwest", + "reqwest 0.11.27", "serde_json", "url", ] @@ -3246,7 +3342,7 @@ dependencies = [ "prost-build", "rand", "regex", - "reqwest", + "reqwest 0.11.27", "rheaper", "ring", "rustls 0.21.12", @@ -3728,6 +3824,23 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "newline-converter" version = "0.3.0" @@ -3909,12 +4022,50 @@ version = "11.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" +[[package]] +name = "openssl" +version = "0.10.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.70", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.104" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45abf306cbf99debc8195b66b7346498d7b10c210de50418b5ccd7ceba08c741" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "3.9.2" @@ -4392,6 +4543,54 @@ dependencies = [ "serde", ] +[[package]] +name = "quinn" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c7c5fdde3cdae7203427dc4f0a68fe0ed09833edc525a03456b153b79828684" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash 2.0.0", + "rustls 0.23.11", + "socket2", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" +dependencies = [ + "bytes", + "rand", + "ring", + "rustc-hash 2.0.0", + "rustls 0.23.11", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + +[[package]] +name = "quinn-udp" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bffec3605b73c6f1754535084a85229fa8a30f86014e6c81aeec4abb68b0285" +dependencies = [ + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.36" @@ -4550,7 +4749,7 @@ checksum = "d4a52e724646c6c0800fc456ec43b4165d2f91fba88ceaca06d9e0b400023478" dependencies = [ "hashbrown 0.13.2", "log", - "rustc-hash", + "rustc-hash 1.1.0", "slice-group-by", "smallvec", ] @@ -4616,7 +4815,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", @@ -4634,7 +4833,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper 0.1.2", - "system-configuration", + "system-configuration 0.5.1", "tokio", "tokio-rustls 0.24.1", "tower-service", @@ -4646,6 +4845,54 @@ dependencies = [ "winreg", ] +[[package]] +name = "reqwest" +version = "0.12.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" +dependencies = [ + "base64 0.22.1", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.4.1", + "hyper-rustls 0.27.3", + "hyper-tls", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls 0.23.11", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "system-configuration 0.6.1", + "tokio", + "tokio-native-tls", + "tokio-rustls 0.26.0", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots 0.26.3", + "windows-registry", +] + [[package]] name = "rfc6979" version = "0.3.1" @@ -4720,6 +4967,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc-hash" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" + [[package]] name = "rustc_version" version = "0.4.0" @@ -4784,6 +5037,20 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls" +version = "0.23.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4828ea528154ae444e5a642dbb7d5623354030dc9822b83fd9bb79683c7399d0" +dependencies = [ + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki 0.102.5", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -5531,6 +5798,9 @@ name = "sync_wrapper" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +dependencies = [ + "futures-core", +] [[package]] name = "system-configuration" @@ -5540,7 +5810,18 @@ checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ "bitflags 1.3.2", "core-foundation", - "system-configuration-sys", + "system-configuration-sys 0.5.0", +] + +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags 2.6.0", + "core-foundation", + "system-configuration-sys 0.6.0", ] [[package]] @@ -5553,6 +5834,16 @@ dependencies = [ "libc", ] +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "system-interface" version = "0.25.9" @@ -5758,6 +6049,16 @@ dependencies = [ "syn 2.0.70", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -5779,6 +6080,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls 0.23.11", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.15" @@ -5849,7 +6161,7 @@ dependencies = [ "axum", "base64 0.21.7", "bytes", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", @@ -5879,7 +6191,7 @@ dependencies = [ "axum", "base64 0.21.7", "bytes", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", @@ -6277,6 +6589,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vergen" version = "8.3.2" @@ -6892,6 +7210,36 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-registry" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" +dependencies = [ + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/libsql/Cargo.toml b/libsql/Cargo.toml index 8788db2cd6..d6ff39e475 100644 --- a/libsql/Cargo.toml +++ b/libsql/Cargo.toml @@ -42,6 +42,7 @@ fallible-iterator = { version = "0.3", optional = true } libsql_replication = { version = "0.6", path = "../libsql-replication", optional = true } async-stream = { version = "0.3.5", optional = true } +reqwest = { version = "0.12.9", default-features = false, features = [ "rustls-tls" ] } [dev-dependencies] criterion = { version = "0.5", features = ["html_reports", "async", "async_futures", "async_tokio"] } diff --git a/libsql/examples/offline_writes.rs b/libsql/examples/offline_writes.rs new file mode 100644 index 0000000000..8135fe3426 --- /dev/null +++ b/libsql/examples/offline_writes.rs @@ -0,0 +1,85 @@ +// Example of using a offline writes with libSQL. + +use libsql::{params, Builder}; + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + + // The local database path where the data will be stored. + let db_path = std::env::var("LIBSQL_DB_PATH") + .map_err(|_| { + eprintln!( + "Please set the LIBSQL_DB_PATH environment variable to set to local database path." + ) + }) + .unwrap(); + + // The remote sync URL to use. + let sync_url = std::env::var("LIBSQL_SYNC_URL") + .map_err(|_| { + eprintln!( + "Please set the LIBSQL_SYNC_URL environment variable to set to remote sync URL." + ) + }) + .unwrap(); + + let namespace = std::env::var("LIBSQL_NAMESPACE").ok(); + + // The authentication token to use. + let auth_token = std::env::var("LIBSQL_AUTH_TOKEN").unwrap_or("".to_string()); + + let db_builder = if let Some(ns) = namespace { + Builder::new_synced_database(db_path, sync_url, auth_token).namespace(&ns) + } else { + Builder::new_synced_database(db_path, sync_url, auth_token) + }; + + let db = match db_builder.build().await { + Ok(db) => db, + Err(error) => { + eprintln!("Error connecting to remote sync server: {}", error); + return; + } + }; + + let conn = db.connect().unwrap(); + + conn.execute( + r#" + CREATE TABLE IF NOT EXISTS guest_book_entries ( + text TEXT + )"#, + (), + ) + .await + .unwrap(); + + let mut input = String::new(); + println!("Please write your entry to the guestbook:"); + match std::io::stdin().read_line(&mut input) { + Ok(_) => { + println!("You entered: {}", input); + let params = params![input.as_str()]; + conn.execute("INSERT INTO guest_book_entries (text) VALUES (?)", params) + .await + .unwrap(); + } + Err(error) => { + eprintln!("Error reading input: {}", error); + } + } + let mut results = conn + .query("SELECT * FROM guest_book_entries", ()) + .await + .unwrap(); + println!("Guest book entries:"); + while let Some(row) = results.next().await.unwrap() { + let text: String = row.get(0).unwrap(); + println!(" {}", text); + } + + print!("Syncing database to remote..."); + db.sync().await.unwrap(); + println!(" done"); +} diff --git a/libsql/src/database.rs b/libsql/src/database.rs index a47c20da2d..c88d002a19 100644 --- a/libsql/src/database.rs +++ b/libsql/src/database.rs @@ -47,6 +47,8 @@ enum DbType { db: crate::local::Database, encryption_config: Option, }, + #[cfg(feature = "replication")] + Offline { db: crate::local::Database }, #[cfg(feature = "remote")] Remote { url: String, @@ -66,6 +68,8 @@ impl fmt::Debug for DbType { Self::File { .. } => write!(f, "File"), #[cfg(feature = "replication")] Self::Sync { .. } => write!(f, "Sync"), + #[cfg(feature = "replication")] + Self::Offline { .. } => write!(f, "Offline"), #[cfg(feature = "remote")] Self::Remote { .. } => write!(f, "Remote"), _ => write!(f, "no database type set"), @@ -333,10 +337,10 @@ cfg_replication! { /// Sync database from remote, and returns the committed frame_no after syncing, if /// applicable. pub async fn sync(&self) -> Result { - if let DbType::Sync { db, encryption_config: _ } = &self.db_type { - db.sync().await - } else { - Err(Error::SyncNotSupported(format!("{:?}", self.db_type))) + match &self.db_type { + DbType::Sync { db, encryption_config: _ } => db.sync().await, + DbType::Offline { db } => db.push().await, + _ => Err(Error::SyncNotSupported(format!("{:?}", self.db_type))), } } @@ -595,6 +599,17 @@ impl Database { Ok(Connection { conn }) } + #[cfg(feature = "replication")] + DbType::Offline { db } => { + use crate::local::impls::LibsqlConnection; + + let conn = db.connect()?; + + let conn = std::sync::Arc::new(LibsqlConnection { conn }); + + Ok(Connection { conn }) + } + #[cfg(feature = "remote")] DbType::Remote { url, diff --git a/libsql/src/database/builder.rs b/libsql/src/database/builder.rs index 35cd93f899..9c8eed5f85 100644 --- a/libsql/src/database/builder.rs +++ b/libsql/src/database/builder.rs @@ -12,6 +12,8 @@ use super::DbType; /// it does no networking and does not connect to any remote database. /// - `new_remote_replica`/`RemoteReplica` creates an embedded replica database that will be able /// to sync from the remote url and delegate writes to the remote primary. +/// - `new_synced_database`/`SyncedDatabase` creates an embedded replica database that supports +/// offline writes. /// - `new_local_replica`/`LocalReplica` creates an embedded replica similar to the remote version /// except you must use `Database::sync_frames` to sync with the remote. This version also /// includes the ability to delegate writes to a remote primary. @@ -66,6 +68,30 @@ impl Builder<()> { } } + cfg_replication! { + /// Create a new offline embedded replica. + pub fn new_synced_database( + path: impl AsRef, + url: String, + auth_token: String, + ) -> Builder { + Builder { + inner: SyncedDatabase { + path: path.as_ref().to_path_buf(), + flags: crate::OpenFlags::default(), + remote: Remote { + url, + auth_token, + connector: None, + version: None, + }, + http_request_callback: None, + namespace: None + }, + } + } + } + /// Create a new local replica. pub fn new_local_replica(path: impl AsRef) -> Builder { Builder { @@ -172,6 +198,15 @@ cfg_replication! { namespace: Option, } + /// Remote replica configuration type in [`Builder`]. + pub struct SyncedDatabase { + path: std::path::PathBuf, + flags: crate::OpenFlags, + remote: Remote, + http_request_callback: Option, + namespace: Option, + } + /// Local replica configuration type in [`Builder`]. pub struct LocalReplica { path: std::path::PathBuf, @@ -298,6 +333,91 @@ cfg_replication! { } } + impl Builder { + /// Provide a custom http connector that will be used to create http connections. + pub fn connector(mut self, connector: C) -> Builder + where + C: tower::Service + Send + Clone + Sync + 'static, + C::Response: crate::util::Socket, + C::Future: Send + 'static, + C::Error: Into>, + { + self.inner.remote = self.inner.remote.connector(connector); + self + } + + pub fn http_request_callback(mut self, f: F) -> Builder + where + F: Fn(&mut http::Request<()>) + Send + Sync + 'static + { + self.inner.http_request_callback = Some(std::sync::Arc::new(f)); + self + + } + + /// Set the namespace that will be communicated to remote replica in the http header. + pub fn namespace(mut self, namespace: impl Into) -> Builder + { + self.inner.namespace = Some(namespace.into()); + self + } + + #[doc(hidden)] + pub fn version(mut self, version: String) -> Builder { + self.inner.remote = self.inner.remote.version(version); + self + } + + /// Build a connection to a local database that can be synced to remote server. + pub async fn build(self) -> Result { + let SyncedDatabase { + path, + flags, + remote: + Remote { + url, + auth_token, + connector, + version, + }, + http_request_callback, + namespace + } = self.inner; + + let connector = if let Some(connector) = connector { + connector + } else { + let https = super::connector()?; + use tower::ServiceExt; + + let svc = https + .map_err(|e| e.into()) + .map_response(|s| Box::new(s) as Box); + + crate::util::ConnectorService::new(svc) + }; + + let path = path.to_str().ok_or(crate::Error::InvalidUTF8Path)?.to_owned(); + + let db = crate::local::Database::open_local_with_offline_writes( + connector, + path, + flags, + url, + auth_token, + version, + http_request_callback, + namespace, + ) + .await?; + + Ok(Database { + db_type: DbType::Offline { db }, + max_write_replication_index: Default::default(), + }) + } + } + impl Builder { /// Set [`OpenFlags`] for this database. pub fn flags(mut self, flags: crate::OpenFlags) -> Builder { diff --git a/libsql/src/lib.rs b/libsql/src/lib.rs index ac9d500596..2a06f2547a 100644 --- a/libsql/src/lib.rs +++ b/libsql/src/lib.rs @@ -158,6 +158,7 @@ cfg_parser! { mod rows; mod statement; +mod sync; mod transaction; mod value; diff --git a/libsql/src/local/connection.rs b/libsql/src/local/connection.rs index bcf48ff23c..0042d62426 100644 --- a/libsql/src/local/connection.rs +++ b/libsql/src/local/connection.rs @@ -8,7 +8,7 @@ use super::{Database, Error, Result, Rows, RowsFuture, Statement, Transaction}; use crate::TransactionBehavior; -use libsql_sys::ffi; +use libsql_sys::{ffi, wal}; use std::{ffi::c_int, fmt, path::Path, sync::Arc}; /// A connection to a libSQL database. @@ -57,13 +57,22 @@ impl Connection { ))); } } - - Ok(Connection { + let conn = Connection { raw, drop_ref: Arc::new(()), #[cfg(feature = "replication")] writer: db.writer()?, - }) + }; + if let Some(_) = db.sync_ctx { + // We need to make sure database is in WAL mode with checkpointing + // disabled so that we can sync our changes back to a remote + // server. + conn.query("PRAGMA journal_mode = WAL", Params::None)?; + unsafe { + ffi::libsql_wal_disable_checkpoint(conn.raw); + } + } + Ok(conn) } /// Get a raw handle to the underlying libSQL connection diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index 3453a777c9..9ad9d5fc61 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -9,13 +9,15 @@ cfg_replication!( use crate::replication::local_client::LocalClient; use crate::replication::remote_client::RemoteClient; use crate::replication::EmbeddedReplicator; - pub use crate::replication::Frames; + pub use crate::replication::{Replicated, Frames}; pub struct ReplicationContext { pub(crate) replicator: EmbeddedReplicator, client: Option, read_your_writes: bool, } + + use crate::sync::SyncContext; ); use crate::{database::OpenFlags, local::connection::Connection}; @@ -28,6 +30,8 @@ pub struct Database { pub flags: OpenFlags, #[cfg(feature = "replication")] pub replication_ctx: Option, + #[cfg(feature = "replication")] + pub sync_ctx: Option, } impl Database { @@ -122,6 +126,32 @@ impl Database { Ok(db) } + #[cfg(feature = "replication")] + #[doc(hidden)] + pub async fn open_local_with_offline_writes( + connector: crate::util::ConnectorService, + db_path: impl Into, + flags: OpenFlags, + endpoint: String, + auth_token: String, + version: Option, + http_request_callback: Option, + namespace: Option + + ) -> Result { + use std::path::PathBuf; + + let db_path = db_path.into(); + let endpoint = if endpoint.starts_with("libsql:") { + endpoint.replace("libsql:", "https:") + } else { + endpoint + }; + let mut db = Database::open(&db_path, flags)?; + db.sync_ctx = Some(SyncContext::new(endpoint, Some(auth_token))); + Ok(db) + } + #[cfg(feature = "replication")] pub async fn open_local_sync( db_path: impl Into, @@ -228,6 +258,8 @@ impl Database { flags, #[cfg(feature = "replication")] replication_ctx: None, + #[cfg(feature = "replication")] + sync_ctx: None, } } @@ -336,6 +368,61 @@ impl Database { } } + #[cfg(feature = "replication")] + /// Push WAL frames to remote. + pub async fn push(&self) -> Result { + let conn = self.connect()?; + let mut max_frame_no: std::os::raw::c_uint = 0; + unsafe { libsql_sys::ffi::libsql_wal_frame_count(conn.handle(), &mut max_frame_no) }; + println!("Maximum frame: {}", max_frame_no); + let sync_ctx = self.sync_ctx.as_ref().unwrap(); + let generation = 1; + let start_frame_no = sync_ctx.durable_frame_num() + 1; + let end_frame_no = max_frame_no; + + for frame_no in start_frame_no..end_frame_no+1 { + const FRAME_SIZE: usize = 24+4096; // FIXME: make dynamic + let frame: [u8; FRAME_SIZE] = [0; FRAME_SIZE]; + let rc = unsafe { + libsql_sys::ffi::libsql_wal_get_frame(conn.handle(), frame_no, frame.as_ptr() as *mut _, FRAME_SIZE as u32) + }; + if rc != 0 { + return Err(crate::errors::Error::SqliteFailure(rc as std::ffi::c_int, format!("Failed to get frame: {}", frame_no))); + } + let uri = format!("{}/sync/{}/{}/{}", sync_ctx.sync_url(), generation, frame_no, frame_no+1); + let auth_token = sync_ctx.auth_token(); + self.send_with_retry(uri, auth_token, frame.to_vec()).await?; + } + Ok(Replicated{ + frame_no: None, + frames_synced: 0, + }) + } + + async fn send_with_retry(&self, uri: String, auth_token: Option<&str>, frame: Vec) -> Result<()> { + let mut nr_retries = 0; + loop { + let client = reqwest::Client::new(); + let mut builder = client.post(uri.to_owned()); + match auth_token { + Some(ref auth_token) => { + builder = builder.header("Authorization", format!("Bearer {}", auth_token.to_owned())); + } + None => {} + } + println!("Pushing frame: {:?}", frame.len()); + let res = builder.body(frame.to_vec()).send().await.unwrap(); + if res.status().is_success() { + return Ok(()); + } + if nr_retries > 5 { + return Err(crate::errors::Error::ConnectionFailed(format!("Failed to push frame: {}", res.status()))); + } + std::thread::sleep(std::time::Duration::from_millis(100 * (1 << nr_retries))); + nr_retries += 1; + } + } + pub(crate) fn path(&self) -> &str { &self.db_path } diff --git a/libsql/src/sync.rs b/libsql/src/sync.rs new file mode 100644 index 0000000000..1e29174f14 --- /dev/null +++ b/libsql/src/sync.rs @@ -0,0 +1,27 @@ +pub struct SyncContext { + sync_url: String, + auth_token: Option, + durable_frame_num: u32, +} + +impl SyncContext { + pub fn new(sync_url: String, auth_token: Option) -> Self { + Self { + sync_url, + auth_token, + durable_frame_num: 0, + } + } + + pub fn durable_frame_num(&self) -> u32 { + self.durable_frame_num + } + + pub fn sync_url(&self) -> &str { + &self.sync_url + } + + pub fn auth_token(&self) -> Option<&str> { + self.auth_token.as_deref() + } +}