Add sqlite database framework and simple tests #277

Open
wants to merge 13 commits into base: main
3 changes: 3 additions & 0 deletions .gitignore
@@ -29,3 +29,6 @@ mutants.out.old/
# OSX
**/.DS_Store

# Local database file for testing
*.db

1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -55,6 +55,7 @@ url = "2.5"
vbs = "0.1"
vec1 = "1.12"
tracing-subscriber = "0.3"
sqlx = { version = "^0.8", features = ["postgres", "sqlite", "macros"] }

[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = [
1 change: 1 addition & 0 deletions crates/shared/Cargo.toml
@@ -37,6 +37,7 @@ tracing-subscriber = { workspace = true }
url = { workspace = true }
vbs = { workspace = true }
vec1 = { workspace = true }
sqlx = { workspace = true }

[dev-dependencies]
portpicker = { workspace = true }
1 change: 1 addition & 0 deletions crates/shared/src/lib.rs
@@ -3,6 +3,7 @@
pub mod block;
pub mod coordinator;
pub mod error;
pub mod persistence;
pub mod state;
#[cfg_attr(coverage_nightly, coverage(off))]
pub mod testing;
23 changes: 23 additions & 0 deletions crates/shared/src/persistence/mod.rs
@@ -0,0 +1,23 @@
use async_trait::async_trait;
use std::env;
use std::time::Instant;

pub mod sqlite;

#[async_trait]
pub trait BuilderPersistence {
    async fn append(&self, tx_data: Vec<u8>) -> Result<(), sqlx::Error>;
    async fn load(&self, timeout_after: Instant) -> Result<Vec<Vec<u8>>, sqlx::Error>;
    async fn remove(&self, tx: Vec<u8>) -> Result<(), sqlx::Error>;
Member
Shouldn't these functions be public? Also, can we add documentation for them?

Contributor Author
Since the trait BuilderPersistence is public, its functions should be public automatically; correct me if I'm wrong. But this is a good reminder that clear() and new() should also be public. I also added more comments.

}
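
The visibility question from the thread above can be illustrated with a small standalone sketch. It uses hypothetical names (`Store`, `MemStore`) and a synchronous in-memory store rather than the PR's `BuilderPersistence` and `SqliteTxnDb`, so it only demonstrates the language rule: methods declared in a `pub` trait are callable wherever the trait is visible, while inherent methods such as `new()` stay private unless marked `pub`.

```rust
mod persistence {
    /// Public trait: its methods share the trait's visibility.
    pub trait Store {
        fn append(&mut self, tx: Vec<u8>);
        fn count(&self) -> usize;
    }

    /// Hypothetical in-memory stand-in for a database-backed store.
    #[derive(Default)]
    pub struct MemStore {
        txs: Vec<Vec<u8>>,
    }

    impl MemStore {
        // Inherent methods are private by default, so `pub` is required here.
        pub fn new() -> Self {
            Self::default()
        }
    }

    impl Store for MemStore {
        // No `pub` is written (or permitted) here: visibility comes from the trait.
        fn append(&mut self, tx: Vec<u8>) {
            self.txs.push(tx);
        }

        fn count(&self) -> usize {
            self.txs.len()
        }
    }
}

use persistence::{MemStore, Store};

/// Code outside the module can call the trait methods once `Store` is in scope.
fn append_from_outside() -> usize {
    let mut store = MemStore::new();
    store.append(vec![1, 2, 3]);
    store.count()
}
```

Removing `pub` from `MemStore::new` makes `append_from_outside` fail to compile, which is the same reason the private `new()` and `clear()` needed `#[allow(dead_code)]`.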

pub fn get_sqlite_test_db_path() -> String {
    // TODO(Sishan): clean this up and find a more reliable way to locate the test database.
    let current_dir = env::current_dir().expect("Failed to get current working directory");
Member
If this TODO isn't going to be resolved in this PR, can we make a GH issue and add the link here?

Contributor Author
Should be resolved now!

    let mut path = current_dir.clone();
    path.push("src/persistence"); // Add "src/persistence" directory
    path.push("test_data"); // Add "test_data" directory
    path.push("sqlite"); // Add "sqlite" directory
    path.push("transactions.db"); // Database file name
    path.to_string_lossy().to_string()
}
211 changes: 211 additions & 0 deletions crates/shared/src/persistence/sqlite.rs
@@ -0,0 +1,211 @@
use super::BuilderPersistence;
use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use sqlx::Row;
use sqlx::SqlitePool;
use std::time::{Instant, SystemTime};

#[allow(unused_imports)]
use super::get_sqlite_test_db_path;

#[derive(Debug)]
pub struct SqliteTxnDb {
    pool: SqlitePool,
}
Comment on lines +13 to +15
Member
Nit: Are we going to add more fields to this struct? If not, can we make it SqliteTxnDb(SqlitePool)?

Contributor Author
In the future we're going to add some pruning parameters here like minimum retention or target retention. It will be expanded later but I don't have a strong opinion for now.

impl SqliteTxnDb {
    #[allow(dead_code)]
    async fn new(database_url: String) -> Result<Self, sqlx::Error> {
Member
Shouldn't this constructor and fn clear both be public? Then we can remove #[allow(dead_code)].

Contributor Author
Oh that's the reason!

        let pool = SqlitePool::connect(&database_url).await?;
        // SQLite fills in `created_at` with CURRENT_TIMESTAMP automatically on insert.
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS transactions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                tx_data BLOB NOT NULL,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            );
            "#,
        )
        .execute(&pool)
        .await?;
        Ok(Self { pool })
    }

    #[allow(dead_code)]
    async fn clear(&self) -> Result<(), sqlx::Error> {
        // Execute a SQL statement to delete all rows from the `transactions` table
        sqlx::query("DELETE FROM transactions")
            .execute(&self.pool)
            .await?;
        Ok(())
    }
}

#[async_trait]
impl BuilderPersistence for SqliteTxnDb {
    async fn append(&self, tx_data: Vec<u8>) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            INSERT INTO transactions (tx_data) VALUES (?);
            "#,
        )
        .bind(tx_data)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn load(&self, timeout_after: Instant) -> Result<Vec<Vec<u8>>, sqlx::Error> {
Member
I find the parameter a bit confusing. Can we call this before to indicate we want transactions before the given timestamp? I don't think the function itself cares whether the input time is relevant to a timeout or not.

Besides the argument name, I was also wondering whether this function would be what we want. I thought we would use load to get transactions after a certain timestamp, not before. See my comment on the test about why the current implementation doesn't seem to provide all the information we want.

Contributor Author
@dailinsubjam, Dec 10, 2024
Sorry about the naming; I just realized it's really confusing. Updated to before_instant. For the use case of the function load, see my reply on the test.

        // Convert Instant to SystemTime
        let now = SystemTime::now();
        let elapsed = timeout_after.elapsed();
        let target_time = now - elapsed;
Member
Is there a way to directly convert timeout_after from Instant to SystemTime without doing now - elapsed? The current conversion seems a bit redundant, and may even be inaccurate because it relies on an additional now time.

Contributor Author
@dailinsubjam, Dec 10, 2024
This might be the most straightforward way, as far as I know. Instant represents a point in time relative to an unspecified reference (it has no fixed epoch), while SystemTime represents an absolute point in time measured from the UNIX epoch. So to avoid now - elapsed, we would first have to capture the system time at the moment the Instant was created.
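
A minimal sketch of the conversion discussed in this thread, using only the standard library. The helper name `instant_to_system_time` is hypothetical and not part of the PR. It samples both clocks back to back and uses checked arithmetic, and it also handles an `Instant` in the future; as far as I know, `Instant::elapsed()` saturates to zero on recent Rust versions, so the `now - elapsed` form would map a future deadline to "now".

```rust
use std::time::{Instant, SystemTime};

/// Approximates the wall-clock time that corresponds to `instant`.
/// Returns `None` if the result cannot be represented as a `SystemTime`.
fn instant_to_system_time(instant: Instant) -> Option<SystemTime> {
    // Sample both clocks back to back to keep the skew between them small.
    let now_instant = Instant::now();
    let now_system = SystemTime::now();

    if instant <= now_instant {
        // `instant` is in the past: shift the wall clock back by the elapsed time.
        now_system.checked_sub(now_instant.duration_since(instant))
    } else {
        // `instant` is in the future (e.g. a deadline): shift the wall clock forward.
        now_system.checked_add(instant.duration_since(now_instant))
    }
}
```

The result is still an approximation: the two clocks are read at slightly different moments, and the system clock can be adjusted between the creation of the `Instant` and the conversion.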


        // Convert SystemTime to the text format that SQLite's CURRENT_TIMESTAMP
        // produces ("YYYY-MM-DD HH:MM:SS", in UTC)
        let target_timestamp = DateTime::<Utc>::from(target_time)
            .naive_utc()
            .format("%Y-%m-%d %H:%M:%S")
            .to_string();

        let rows = sqlx::query(
            r#"
            SELECT id, tx_data, created_at FROM transactions
            WHERE created_at <= ? ;
            "#,
        )
        .bind(target_timestamp)
        .fetch_all(&self.pool)
        .await?;

        let tx_data_list = rows
            .into_iter()
            .map(|row| row.get::<Vec<u8>, _>("tx_data"))
            .collect();
        Ok(tx_data_list)
    }

    async fn remove(&self, tx_data: Vec<u8>) -> Result<(), sqlx::Error> {
        let result = sqlx::query(
            r#"
            DELETE FROM transactions WHERE tx_data = ?;
            "#,
        )
        .bind(tx_data)
        .execute(&self.pool)
        .await?;

        if result.rows_affected() > 0 {
            Ok(())
        } else {
            Err(sqlx::Error::RowNotFound)
        }
    }
}

#[cfg(test)]
mod test {
    use super::get_sqlite_test_db_path;
    use super::BuilderPersistence;
    use super::SqliteTxnDb;
    use std::time::Instant;

    /// This test checks we can set up sqlite properly
    /// and can do basic append() and load()
    #[tokio::test]
    async fn test_persistence_append_and_load_txn() {
        // Initialize the database
        tracing::debug!(
            "get_sqlite_test_db_path() = {:?}",
            get_sqlite_test_db_path()
        );
        let db = SqliteTxnDb::new(get_sqlite_test_db_path()).await.expect(
Member
Does this create temporary files? If any assertion fails before the test completes, will those files be deleted?

Also, would it be better to pass an index or test name to get_sqlite_test_db_path, so that the calls in different tests are distinguished and race conditions are avoided (is it necessary)? E.g., we could call get_sqlite_test_db_path("test_append_and_load") in this function.

Contributor Author
Updated to use the tempfile crate, which creates a new temporary directory on the filesystem; the directory is deleted automatically when the returned TempDir value is dropped (goes out of scope).

            "In test_persistence_append_and_load_txn, it should be able to initiate a sqlite db.",
        );
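
The reviewer's suggestion of a per-test database path could look roughly like the sketch below. It is a standard-library-only illustration with a hypothetical helper name (`sqlite_test_db_path`), not the tempfile-based version the author describes. Unlike a `TempDir`, it does not delete anything on drop, so a failed test would leave the file behind.

```rust
use std::env;
use std::path::PathBuf;
use std::process;

/// Hypothetical variant of `get_sqlite_test_db_path` that takes a test name,
/// so tests running in parallel do not share one database file.
fn sqlite_test_db_path(test_name: &str) -> PathBuf {
    // Use the OS temporary directory instead of a path relative to the
    // current working directory.
    let mut path = env::temp_dir();
    // Include the process id so concurrent test runs do not collide either.
    path.push(format!("transactions-{}-{}.db", test_name, process::id()));
    path
}
```

With distinct files per test, the `clear()` calls at the start and end of each test are no longer needed to isolate tests from each other.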

        db.clear()
            .await
            .expect("In test_persistence_append_and_load_txn, it should be able to clear all transactions.");

        // Append a few transactions
        let test_tx_data_list = vec![vec![1, 2, 3], vec![4, 5, 6]];
        for tx in test_tx_data_list.clone() {
            db.append(tx).await.expect("In test_persistence_append_and_load_txn, there shouldn't be any error when doing append");
        }

        // Set timeout_after to the current time
        let timeout_after = Instant::now();

        // Simulate some delay
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        // Append one more transaction
        db.append(vec![7, 8, 9]).await.expect("In test_persistence_append_and_load_txn, there shouldn't be any error when doing append");

        // Load transactions before timeout_after
        let tx_data_list = db.load(timeout_after).await.expect(
            "In test_persistence_append_and_load_txn, it should be able to load some transactions.",
        );
        tracing::debug!("Transaction data before timeout: {:?}", tx_data_list);

        assert_eq!(tx_data_list, test_tx_data_list);
Member
The use case seems a bit off or incomplete to me. If we append a transaction after the timeout, don't we want a load function to provide us with the new transaction? In this example, it's not clear how the builder will eventually submit [1, 2, 3], [4, 5, 6], and [7, 8, 9]. After it gets [1, 2, 3], [4, 5, 6] by calling load, is the builder going to call load again to get [7, 8, 9]?

I was thinking load would give us all three transactions at once, or even if not by one call, there would be a convenient way to get all transactions, but this test seems to suggest that we don't need [7, 8, 9].

Contributor Author
@dailinsubjam, Dec 10, 2024
load(self, instant) returns all the transactions created before the given instant. Sorry about the confusing naming. The test just checks that load behaves correctly: transactions submitted after the given instant shouldn't be returned, and only those submitted before it should be.

load(self, instant) is designed this way because we have a function collect_txns(timeout_after) that collects all the transactions submitted up to an instant. Now I think that, for the database, returning all the transactions might be equivalent, since the only timeout_after we'll set is in the near future and this is our only use case. I'd vote to remove the argument if it adds overhead; otherwise I'd keep it. What do you think?
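
The "before" semantics described in this reply can be modeled without a database. The sketch below is an in-memory illustration with hypothetical names (`StoredTx`, `load_before`), not the PR's SQL query; it mirrors the `created_at <= ?` filter on a slice of rows.

```rust
use std::time::SystemTime;

/// Hypothetical in-memory row: transaction bytes plus creation time.
struct StoredTx {
    data: Vec<u8>,
    created_at: SystemTime,
}

/// Returns the data of every row created at or before `before`,
/// in insertion order, mirroring `WHERE created_at <= ?`.
fn load_before(rows: &[StoredTx], before: SystemTime) -> Vec<Vec<u8>> {
    rows.iter()
        .filter(|row| row.created_at <= before)
        .map(|row| row.data.clone())
        .collect()
}
```

With rows created at t, t+1s and t+5s, a cutoff of t+2s yields the first two, and any cutoff at or after t+5s yields all three, which is why a cutoff in the near future behaves like "load everything". Note that SQLite's CURRENT_TIMESTAMP has one-second resolution, so the real query cannot distinguish transactions stored within the same second; this is presumably why the test sleeps for a second between appends.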


        db.clear()
            .await
            .expect("In test_persistence_append_and_load_txn, it should be able to clear all transactions.");
    }

    #[tokio::test]
    /// This test checks that we can remove a transaction from the database properly
    async fn test_persistence_remove_txn() {
        // Initialize the database
        let db = SqliteTxnDb::new(get_sqlite_test_db_path())
            .await
            .expect("In test_persistence_remove_txn, it should be able to initiate a sqlite db.");

        db.clear()
            .await
            .expect("In test_persistence_remove_txn, it should be able to clear all transactions.");

        // Append some transactions
        let test_tx_data_list = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]];
        for tx in test_tx_data_list.clone() {
            db.append(tx).await.expect(
                "In test_persistence_remove_txn, there shouldn't be any error when doing append",
            );
        }

        // Load all transactions
        let mut all_transactions = db
            .load(Instant::now())
Member
Related to my comment on load about the argument name: here Instant::now() isn't a timeout timestamp at all, so the argument name shouldn't be tied to the timeout concept.

            .await
            .expect("In test_persistence_remove_txn, it should be able to load some transactions.");
        tracing::debug!("All transactions before removal: {:?}", all_transactions);
        assert_eq!(all_transactions, test_tx_data_list);

        // Remove a specific transaction
        let tx_to_remove = test_tx_data_list[1].clone();
        if let Err(e) = db.remove(tx_to_remove).await {
            panic!("Failed to remove transaction: {}", e);
        } else {
            tracing::debug!("Transaction removed.");
        }

        // Load all transactions after removal

        let remaining_transactions = db
            .load(Instant::now())
            .await
            .expect("In test_persistence_remove_txn, it should be able to load some transactions.");
        tracing::debug!(
            "All transactions after removal: {:?}",
            remaining_transactions
        );
        all_transactions.remove(1);
        assert_eq!(remaining_transactions, all_transactions);

        db.clear()
            .await
            .expect("In test_persistence_remove_txn, it should be able to clear all transactions.");
    }
}