Skip to content

Commit

Permalink
cover sql nft tx history migration
Browse files Browse the repository at this point in the history
  • Loading branch information
laruh committed Sep 13, 2024
1 parent a493f3f commit 48a7f5b
Showing 1 changed file with 174 additions and 11 deletions.
185 changes: 174 additions & 11 deletions mm2src/coins/nft/storage/sql_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::nft::nft_structs::{Chain, ContractType, ConvertChain, Nft, NftCommon,
use crate::nft::storage::{get_offset_limit, NftDetailsJson, NftListStorageOps, NftStorageError,
NftTransferHistoryStorageOps, RemoveNftResult, TransferDetailsJson};
use async_trait::async_trait;
use db_common::async_sql_conn::{AsyncConnError, AsyncConnection};
use db_common::async_sql_conn::{AsyncConnError, AsyncConnection, InternalError};
use db_common::sql_build::{SqlCondition, SqlQuery};
use db_common::sqlite::rusqlite::types::{FromSqlError, Type};
use db_common::sqlite::rusqlite::{Connection, Error as SqlError, Result as SqlResult, Row, Statement};
Expand All @@ -22,6 +22,8 @@ use std::convert::TryInto;
use std::num::NonZeroUsize;
use std::str::FromStr;

const CURRENT_SCHEMA_VERSION_TX_HISTORY: i32 = 2;

impl Chain {
fn nft_list_table_name(&self) -> SqlResult<SafeTableName> {
let name = self.to_ticker().to_owned() + "_nft_list";
Expand All @@ -42,6 +44,12 @@ fn scanned_nft_blocks_table_name() -> SqlResult<SafeTableName> {
Ok(safe_name)
}

fn schema_versions_table_name() -> SqlResult<SafeTableName> {
let name = "schema_versions".to_string();
let safe_name = SafeTableName::new(&name)?;
Ok(safe_name)
}

fn create_nft_list_table_sql(chain: &Chain) -> MmResult<String, SqlError> {
let safe_table_name = chain.nft_list_table_name()?;
let sql = format!(
Expand Down Expand Up @@ -110,6 +118,35 @@ fn create_transfer_history_table_sql(chain: &Chain) -> Result<String, SqlError>
Ok(sql)
}

fn create_transfer_history_table_sql_custom_name(safe_table_name: &SafeTableName) -> Result<String, SqlError> {
let sql = format!(
"CREATE TABLE IF NOT EXISTS {} (
transaction_hash VARCHAR(256) NOT NULL,
log_index INTEGER NOT NULL,
chain TEXT NOT NULL,
block_number INTEGER NOT NULL,
block_timestamp INTEGER NOT NULL,
contract_type TEXT NOT NULL,
token_address VARCHAR(256) NOT NULL,
token_id VARCHAR(256) NOT NULL,
status TEXT NOT NULL,
amount VARCHAR(256) NOT NULL,
possible_spam INTEGER DEFAULT 0 NOT NULL,
possible_phishing INTEGER DEFAULT 0 NOT NULL,
token_uri TEXT,
token_domain TEXT,
collection_name TEXT,
image_url TEXT,
image_domain TEXT,
token_name TEXT,
details_json TEXT,
PRIMARY KEY (transaction_hash, log_index, token_id)
);",
safe_table_name.inner()
);
Ok(sql)
}

fn create_scanned_nft_blocks_sql() -> Result<String, SqlError> {
let safe_table_name = scanned_nft_blocks_table_name()?;
let sql = format!(
Expand All @@ -122,6 +159,18 @@ fn create_scanned_nft_blocks_sql() -> Result<String, SqlError> {
Ok(sql)
}

fn create_schema_versions_sql() -> Result<String, SqlError> {
let safe_table_name = schema_versions_table_name()?;
let sql = format!(
"CREATE TABLE IF NOT EXISTS {} (
table_name TEXT PRIMARY KEY,
version INTEGER NOT NULL
);",
safe_table_name.inner()
);
Ok(sql)
}

impl NftStorageError for AsyncConnError {}

fn get_nft_list_builder_preimage(chains: Vec<Chain>, filters: Option<NftListFilters>) -> Result<SqlBuilder, SqlError> {
Expand Down Expand Up @@ -432,6 +481,15 @@ fn upsert_last_scanned_block_sql() -> Result<String, SqlError> {
Ok(sql)
}

fn insert_schema_version_sql() -> Result<String, SqlError> {
let schema_table = schema_versions_table_name()?;
let sql = format!(
"INSERT INTO {} (table_name, version) VALUES (?1, ?2) ON CONFLICT(table_name) DO NOTHING;",
schema_table.inner()
);
Ok(sql)
}

fn refresh_nft_metadata_sql(chain: &Chain) -> Result<String, SqlError> {
let safe_table_name = chain.nft_list_table_name()?;
let sql = format!(
Expand Down Expand Up @@ -462,12 +520,25 @@ fn update_transfer_spam_by_token_addr_id(chain: &Chain) -> Result<String, SqlErr
Ok(sql)
}

fn select_last_block_number_sql(safe_table_name: SafeTableName) -> Result<String, SqlError> {
let sql = format!(
/// Generates the SQL command to insert or update the schema version in the `schema_versions` table.
///
/// This function creates an SQL command that attempts to insert a new row with the specified
/// `table_name` and `version`. If a row with the same `table_name` already exists, the `version`
/// field is updated to the new value provided.
fn update_schema_version_sql(schema_versions: &SafeTableName) -> String {
format!(
"INSERT INTO {} (table_name, version)
VALUES (?1, ?2)
ON CONFLICT(table_name) DO UPDATE SET version = excluded.version;",
schema_versions.inner()
)
}

fn select_last_block_number_sql(safe_table_name: SafeTableName) -> String {
format!(
"SELECT block_number FROM {} ORDER BY block_number DESC LIMIT 1",
safe_table_name.inner()
);
Ok(sql)
)
}

fn select_last_scanned_block_sql() -> MmResult<String, SqlError> {
Expand Down Expand Up @@ -540,6 +611,13 @@ fn get_transfers_with_empty_meta_builder<'a>(conn: &'a Connection, chain: &'a Ch
Ok(sql_builder)
}

fn get_schema_version_stmt(conn: &Connection) -> Result<Statement, SqlError> {
let table_name = schema_versions_table_name()?;
let sql = format!("SELECT version FROM {} WHERE table_name = ?1;", table_name.inner());
let stmt = conn.prepare(&sql)?;
Ok(stmt)
}

fn is_table_empty(conn: &Connection, safe_table_name: SafeTableName) -> Result<bool, SqlError> {
let query = format!("SELECT COUNT(*) FROM {}", safe_table_name.inner());
conn.query_row(&query, [], |row| row.get::<_, i64>(0))
Expand Down Expand Up @@ -777,7 +855,7 @@ impl NftListStorageOps for AsyncMutexGuard<'_, AsyncConnection> {

async fn get_last_block_number(&self, chain: &Chain) -> MmResult<Option<u64>, Self::Error> {
let table_name = chain.nft_list_table_name()?;
let sql = select_last_block_number_sql(table_name)?;
let sql = select_last_block_number_sql(table_name);
self.call(move |conn| {
let block_number = query_single_row(conn, &sql, [], block_number_from_row)?;
Ok(block_number)
Expand Down Expand Up @@ -969,20 +1047,45 @@ impl NftTransferHistoryStorageOps for AsyncMutexGuard<'_, AsyncConnection> {

async fn init(&self, chain: &Chain) -> MmResult<(), Self::Error> {
let sql_transfer_history = create_transfer_history_table_sql(chain)?;
let table_name = chain.transfer_history_table_name()?;
self.call(move |conn| {
conn.execute(&sql_transfer_history, []).map(|_| ())?;
conn.execute(&create_schema_versions_sql()?, []).map(|_| ())?;
conn.execute(&insert_schema_version_sql()?, [
table_name.inner(),
&CURRENT_SCHEMA_VERSION_TX_HISTORY.to_string(),
])
.map(|_| ())?;
Ok(())
})
.await
.map_to_mm(AsyncConnError::from)
}

async fn is_initialized(&self, chain: &Chain) -> MmResult<bool, Self::Error> {
let table_name = chain.transfer_history_table_name()?;
let history_table = chain.transfer_history_table_name()?;
let schema_table = schema_versions_table_name()?;
self.call(move |conn| {
let nft_list_initialized =
query_single_row(conn, CHECK_TABLE_EXISTS_SQL, [table_name.inner()], string_from_row)?;
Ok(nft_list_initialized.is_some())
let history_table_exists =
query_single_row(conn, CHECK_TABLE_EXISTS_SQL, [history_table.inner()], string_from_row)?;
let schema_versions_table_exists =
query_single_row(conn, CHECK_TABLE_EXISTS_SQL, [schema_table.inner()], string_from_row)?;
if history_table_exists.is_none() {
return Ok(false);
}
if schema_versions_table_exists.is_none() {
conn.execute(&create_schema_versions_sql()?, []).map(|_| ())?;
}
let version: i32 = get_schema_version_stmt(conn)?
.query_row([history_table.inner()], |row| row.get(0))
.unwrap_or(0);

if version < CURRENT_SCHEMA_VERSION_TX_HISTORY {
// Call migration function to update the table schema to the latest version
migrate_tx_history_table_schema(conn, history_table, schema_table)?;
}

Ok(true)
})
.await
.map_to_mm(AsyncConnError::from)
Expand Down Expand Up @@ -1077,7 +1180,7 @@ impl NftTransferHistoryStorageOps for AsyncMutexGuard<'_, AsyncConnection> {

async fn get_last_block_number(&self, chain: &Chain) -> MmResult<Option<u64>, Self::Error> {
let table_name = chain.transfer_history_table_name()?;
let sql = select_last_block_number_sql(table_name)?;
let sql = select_last_block_number_sql(table_name);
self.call(move |conn| {
let block_number = query_single_row(conn, &sql, [], block_number_from_row)?;
Ok(block_number)
Expand Down Expand Up @@ -1311,3 +1414,63 @@ impl NftTransferHistoryStorageOps for AsyncMutexGuard<'_, AsyncConnection> {
.map_to_mm(AsyncConnError::from)
}
}

fn migrate_tx_history_table_schema(
conn: &mut Connection,
history_table: SafeTableName,
schema_table: SafeTableName,
) -> Result<(), AsyncConnError> {
if has_primary_key_duplication(conn, &history_table)? {
return Err(AsyncConnError::Internal(InternalError(
"Primary key duplication occurred in old nft tx history table".to_string(),
)));
}

// Start a transaction to ensure all operations are atomic
let sql_tx = conn.transaction()?;

let temp_table_name = SafeTableName::new(format!("{}_temp", history_table.inner()).as_str())?;
sql_tx.execute(&create_transfer_history_table_sql_custom_name(&temp_table_name)?, [])?;

let copy_data_sql = format!(
"INSERT INTO {} SELECT * FROM {};",
temp_table_name.inner(),
history_table.inner()
);
sql_tx.execute(&copy_data_sql, [])?;

let drop_old_table_sql = format!("DROP TABLE {};", history_table.inner());
sql_tx.execute(&drop_old_table_sql, [])?;

let rename_table_sql = format!(
"ALTER TABLE {} RENAME TO {};",
temp_table_name.inner(),
history_table.inner()
);
sql_tx.execute(&rename_table_sql, [])?;

sql_tx.execute(&update_schema_version_sql(&schema_table), [
history_table.inner().to_string(),
CURRENT_SCHEMA_VERSION_TX_HISTORY.to_string(),
])?;

sql_tx.commit()?;

Ok(())
}

/// Query to check for duplicates based on the primary key columns from tx history table version 2
fn has_primary_key_duplication(conn: &Connection, safe_table_name: &SafeTableName) -> Result<bool, SqlError> {
let query = format!(
"SELECT EXISTS (
SELECT 1
FROM {}
GROUP BY transaction_hash, log_index, token_id
HAVING COUNT(*) > 1
);",
safe_table_name.inner()
);
// return true if duplicates exist, false otherwise
conn.query_row(&query, [], |row| row.get::<_, i32>(0))
.map(|exists| exists == 1)
}

0 comments on commit 48a7f5b

Please sign in to comment.