From 3d49160c6cf5a33c8df5c7b3fad77a72688b04aa Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 9 Dec 2024 17:50:15 +0800 Subject: [PATCH 01/12] init push --- Cargo.lock | 1 + Cargo.toml | 1 + crates/shared/Cargo.toml | 1 + 3 files changed, 3 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 16bc39a7..8740bcec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4874,6 +4874,7 @@ dependencies = [ "portpicker", "rand 0.8.5", "serde", + "sqlx", "surf-disco", "tide-disco", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 0970d4ee..e6788605 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ url = "2.5" vbs = "0.1" vec1 = "1.12" tracing-subscriber = "0.3" +sqlx = { version = "^0.8", features = ["postgres", "sqlite", "macros"] } [workspace.package] version = "0.1.56" diff --git a/crates/shared/Cargo.toml b/crates/shared/Cargo.toml index 532c2fb2..81feb732 100644 --- a/crates/shared/Cargo.toml +++ b/crates/shared/Cargo.toml @@ -33,6 +33,7 @@ url = { workspace = true } vbs = { workspace = true } vec1 = { workspace = true } tracing-subscriber = { workspace = true } +sqlx = { workspace = true } [dev-dependencies] portpicker = "0.1.1" From cff3c0be1363b79d1c5850cb127d19515dc1d476 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 9 Dec 2024 17:52:51 +0800 Subject: [PATCH 02/12] fmt --- Cargo.lock | 2 +- crates/shared/Cargo.toml | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d0fdbc34..35b41d94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4744,8 +4744,8 @@ dependencies = [ "quick_cache", "rand 0.8.5", "serde", - "sqlx", "sha2 0.10.8", + "sqlx", "surf-disco", "thiserror 2.0.3", "tide-disco", diff --git a/crates/shared/Cargo.toml b/crates/shared/Cargo.toml index 8df46251..5422c697 100644 --- a/crates/shared/Cargo.toml +++ b/crates/shared/Cargo.toml @@ -37,7 +37,6 @@ tracing-subscriber = { workspace = true } url = { workspace = true } vbs = { workspace = true } vec1 = { workspace = true } -tracing-subscriber = { workspace = true } sqlx = { workspace = true } [dev-dependencies] From 3c8b14d8a4ef2ca9747f68d26579b5491446ff03 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 9 Dec 2024 18:00:53 +0800 Subject: [PATCH 03/12] real first push --- crates/shared/src/persistence/sqlite.rs | 193 ++++++++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 crates/shared/src/persistence/sqlite.rs diff --git a/crates/shared/src/persistence/sqlite.rs b/crates/shared/src/persistence/sqlite.rs new file mode 100644 index 00000000..39b867dd --- /dev/null +++ b/crates/shared/src/persistence/sqlite.rs @@ -0,0 +1,193 @@ + +#![cfg(feature = "sqlite")] + +use anyhow::Result; +use async_trait::async_trait; +use serde::Serialize; +use sqlx::SqlitePool; + +// Sishan TODO: link to the transaction type we already had +#[derive(Debug)] +struct BuilderDbTransaction { + id: i64, + tx_data: Vec, + created_at: Instance, +} + +pub struct SqliteTxnDb { + pool: SqlitePool, +} + +trait BuilderPersistence { + async fn new(database_url: String) -> Result; + async fn append( + &self, + tx_data: Vec, + ) -> Result<(), sqlx::Error>; + async fn load( + &self, + timeout_after: Instance, + ) -> Result>, sqlx::Error>; + async fn remove( + &self, + tx: Vec, + ) -> Result<()>; +} + +#[async_trait] +impl BuilderPersistence for SqliteTxnDb { + async fn new(database_url: String) -> Result { + let pool = SqlitePool::connect(&database_url).await?; + // it will handle the default CURRENT_TIMESTAMP automatically and assign to transaction's created_at + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS transactions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + tx_data BLOB NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + "#, + ) + .execute(&pool) + .await?; + Ok(Self { pool }) + } + + async fn append(&self, tx_data: Vec) -> Result<(), sqlx::Error> + { + sqlx::query( + r#" + INSERT INTO transactions (tx_data) VALUES (?); + "#, + ) + .bind(tx_data) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn load(&self, timeout_after: Instance) -> Result>, sqlx::Error> + { + // Convert Instant to SystemTime + let now = SystemTime::now(); + let elapsed = timeout_after.elapsed(); + let target_time = now - elapsed; + + // Convert SystemTime to a format SQLite understands (RFC 3339) + let target_timestamp = DateTime::::from(target_time) + .naive_utc() + .format("%Y-%m-%d %H:%M:%S") + .to_string(); + + let rows = sqlx::query( + r#" + SELECT id, tx_data, created_at FROM transactions + WHERE created_at < ? + ORDER BY created_at DESC LIMIT 1; + "#, + ) + .bind(target_timestamp) + .fetch_all(&self.pool) + .await?; + + let tx_data_list = rows + .into_iter() + .map(|row| row.get::, _>("tx_data")) + .collect(); + Ok(tx_data_list) + } + + async fn remove(&self, tx_data: Vec) -> Result<(), sqlx::Error> { + let result = sqlx::query( + r#" + DELETE FROM transactions WHERE tx_data = ?; + "#, + ) + .bind(tx_data) + .execute(&self.pool) + .await?; + + if result.rows_affected() > 0 { + Ok(()) + } else { + Err(sqlx::Error::RowNotFound) + } + } + +} + +#[cfg(test)] +mod test{ + use std::time::Instant; + + /// This test checks we can set up sqlite properly + /// and can do basic append() and load() + #[tokio::test] + pub async fn test_persistence_append_and_load_txn() { + // Initialize the database + let db = SqliteTxnDb::new("sqlite://transactions.db").await?; + + // Append a few transactions + db.append(vec![1, 2, 3]).await?; + db.append(vec![4, 5, 6]).await?; + + // Set timeout_after to the current time + let timeout_after = Instant::now(); + + // Simulate some delay + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // Append more transactions + db.append(vec![7, 8, 9]).await?; + + // Load transactions before timeout_after + let tx_data_list = db.load(timeout_after).await?; + println!("Transaction data before timeout:"); + for tx_data in tx_data_list { + println!("{:?}", tx_data); + } + + + // Sishan TODO: add assertion + // assert_eq!( + // storage.load_transaction().await.unwrap(), + // Some(test_transaction.clone()) + // ); + } + + #[tokio::test] + /// This test checks we can remove transaction from database properly + pub async fn test_persistence_remove_txn() { + // Initialize the database + let db = SqliteTxnDb::new("sqlite://transactions.db").await?; + + // Append some transactions + db.append(vec![1, 2, 3]).await?; + db.append(vec![4, 5, 6]).await?; + db.append(vec![7, 8, 9]).await?; + + // Load all transactions + println!("All transactions before removal:"); + let all_transactions = db.load(Instant::now()).await?; + for tx_data in &all_transactions { + println!("{:?}", tx_data); + } + + // Remove a specific transaction + println!("\nRemoving transaction [4, 5, 6]..."); + if let Err(e) = db.remove(vec![4, 5, 6]).await { + eprintln!("Failed to remove transaction: {}", e); + } else { + println!("Transaction [4, 5, 6] removed."); + } + + // Load all transactions after removal + println!("\nAll transactions after removal:"); + let remaining_transactions = db.load(Instant::now()).await?; + for tx_data in remaining_transactions { + println!("{:?}", tx_data); + } + + Ok(()) + } +} \ No newline at end of file From 779035b0505dbabe0111fba9adbd7615da6d358a Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 9 Dec 2024 19:09:20 +0800 Subject: [PATCH 04/12] runnable version --- crates/shared/src/lib.rs | 1 + crates/shared/src/persistence/sqlite.rs | 152 +++++++++++++----------- 2 files changed, 83 insertions(+), 70 deletions(-) diff --git a/crates/shared/src/lib.rs b/crates/shared/src/lib.rs index 1fde5dc1..d98d4909 100644 --- a/crates/shared/src/lib.rs +++ b/crates/shared/src/lib.rs @@ -3,6 +3,7 @@ pub mod block; pub mod coordinator; pub mod error; +pub mod persistence; pub mod state; #[cfg_attr(coverage_nightly, coverage(off))] pub mod testing; diff --git a/crates/shared/src/persistence/sqlite.rs b/crates/shared/src/persistence/sqlite.rs index 39b867dd..d87c1885 100644 --- a/crates/shared/src/persistence/sqlite.rs +++ b/crates/shared/src/persistence/sqlite.rs @@ -1,41 +1,17 @@ - -#![cfg(feature = "sqlite")] - +use super::get_sqlite_test_db_path; +use super::BuilderPersistence; use anyhow::Result; use async_trait::async_trait; -use serde::Serialize; +use chrono::{DateTime, Utc}; +use sqlx::Row; use sqlx::SqlitePool; +use std::time::{Instant, SystemTime}; -// Sishan TODO: link to the transaction type we already had #[derive(Debug)] -struct BuilderDbTransaction { - id: i64, - tx_data: Vec, - created_at: Instance, -} - pub struct SqliteTxnDb { pool: SqlitePool, } - -trait BuilderPersistence { - async fn new(database_url: String) -> Result; - async fn append( - &self, - tx_data: Vec, - ) -> Result<(), sqlx::Error>; - async fn load( - &self, - timeout_after: Instance, - ) -> Result>, sqlx::Error>; - async fn remove( - &self, - tx: Vec, - ) -> Result<()>; -} - -#[async_trait] -impl BuilderPersistence for SqliteTxnDb { +impl SqliteTxnDb { async fn new(database_url: String) -> Result { let pool = SqlitePool::connect(&database_url).await?; // it will handle the default CURRENT_TIMESTAMP automatically and assign to transaction's created_at @@ -53,8 +29,18 @@ impl BuilderPersistence for SqliteTxnDb { Ok(Self { pool }) } - async fn append(&self, tx_data: Vec) -> Result<(), sqlx::Error> - { + async fn clear(&self) -> Result<(), sqlx::Error> { + // Execute a SQL statement to delete all rows from the `transactions` table + sqlx::query("DELETE FROM transactions") + .execute(&self.pool) + .await?; + Ok(()) + } +} + +#[async_trait] +impl BuilderPersistence for SqliteTxnDb { + async fn append(&self, tx_data: Vec) -> Result<(), sqlx::Error> { sqlx::query( r#" INSERT INTO transactions (tx_data) VALUES (?); @@ -66,8 +52,7 @@ impl BuilderPersistence for SqliteTxnDb { Ok(()) } - async fn load(&self, timeout_after: Instance) -> Result>, sqlx::Error> - { + async fn load(&self, timeout_after: Instant) -> Result>, sqlx::Error> { // Convert Instant to SystemTime let now = SystemTime::now(); let elapsed = timeout_after.elapsed(); @@ -113,24 +98,32 @@ impl BuilderPersistence for SqliteTxnDb { Err(sqlx::Error::RowNotFound) } } - } #[cfg(test)] -mod test{ +mod test { + use super::get_sqlite_test_db_path; + use super::BuilderPersistence; + use super::SqliteTxnDb; use std::time::Instant; /// This test checks we can set up sqlite properly /// and can do basic append() and load() #[tokio::test] - pub async fn test_persistence_append_and_load_txn() { - // Initialize the database - let db = SqliteTxnDb::new("sqlite://transactions.db").await?; + async fn test_persistence_append_and_load_txn() { + // Initialize the database + tracing::debug!( + "get_sqlite_test_db_path() = {:?}", + get_sqlite_test_db_path() + ); + let db = SqliteTxnDb::new(get_sqlite_test_db_path()).await.expect( + "In test_persistence_append_and_load_txn, it should be able to initiate a sqlite db.", + ); // Append a few transactions - db.append(vec![1, 2, 3]).await?; - db.append(vec![4, 5, 6]).await?; - + db.append(vec![1, 2, 3]).await.expect("In test_persistence_append_and_load_txn, there shouldn't be any error when doing append"); + db.append(vec![4, 5, 6]).await.expect("In test_persistence_append_and_load_txn, there shouldn't be any error when doing append"); + // Set timeout_after to the current time let timeout_after = Instant::now(); @@ -138,56 +131,75 @@ mod test{ tokio::time::sleep(std::time::Duration::from_secs(1)).await; // Append more transactions - db.append(vec![7, 8, 9]).await?; + db.append(vec![7, 8, 9]).await.expect("In test_persistence_append_and_load_txn, there shouldn't be any error when doing append"); // Load transactions before timeout_after - let tx_data_list = db.load(timeout_after).await?; - println!("Transaction data before timeout:"); - for tx_data in tx_data_list { - println!("{:?}", tx_data); - } - + let tx_data_list = db.load(timeout_after).await.expect( + "In test_persistence_append_and_load_txn, it should be able to load some transactions.", + ); + tracing::debug!("Transaction data before timeout: {:?}", tx_data_list); // Sishan TODO: add assertion // assert_eq!( // storage.load_transaction().await.unwrap(), // Some(test_transaction.clone()) // ); - } + + db.clear() + .await + .expect("In test_persistence_remove_txn, it should be able to clear all transactions."); + } #[tokio::test] /// This test checks we can remove transaction from database properly - pub async fn test_persistence_remove_txn() { + async fn test_persistence_remove_txn() { // Initialize the database - let db = SqliteTxnDb::new("sqlite://transactions.db").await?; + let db = SqliteTxnDb::new(get_sqlite_test_db_path()) + .await + .expect("In test_persistence_remove_txn, it should be able to initiate a sqlite db."); // Append some transactions - db.append(vec![1, 2, 3]).await?; - db.append(vec![4, 5, 6]).await?; - db.append(vec![7, 8, 9]).await?; + db.append(vec![1, 2, 3]).await.expect( + "In test_persistence_remove_txn, there shouldn't be any error when doing append", + ); + db.append(vec![4, 5, 6]).await.expect( + "In test_persistence_remove_txn, there shouldn't be any error when doing append", + ); + db.append(vec![7, 8, 9]).await.expect( + "In test_persistence_remove_txn, there shouldn't be any error when doing append", + ); // Load all transactions - println!("All transactions before removal:"); - let all_transactions = db.load(Instant::now()).await?; - for tx_data in &all_transactions { - println!("{:?}", tx_data); - } + + let all_transactions = db + .load(Instant::now()) + .await + .expect("In test_persistence_remove_txn, it should be able to load some transactions."); + tracing::debug!("All transactions before removal: {:?}", all_transactions); // Remove a specific transaction - println!("\nRemoving transaction [4, 5, 6]..."); + tracing::debug!("\nRemoving transaction [4, 5, 6]..."); if let Err(e) = db.remove(vec![4, 5, 6]).await { - eprintln!("Failed to remove transaction: {}", e); + panic!("Failed to remove transaction: {}", e); } else { - println!("Transaction [4, 5, 6] removed."); + tracing::debug!("Transaction [4, 5, 6] removed."); } // Load all transactions after removal - println!("\nAll transactions after removal:"); - let remaining_transactions = db.load(Instant::now()).await?; - for tx_data in remaining_transactions { - println!("{:?}", tx_data); - } - Ok(()) + let remaining_transactions = db + .load(Instant::now()) + .await + .expect("In test_persistence_remove_txn, it should be able to load some transactions."); + tracing::debug!( + "\nAll transactions after removal: {:?}", + remaining_transactions + ); + + // Sishan TODO: add assertion + + db.clear() + .await + .expect("In test_persistence_remove_txn, it should be able to clear all transactions."); } -} \ No newline at end of file +} From d501739757a42d467a1fe6279c971ab27e061b61 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 9 Dec 2024 19:10:57 +0800 Subject: [PATCH 05/12] add file --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 84c93d43..f881bc7b 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,6 @@ mutants.out.old/ # OSX **/.DS_Store +# Local database file for testing +*.db + From 7a753bc9a24b0bd02a4d326d4a1b563b34ace767 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 9 Dec 2024 19:11:30 +0800 Subject: [PATCH 06/12] add file --- crates/shared/src/persistence/mod.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 crates/shared/src/persistence/mod.rs diff --git a/crates/shared/src/persistence/mod.rs b/crates/shared/src/persistence/mod.rs new file mode 100644 index 00000000..bf04a901 --- /dev/null +++ b/crates/shared/src/persistence/mod.rs @@ -0,0 +1,23 @@ +use async_trait::async_trait; +use std::env; +use std::time::Instant; + +pub mod sqlite; + +#[async_trait] +pub trait BuilderPersistence { + async fn append(&self, tx_data: Vec) -> Result<(), sqlx::Error>; + async fn load(&self, timeout_after: Instant) -> Result>, sqlx::Error>; + async fn remove(&self, tx: Vec) -> Result<(), sqlx::Error>; +} + +pub fn get_sqlite_test_db_path() -> String { + // Sishan TODO: make it more clean and find a more reliable way + let current_dir = env::current_dir().expect("Failed to get current working directory"); + let mut path = current_dir.clone(); + path.push("src/persistence"); // Add "persistence" directory + path.push("test_data"); // Add "test_data" directory + path.push("sqlite"); // Add "sqlite" directory + path.push("transactions.db"); // Database file name + path.to_string_lossy().to_string() +} From 24ff2a5c4b98d597e23535cfbba2815bc734edba Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 9 Dec 2024 19:41:15 +0800 Subject: [PATCH 07/12] test added --- crates/shared/src/persistence/sqlite.rs | 56 +++++++++++++------------ 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/crates/shared/src/persistence/sqlite.rs b/crates/shared/src/persistence/sqlite.rs index d87c1885..0536854a 100644 --- a/crates/shared/src/persistence/sqlite.rs +++ b/crates/shared/src/persistence/sqlite.rs @@ -67,8 +67,7 @@ impl BuilderPersistence for SqliteTxnDb { let rows = sqlx::query( r#" SELECT id, tx_data, created_at FROM transactions - WHERE created_at < ? - ORDER BY created_at DESC LIMIT 1; + WHERE created_at <= ? ; "#, ) .bind(target_timestamp) @@ -120,9 +119,15 @@ mod test { "In test_persistence_append_and_load_txn, it should be able to initiate a sqlite db.", ); + db.clear() + .await + .expect("In test_persistence_remove_txn, it should be able to clear all transactions."); + // Append a few transactions - db.append(vec![1, 2, 3]).await.expect("In test_persistence_append_and_load_txn, there shouldn't be any error when doing append"); - db.append(vec![4, 5, 6]).await.expect("In test_persistence_append_and_load_txn, there shouldn't be any error when doing append"); + let test_tx_data_list = vec![vec![1, 2, 3], vec![4, 5, 6]]; + for tx in test_tx_data_list.clone() { + db.append(tx).await.expect("In test_persistence_append_and_load_txn, there shouldn't be any error when doing append"); + } // Set timeout_after to the current time let timeout_after = Instant::now(); @@ -130,7 +135,7 @@ mod test { // Simulate some delay tokio::time::sleep(std::time::Duration::from_secs(1)).await; - // Append more transactions + // Append one more transaction db.append(vec![7, 8, 9]).await.expect("In test_persistence_append_and_load_txn, there shouldn't be any error when doing append"); // Load transactions before timeout_after @@ -139,11 +144,7 @@ mod test { ); tracing::debug!("Transaction data before timeout: {:?}", tx_data_list); - // Sishan TODO: add assertion - // assert_eq!( - // storage.load_transaction().await.unwrap(), - // Some(test_transaction.clone()) - // ); + assert_eq!(tx_data_list, test_tx_data_list); db.clear() .await @@ -158,31 +159,32 @@ mod test { .await .expect("In test_persistence_remove_txn, it should be able to initiate a sqlite db."); + db.clear() + .await + .expect("In test_persistence_remove_txn, it should be able to clear all transactions."); + // Append some transactions - db.append(vec![1, 2, 3]).await.expect( - "In test_persistence_remove_txn, there shouldn't be any error when doing append", - ); - db.append(vec![4, 5, 6]).await.expect( - "In test_persistence_remove_txn, there shouldn't be any error when doing append", - ); - db.append(vec![7, 8, 9]).await.expect( - "In test_persistence_remove_txn, there shouldn't be any error when doing append", - ); + let test_tx_data_list = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]; + for tx in test_tx_data_list.clone() { + db.append(tx).await.expect( + "In test_persistence_remove_txn, there shouldn't be any error when doing append", + ); + } // Load all transactions - - let all_transactions = db + let mut all_transactions = db .load(Instant::now()) .await .expect("In test_persistence_remove_txn, it should be able to load some transactions."); tracing::debug!("All transactions before removal: {:?}", all_transactions); + assert_eq!(all_transactions, test_tx_data_list); // Remove a specific transaction - tracing::debug!("\nRemoving transaction [4, 5, 6]..."); - if let Err(e) = db.remove(vec![4, 5, 6]).await { + let tx_to_remove = test_tx_data_list[1].clone(); + if let Err(e) = db.remove(tx_to_remove).await { panic!("Failed to remove transaction: {}", e); } else { - tracing::debug!("Transaction [4, 5, 6] removed."); + tracing::debug!("Transaction removed."); } // Load all transactions after removal @@ -192,11 +194,11 @@ mod test { .await .expect("In test_persistence_remove_txn, it should be able to load some transactions."); tracing::debug!( - "\nAll transactions after removal: {:?}", + "All transactions after removal: {:?}", remaining_transactions ); - - // Sishan TODO: add assertion + all_transactions.remove(1); + assert_eq!(remaining_transactions, all_transactions); db.clear() .await From 6544fbd10cb8accb3d0152538707cdb6ebd58edb Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 9 Dec 2024 19:49:43 +0800 Subject: [PATCH 08/12] allow dead code although not sure why the compiler say they re not used --- crates/shared/src/persistence/sqlite.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/shared/src/persistence/sqlite.rs b/crates/shared/src/persistence/sqlite.rs index 0536854a..84bef071 100644 --- a/crates/shared/src/persistence/sqlite.rs +++ b/crates/shared/src/persistence/sqlite.rs @@ -1,4 +1,3 @@ -use super::get_sqlite_test_db_path; use super::BuilderPersistence; use anyhow::Result; use async_trait::async_trait; @@ -7,11 +6,15 @@ use sqlx::Row; use sqlx::SqlitePool; use std::time::{Instant, SystemTime}; +#[allow(unused_imports)] +use super::get_sqlite_test_db_path; + #[derive(Debug)] pub struct SqliteTxnDb { pool: SqlitePool, } impl SqliteTxnDb { + #[allow(dead_code)] async fn new(database_url: String) -> Result { let pool = SqlitePool::connect(&database_url).await?; // it will handle the default CURRENT_TIMESTAMP automatically and assign to transaction's created_at @@ -29,6 +32,7 @@ impl SqliteTxnDb { Ok(Self { pool }) } + #[allow(dead_code)] async fn clear(&self) -> Result<(), sqlx::Error> { // Execute a SQL statement to delete all rows from the `transactions` table sqlx::query("DELETE FROM transactions") From 8255bbe0503c45f4b2e00b7f20ecd85e06f65271 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Tue, 10 Dec 2024 20:54:08 +0800 Subject: [PATCH 09/12] should be able to pass ci with temp dir --- Cargo.lock | 1 + Cargo.toml | 1 + crates/shared/Cargo.toml | 3 ++- crates/shared/src/persistence/mod.rs | 31 +++++++++++++++++-------- crates/shared/src/persistence/sqlite.rs | 30 ++++++++++++++++-------- 5 files changed, 45 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 35b41d94..c4372804 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4747,6 +4747,7 @@ dependencies = [ "sha2 0.10.8", "sqlx", "surf-disco", + "tempfile", "thiserror 2.0.3", "tide-disco", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 3d8b1832..75d81e1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ sha2 = "0.10" snafu = "0.8" surf-disco = "0.9" tagged-base64 = "0.4" +tempfile = "3.7" tide-disco = "0.9" thiserror = "2.0" tokio = "1" diff --git a/crates/shared/Cargo.toml b/crates/shared/Cargo.toml index 5422c697..f45cdc81 100644 --- a/crates/shared/Cargo.toml +++ b/crates/shared/Cargo.toml @@ -29,7 +29,9 @@ quick_cache = { workspace = true } rand = { workspace = true } serde = { workspace = true } sha2 = { workspace = true } +sqlx = { workspace = true } surf-disco = { workspace = true } +tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } @@ -37,7 +39,6 @@ tracing-subscriber = { workspace = true } url = { workspace = true } vbs = { workspace = true } vec1 = { workspace = true } -sqlx = { workspace = true } [dev-dependencies] portpicker = { workspace = true } diff --git a/crates/shared/src/persistence/mod.rs b/crates/shared/src/persistence/mod.rs index bf04a901..4f0e96a9 100644 --- a/crates/shared/src/persistence/mod.rs +++ b/crates/shared/src/persistence/mod.rs @@ -1,5 +1,6 @@ +use anyhow::Context; use async_trait::async_trait; -use std::env; +use std::path::{Path, PathBuf}; use std::time::Instant; pub mod sqlite; @@ -11,13 +12,23 @@ pub trait BuilderPersistence { async fn remove(&self, tx: Vec) -> Result<(), sqlx::Error>; } -pub fn get_sqlite_test_db_path() -> String { - // Sishan TODO: make it more clean and find a more reliable way - let current_dir = env::current_dir().expect("Failed to get current working directory"); - let mut path = current_dir.clone(); - path.push("src/persistence"); // Add "persistence" directory - path.push("test_data"); // Add "test_data" directory - path.push("sqlite"); // Add "sqlite" directory - path.push("transactions.db"); // Database file name - path.to_string_lossy().to_string() +pub fn build_sqlite_path(path: &Path) -> anyhow::Result { + let sub_dir = path.join("sqlite"); + + // if `sqlite` sub dir does not exist then create it + if !sub_dir.exists() { + std::fs::create_dir_all(&sub_dir) + .with_context(|| format!("failed to create directory: {:?}", sub_dir))?; + } + + // Return the full path to the SQLite database file + let db_path = sub_dir.join("database.sqlite"); + + // Ensure the file exists (create it if it doesn’t) + if !db_path.exists() { + std::fs::File::create(&db_path) + .with_context(|| format!("Failed to create SQLite database file: {:?}", db_path))?; + } + + Ok(db_path) } diff --git a/crates/shared/src/persistence/sqlite.rs b/crates/shared/src/persistence/sqlite.rs index 84bef071..bcd76dbe 100644 --- a/crates/shared/src/persistence/sqlite.rs +++ b/crates/shared/src/persistence/sqlite.rs @@ -1,3 +1,5 @@ +#[allow(unused_imports)] +use super::build_sqlite_path; use super::BuilderPersistence; use anyhow::Result; use async_trait::async_trait; @@ -6,9 +8,6 @@ use sqlx::Row; use sqlx::SqlitePool; use std::time::{Instant, SystemTime}; -#[allow(unused_imports)] -use super::get_sqlite_test_db_path; - #[derive(Debug)] pub struct SqliteTxnDb { pool: SqlitePool, @@ -105,21 +104,23 @@ impl BuilderPersistence for SqliteTxnDb { #[cfg(test)] mod test { - use super::get_sqlite_test_db_path; + use super::build_sqlite_path; use super::BuilderPersistence; use super::SqliteTxnDb; use std::time::Instant; + use tempfile::tempdir; /// This test checks we can set up sqlite properly /// and can do basic append() and load() #[tokio::test] async fn test_persistence_append_and_load_txn() { + // Create a temporary directory + let tmp_dir = tempdir().expect("In test_persistence_append_and_load_txn, should be able to create a temporary directory."); + // Construct the database path + let db_path = build_sqlite_path(tmp_dir.path()).expect("In test_persistence_append_and_load_txn, should be able to create a temporary database file."); + let database_url = format!("sqlite://{}", db_path.to_str().expect("In test_persistence_append_and_load_txn, should be able to convert temporary database file path to string.")); // Initialize the database - tracing::debug!( - "get_sqlite_test_db_path() = {:?}", - get_sqlite_test_db_path() - ); - let db = SqliteTxnDb::new(get_sqlite_test_db_path()).await.expect( + let db = SqliteTxnDb::new(database_url).await.expect( "In test_persistence_append_and_load_txn, it should be able to initiate a sqlite db.", ); @@ -158,8 +159,17 @@ mod test { #[tokio::test] /// This test checks we can remove transaction from database properly async fn test_persistence_remove_txn() { + // Create a temporary directory + let tmp_dir = tempdir().expect( + "In test_persistence_remove_txn, should be able to create a temporary directory.", + ); + // Construct the database path + let db_path = build_sqlite_path(tmp_dir.path()).expect( + "In test_persistence_remove_txn, should be able to create a temporary database file.", + ); + let database_url = format!("sqlite://{}", db_path.to_str().expect("In test_persistence_remove_txn, should be able to convert temporary database file path to string.")); // Initialize the database - let db = SqliteTxnDb::new(get_sqlite_test_db_path()) + let db = SqliteTxnDb::new(database_url) .await .expect("In test_persistence_remove_txn, it should be able to initiate a sqlite db."); From 2a843469c8c80c40d73ded40f34245ab96184ad6 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Tue, 10 Dec 2024 22:54:46 +0800 Subject: [PATCH 10/12] add more comments --- crates/shared/src/persistence/mod.rs | 4 ++++ crates/shared/src/persistence/sqlite.rs | 7 +++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/shared/src/persistence/mod.rs b/crates/shared/src/persistence/mod.rs index 4f0e96a9..28cd8552 100644 --- a/crates/shared/src/persistence/mod.rs +++ b/crates/shared/src/persistence/mod.rs @@ -5,10 +5,14 @@ use std::time::Instant; pub mod sqlite; +/// The trait BuilderPersistence defined needed functions to maintain persistence of builder-related data #[async_trait] pub trait BuilderPersistence { + /// Append a transaction in Vec to persistence mempool async fn append(&self, tx_data: Vec) -> Result<(), sqlx::Error>; + /// Load all the transactions whose `created_at` is before or equal to `timeout_after` async fn load(&self, timeout_after: Instant) -> Result>, sqlx::Error>; + /// Remove a transaction in Vec from the persistence mempool async fn remove(&self, tx: Vec) -> Result<(), sqlx::Error>; } diff --git a/crates/shared/src/persistence/sqlite.rs b/crates/shared/src/persistence/sqlite.rs index bcd76dbe..a64d4a64 100644 --- a/crates/shared/src/persistence/sqlite.rs +++ b/crates/shared/src/persistence/sqlite.rs @@ -8,13 +8,15 @@ use sqlx::Row; use sqlx::SqlitePool; use std::time::{Instant, SystemTime}; +/// Struct of transaction database in sqlite #[derive(Debug)] pub struct SqliteTxnDb { pool: SqlitePool, } impl SqliteTxnDb { + /// New a SqliteTxnDb by calling with the database_url #[allow(dead_code)] - async fn new(database_url: String) -> Result { + pub async fn new(database_url: String) -> Result { let pool = SqlitePool::connect(&database_url).await?; // it will handle the default CURRENT_TIMESTAMP automatically and assign to transaction's created_at sqlx::query( @@ -31,8 +33,9 @@ impl SqliteTxnDb { Ok(Self { pool }) } + /// Clear all the data in this SqliteTxnDb database #[allow(dead_code)] - async fn clear(&self) -> Result<(), sqlx::Error> { + pub async fn clear(&self) -> Result<(), sqlx::Error> { // Execute a SQL statement to delete all rows from the `transactions` table sqlx::query("DELETE FROM transactions") .execute(&self.pool) From 130b6a5e961c880d8d7cd8a9bf1b90c936e977e6 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Tue, 10 Dec 2024 23:22:26 +0800 Subject: [PATCH 11/12] rename a load parameter --- crates/shared/src/persistence/mod.rs | 5 +++-- crates/shared/src/persistence/sqlite.rs | 18 ++++++++---------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/crates/shared/src/persistence/mod.rs b/crates/shared/src/persistence/mod.rs index 28cd8552..d0520536 100644 --- a/crates/shared/src/persistence/mod.rs +++ b/crates/shared/src/persistence/mod.rs @@ -10,12 +10,13 @@ pub mod sqlite; pub trait BuilderPersistence { /// Append a transaction in Vec to persistence mempool async fn append(&self, tx_data: Vec) -> Result<(), sqlx::Error>; - /// Load all the transactions whose `created_at` is before or equal to `timeout_after` - async fn load(&self, timeout_after: Instant) -> Result>, sqlx::Error>; + /// Load all the transactions whose `created_at` is before or equal to `before_instant` + async fn load(&self, before_instant: Instant) -> Result>, sqlx::Error>; /// Remove a transaction in Vec from the persistence mempool async fn remove(&self, tx: Vec) -> Result<(), sqlx::Error>; } +/// build sqlite database path, if not exist, will create one pub fn build_sqlite_path(path: &Path) -> anyhow::Result { let sub_dir = path.join("sqlite"); diff --git a/crates/shared/src/persistence/sqlite.rs b/crates/shared/src/persistence/sqlite.rs index a64d4a64..659eaa39 100644 --- a/crates/shared/src/persistence/sqlite.rs +++ b/crates/shared/src/persistence/sqlite.rs @@ -14,8 +14,7 @@ pub struct SqliteTxnDb { pool: SqlitePool, } impl SqliteTxnDb { - /// New a SqliteTxnDb by calling with the database_url - #[allow(dead_code)] + /// New a `SqliteTxnDb` by calling with `database_url` pub async fn new(database_url: String) -> Result { let pool = SqlitePool::connect(&database_url).await?; // it will handle the default CURRENT_TIMESTAMP automatically and assign to transaction's created_at @@ -33,8 +32,7 @@ impl SqliteTxnDb { Ok(Self { pool }) } - /// Clear all the data in this SqliteTxnDb database - #[allow(dead_code)] + /// Clear all the data in this `SqliteTxnDb` database pub async fn clear(&self) -> Result<(), sqlx::Error> { // Execute a SQL statement to delete all rows from the `transactions` table sqlx::query("DELETE FROM transactions") @@ -58,10 +56,10 @@ impl BuilderPersistence for SqliteTxnDb { Ok(()) } - async fn load(&self, timeout_after: Instant) -> Result>, sqlx::Error> { + async fn load(&self, before_instant: Instant) -> Result>, sqlx::Error> { // Convert Instant to SystemTime let now = SystemTime::now(); - let elapsed = timeout_after.elapsed(); + let elapsed = before_instant.elapsed(); let target_time = now - elapsed; // Convert SystemTime to a format SQLite understands (RFC 3339) @@ -137,8 +135,8 @@ mod test { db.append(tx).await.expect("In test_persistence_append_and_load_txn, there shouldn't be any error when doing append"); } - // Set timeout_after to the current time - let timeout_after = Instant::now(); + // Set before_instant to the current time + let before_instant = Instant::now(); // Simulate some delay tokio::time::sleep(std::time::Duration::from_secs(1)).await; @@ -146,8 +144,8 @@ mod test { // Append one more transaction db.append(vec![7, 8, 9]).await.expect("In test_persistence_append_and_load_txn, there shouldn't be any error when doing append"); - // Load transactions before timeout_after - let tx_data_list = db.load(timeout_after).await.expect( + // Load transactions before before_instant + let tx_data_list = db.load(before_instant).await.expect( "In test_persistence_append_and_load_txn, it should be able to load some transactions.", ); tracing::debug!("Transaction data before timeout: {:?}", tx_data_list); From 42c1fb855ccff9c9402976a3e78b2474e63b3794 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Tue, 10 Dec 2024 23:24:30 +0800 Subject: [PATCH 12/12] lint --- crates/shared/src/persistence/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/shared/src/persistence/mod.rs b/crates/shared/src/persistence/mod.rs index d0520536..000a7dff 100644 --- a/crates/shared/src/persistence/mod.rs +++ b/crates/shared/src/persistence/mod.rs @@ -8,11 +8,11 @@ pub mod sqlite; /// The trait BuilderPersistence defined needed functions to maintain persistence of builder-related data #[async_trait] pub trait BuilderPersistence { - /// Append a transaction in Vec to persistence mempool + /// Append a transaction to persistence mempool async fn append(&self, tx_data: Vec) -> Result<(), sqlx::Error>; /// Load all the transactions whose `created_at` is before or equal to `before_instant` async fn load(&self, before_instant: Instant) -> Result>, sqlx::Error>; - /// Remove a transaction in Vec from the persistence mempool + /// Remove a transaction from the persistence mempool async fn remove(&self, tx: Vec) -> Result<(), sqlx::Error>; }