diff --git a/mm2src/coins/nft/storage/sql_storage.rs b/mm2src/coins/nft/storage/sql_storage.rs index 21be74c50f..18f406a2db 100644 --- a/mm2src/coins/nft/storage/sql_storage.rs +++ b/mm2src/coins/nft/storage/sql_storage.rs @@ -5,7 +5,7 @@ use crate::nft::nft_structs::{Chain, ContractType, ConvertChain, Nft, NftCommon, use crate::nft::storage::{get_offset_limit, NftDetailsJson, NftListStorageOps, NftStorageError, NftTransferHistoryStorageOps, RemoveNftResult, TransferDetailsJson}; use async_trait::async_trait; -use db_common::async_sql_conn::{AsyncConnError, AsyncConnection}; +use db_common::async_sql_conn::{AsyncConnError, AsyncConnection, InternalError}; use db_common::sql_build::{SqlCondition, SqlQuery}; use db_common::sqlite::rusqlite::types::{FromSqlError, Type}; use db_common::sqlite::rusqlite::{Connection, Error as SqlError, Result as SqlResult, Row, Statement}; @@ -22,6 +22,8 @@ use std::convert::TryInto; use std::num::NonZeroUsize; use std::str::FromStr; +const CURRENT_SCHEMA_VERSION_TX_HISTORY: i32 = 2; + impl Chain { fn nft_list_table_name(&self) -> SqlResult { let name = self.to_ticker().to_owned() + "_nft_list"; @@ -42,6 +44,12 @@ fn scanned_nft_blocks_table_name() -> SqlResult { Ok(safe_name) } +fn schema_versions_table_name() -> SqlResult { + let name = "schema_versions".to_string(); + let safe_name = SafeTableName::new(&name)?; + Ok(safe_name) +} + fn create_nft_list_table_sql(chain: &Chain) -> MmResult { let safe_table_name = chain.nft_list_table_name()?; let sql = format!( @@ -110,6 +118,35 @@ fn create_transfer_history_table_sql(chain: &Chain) -> Result Ok(sql) } +fn create_transfer_history_table_sql_custom_name(safe_table_name: &SafeTableName) -> Result { + let sql = format!( + "CREATE TABLE IF NOT EXISTS {} ( + transaction_hash VARCHAR(256) NOT NULL, + log_index INTEGER NOT NULL, + chain TEXT NOT NULL, + block_number INTEGER NOT NULL, + block_timestamp INTEGER NOT NULL, + contract_type TEXT NOT NULL, + token_address VARCHAR(256) NOT NULL, + token_id VARCHAR(256) NOT NULL, + status TEXT NOT NULL, + amount VARCHAR(256) NOT NULL, + possible_spam INTEGER DEFAULT 0 NOT NULL, + possible_phishing INTEGER DEFAULT 0 NOT NULL, + token_uri TEXT, + token_domain TEXT, + collection_name TEXT, + image_url TEXT, + image_domain TEXT, + token_name TEXT, + details_json TEXT, + PRIMARY KEY (transaction_hash, log_index, token_id) + );", + safe_table_name.inner() + ); + Ok(sql) +} + fn create_scanned_nft_blocks_sql() -> Result { let safe_table_name = scanned_nft_blocks_table_name()?; let sql = format!( @@ -122,6 +159,18 @@ fn create_scanned_nft_blocks_sql() -> Result { Ok(sql) } +fn create_schema_versions_sql() -> Result { + let safe_table_name = schema_versions_table_name()?; + let sql = format!( + "CREATE TABLE IF NOT EXISTS {} ( + table_name TEXT PRIMARY KEY, + version INTEGER NOT NULL + );", + safe_table_name.inner() + ); + Ok(sql) +} + impl NftStorageError for AsyncConnError {} fn get_nft_list_builder_preimage(chains: Vec, filters: Option) -> Result { @@ -432,6 +481,15 @@ fn upsert_last_scanned_block_sql() -> Result { Ok(sql) } +fn insert_schema_version_sql() -> Result { + let schema_table = schema_versions_table_name()?; + let sql = format!( + "INSERT INTO {} (table_name, version) VALUES (?1, ?2) ON CONFLICT(table_name) DO NOTHING;", + schema_table.inner() + ); + Ok(sql) +} + fn refresh_nft_metadata_sql(chain: &Chain) -> Result { let safe_table_name = chain.nft_list_table_name()?; let sql = format!( @@ -462,12 +520,25 @@ fn update_transfer_spam_by_token_addr_id(chain: &Chain) -> Result Result { - let sql = format!( +/// Generates the SQL command to insert or update the schema version in the `schema_versions` table. +/// +/// This function creates an SQL command that attempts to insert a new row with the specified +/// `table_name` and `version`. If a row with the same `table_name` already exists, the `version` +/// field is updated to the new value provided. +fn update_schema_version_sql(schema_versions: &SafeTableName) -> String { + format!( + "INSERT INTO {} (table_name, version) + VALUES (?1, ?2) + ON CONFLICT(table_name) DO UPDATE SET version = excluded.version;", + schema_versions.inner() + ) +} + +fn select_last_block_number_sql(safe_table_name: SafeTableName) -> String { + format!( "SELECT block_number FROM {} ORDER BY block_number DESC LIMIT 1", safe_table_name.inner() - ); - Ok(sql) + ) } fn select_last_scanned_block_sql() -> MmResult { @@ -540,6 +611,13 @@ fn get_transfers_with_empty_meta_builder<'a>(conn: &'a Connection, chain: &'a Ch Ok(sql_builder) } +fn get_schema_version_stmt(conn: &Connection) -> Result { + let table_name = schema_versions_table_name()?; + let sql = format!("SELECT version FROM {} WHERE table_name = ?1;", table_name.inner()); + let stmt = conn.prepare(&sql)?; + Ok(stmt) +} + fn is_table_empty(conn: &Connection, safe_table_name: SafeTableName) -> Result { let query = format!("SELECT COUNT(*) FROM {}", safe_table_name.inner()); conn.query_row(&query, [], |row| row.get::<_, i64>(0)) @@ -777,7 +855,7 @@ impl NftListStorageOps for AsyncMutexGuard<'_, AsyncConnection> { async fn get_last_block_number(&self, chain: &Chain) -> MmResult, Self::Error> { let table_name = chain.nft_list_table_name()?; - let sql = select_last_block_number_sql(table_name)?; + let sql = select_last_block_number_sql(table_name); self.call(move |conn| { let block_number = query_single_row(conn, &sql, [], block_number_from_row)?; Ok(block_number) @@ -969,8 +1047,15 @@ impl NftTransferHistoryStorageOps for AsyncMutexGuard<'_, AsyncConnection> { async fn init(&self, chain: &Chain) -> MmResult<(), Self::Error> { let sql_transfer_history = create_transfer_history_table_sql(chain)?; + let table_name = chain.transfer_history_table_name()?; self.call(move |conn| { conn.execute(&sql_transfer_history, []).map(|_| ())?; + conn.execute(&create_schema_versions_sql()?, []).map(|_| ())?; + conn.execute(&insert_schema_version_sql()?, [ + table_name.inner(), + &CURRENT_SCHEMA_VERSION_TX_HISTORY.to_string(), + ]) + .map(|_| ())?; Ok(()) }) .await @@ -978,11 +1063,29 @@ impl NftTransferHistoryStorageOps for AsyncMutexGuard<'_, AsyncConnection> { } async fn is_initialized(&self, chain: &Chain) -> MmResult { - let table_name = chain.transfer_history_table_name()?; + let history_table = chain.transfer_history_table_name()?; + let schema_table = schema_versions_table_name()?; self.call(move |conn| { - let nft_list_initialized = - query_single_row(conn, CHECK_TABLE_EXISTS_SQL, [table_name.inner()], string_from_row)?; - Ok(nft_list_initialized.is_some()) + let history_table_exists = + query_single_row(conn, CHECK_TABLE_EXISTS_SQL, [history_table.inner()], string_from_row)?; + let schema_versions_table_exists = + query_single_row(conn, CHECK_TABLE_EXISTS_SQL, [schema_table.inner()], string_from_row)?; + if history_table_exists.is_none() { + return Ok(false); + } + if schema_versions_table_exists.is_none() { + conn.execute(&create_schema_versions_sql()?, []).map(|_| ())?; + } + let version: i32 = get_schema_version_stmt(conn)? + .query_row([history_table.inner()], |row| row.get(0)) + .unwrap_or(0); + + if version < CURRENT_SCHEMA_VERSION_TX_HISTORY { + // Call migration function to update the table schema to the latest version + migrate_tx_history_table_schema(conn, history_table, schema_table)?; + } + + Ok(true) }) .await .map_to_mm(AsyncConnError::from) @@ -1077,7 +1180,7 @@ impl NftTransferHistoryStorageOps for AsyncMutexGuard<'_, AsyncConnection> { async fn get_last_block_number(&self, chain: &Chain) -> MmResult, Self::Error> { let table_name = chain.transfer_history_table_name()?; - let sql = select_last_block_number_sql(table_name)?; + let sql = select_last_block_number_sql(table_name); self.call(move |conn| { let block_number = query_single_row(conn, &sql, [], block_number_from_row)?; Ok(block_number) @@ -1311,3 +1414,63 @@ impl NftTransferHistoryStorageOps for AsyncMutexGuard<'_, AsyncConnection> { .map_to_mm(AsyncConnError::from) } } + +fn migrate_tx_history_table_schema( + conn: &mut Connection, + history_table: SafeTableName, + schema_table: SafeTableName, +) -> Result<(), AsyncConnError> { + if has_primary_key_duplication(conn, &history_table)? { + return Err(AsyncConnError::Internal(InternalError( + "Primary key duplication occurred in old nft tx history table".to_string(), + ))); + } + + // Start a transaction to ensure all operations are atomic + let sql_tx = conn.transaction()?; + + let temp_table_name = SafeTableName::new(format!("{}_temp", history_table.inner()).as_str())?; + sql_tx.execute(&create_transfer_history_table_sql_custom_name(&temp_table_name)?, [])?; + + let copy_data_sql = format!( + "INSERT INTO {} SELECT * FROM {};", + temp_table_name.inner(), + history_table.inner() + ); + sql_tx.execute(©_data_sql, [])?; + + let drop_old_table_sql = format!("DROP TABLE {};", history_table.inner()); + sql_tx.execute(&drop_old_table_sql, [])?; + + let rename_table_sql = format!( + "ALTER TABLE {} RENAME TO {};", + temp_table_name.inner(), + history_table.inner() + ); + sql_tx.execute(&rename_table_sql, [])?; + + sql_tx.execute(&update_schema_version_sql(&schema_table), [ + history_table.inner().to_string(), + CURRENT_SCHEMA_VERSION_TX_HISTORY.to_string(), + ])?; + + sql_tx.commit()?; + + Ok(()) +} + +/// Query to check for duplicates based on the primary key columns from tx history table version 2 +fn has_primary_key_duplication(conn: &Connection, safe_table_name: &SafeTableName) -> Result { + let query = format!( + "SELECT EXISTS ( + SELECT 1 + FROM {} + GROUP BY transaction_hash, log_index, token_id + HAVING COUNT(*) > 1 + );", + safe_table_name.inner() + ); + // return true if duplicates exist, false otherwise + conn.query_row(&query, [], |row| row.get::<_, i32>(0)) + .map(|exists| exists == 1) +}