diff --git a/program_transformers/src/asset_upserts.rs b/program_transformers/src/asset_upserts.rs index ee0b62713..a8e155031 100644 --- a/program_transformers/src/asset_upserts.rs +++ b/program_transformers/src/asset_upserts.rs @@ -6,8 +6,8 @@ use { }, }, sea_orm::{ - sea_query::OnConflict, ConnectionTrait, DbBackend, DbErr, EntityTrait, QueryTrait, Set, - TransactionTrait, + sea_query::{Alias, Condition, Expr, OnConflict}, + ConnectionTrait, DbErr, EntityTrait, Set, TransactionTrait, }, serde_json::value::Value, sqlx::types::Decimal, @@ -33,7 +33,8 @@ pub async fn upsert_assets_token_account_columns= asset.slot_updated_token_account OR asset.slot_updated_token_account IS NULL) AND asset.owner_type = 'single'", - query.sql); - txn_or_conn.execute(query).await?; Ok(()) } @@ -78,7 +138,7 @@ pub async fn upsert_assets_mint_account_columns= asset.slot_updated_mint_account OR asset.slot_updated_mint_account IS NULL", - query.sql); - txn_or_conn.execute(query).await?; Ok(()) } @@ -144,7 +241,7 @@ pub async fn upsert_assets_metadata_account_columns= asset.slot_updated_metadata_account OR asset.slot_updated_metadata_account IS NULL", - query.sql); - txn_or_conn.execute(query).await?; + .exec_without_returning(txn_or_conn) + .await?; Ok(()) } diff --git a/program_transformers/src/mpl_core_program/v1_asset.rs b/program_transformers/src/mpl_core_program/v1_asset.rs index a1dab3369..8886e5bb8 100644 --- a/program_transformers/src/mpl_core_program/v1_asset.rs +++ b/program_transformers/src/mpl_core_program/v1_asset.rs @@ -21,10 +21,9 @@ use { sea_orm::{ entity::{ActiveValue, ColumnTrait, EntityTrait}, prelude::*, - query::{JsonValue, QueryFilter, QueryTrait}, - sea_query::query::OnConflict, - sea_query::Expr, - ConnectionTrait, CursorTrait, DbBackend, Statement, TransactionTrait, + query::{JsonValue, QueryFilter}, + sea_query::{query::OnConflict, Alias, Condition, Expr}, + ConnectionTrait, CursorTrait, Statement, TransactionTrait, }, serde_json::{value::Value, Map}, solana_sdk::pubkey::Pubkey, @@ -43,18 +42,22 @@ pub async fn burn_v1_asset( burnt: ActiveValue::Set(true), ..Default::default() }; - let mut query = asset::Entity::insert(model) + asset::Entity::insert(model) .on_conflict( - OnConflict::columns([asset::Column::Id]) + OnConflict::column(asset::Column::Id) .update_columns([asset::Column::SlotUpdated, asset::Column::Burnt]) + .action_cond_where( + Condition::all() + .add( + Expr::tbl(Alias::new("excluded"), asset::Column::Burnt) + .ne(Expr::tbl(asset::Entity, asset::Column::Burnt)), + ) + .add(Expr::tbl(asset::Entity, asset::Column::SlotUpdated).lte(slot_i)), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset.slot_updated", - query.sql - ); - conn.execute(query).await?; + .exec_without_returning(conn) + .await?; Ok(()) } @@ -122,22 +125,33 @@ pub async fn save_v1_asset( ..Default::default() }; - let mut query = asset_authority::Entity::insert(model) + asset_authority::Entity::insert(model) .on_conflict( - OnConflict::columns([asset_authority::Column::AssetId]) + OnConflict::column(asset_authority::Column::AssetId) .update_columns([ asset_authority::Column::Authority, - asset_authority::Column::Seq, asset_authority::Column::SlotUpdated, ]) + .action_cond_where( + Condition::all() + .add( + Expr::tbl(Alias::new("excluded"), asset_authority::Column::Authority) + .ne(Expr::tbl( + asset_authority::Entity, + asset_authority::Column::Authority, + )), + ) + .add( + Expr::tbl( + asset_authority::Entity, + asset_authority::Column::SlotUpdated, + ) + .lte(slot_i), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_authority.slot_updated", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; @@ -192,8 +206,7 @@ pub async fn save_v1_asset( raw_symbol: ActiveValue::Set(None), base_info_seq: ActiveValue::Set(Some(0)), }; - - let mut query = asset_data::Entity::insert(asset_data_model) + asset_data::Entity::insert(asset_data_model) .on_conflict( OnConflict::columns([asset_data::Column::Id]) .update_columns([ @@ -207,14 +220,93 @@ pub async fn save_v1_asset( asset_data::Column::RawSymbol, asset_data::Column::BaseInfoSeq, ]) + .action_cond_where( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::ChainDataMutability, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::ChainDataMutability, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::ChainData, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::ChainData, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::MetadataUrl, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::MetadataUrl, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::MetadataMutability, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::MetadataMutability, + )), + ) + .add( + Expr::tbl(Alias::new("excluded"), asset_data::Column::Reindex) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::Reindex, + )), + ) + .add( + Expr::tbl(Alias::new("excluded"), asset_data::Column::RawName) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::RawName, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::RawSymbol, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::RawSymbol, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::BaseInfoSeq, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::BaseInfoSeq, + )), + ), + ) + .add( + Expr::tbl(asset_data::Entity, asset_data::Column::SlotUpdated) + .lte(slot_i), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_data.slot_updated", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; @@ -346,7 +438,7 @@ pub async fn save_v1_asset( ..Default::default() }; - let mut query = asset::Entity::insert(asset_model) + asset::Entity::insert(asset_model) .on_conflict( OnConflict::columns([asset::Column::Id]) .update_columns([ @@ -371,16 +463,210 @@ pub async fn save_v1_asset( asset::Column::Delegate, asset::Column::SlotUpdatedTokenAccount, ]) + .action_cond_where( + Condition::any() + .add( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::OwnerType, + ) + .ne(Expr::tbl(asset::Entity, asset::Column::OwnerType)), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::Supply, + ) + .ne(Expr::tbl(asset::Entity, asset::Column::Supply)), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::SupplyMint, + ) + .ne( + Expr::tbl(asset::Entity, asset::Column::SupplyMint), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::SlotUpdatedMintAccount, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::SlotUpdatedMintAccount, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::SpecificationVersion, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::SpecificationVersion, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::SpecificationAssetClass, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::SpecificationAssetClass, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::RoyaltyAmount, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::RoyaltyAmount, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::AssetData, + ) + .ne(Expr::tbl(asset::Entity, asset::Column::AssetData)), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::MplCorePlugins, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::MplCorePlugins, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::MplCoreUnknownPlugins, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::MplCoreUnknownPlugins, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::MplCoreCollectionNumMinted, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::MplCoreCollectionNumMinted, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::MplCoreCollectionCurrentSize, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::MplCoreCollectionCurrentSize, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::MplCorePluginsJsonVersion, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::MplCorePluginsJsonVersion, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::MplCoreExternalPlugins, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::MplCoreExternalPlugins, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::MplCoreUnknownExternalPlugins, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::MplCoreUnknownExternalPlugins, + ), + ), + ) + .add( + Expr::tbl(Alias::new("excluded"), asset::Column::Owner) + .ne(Expr::tbl(asset::Entity, asset::Column::Owner)), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::Frozen, + ) + .ne(Expr::tbl(asset::Entity, asset::Column::Frozen)), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::Delegate, + ) + .ne(Expr::tbl(asset::Entity, asset::Column::Delegate)), + ), + ) + .add( + Expr::tbl( + asset::Entity, + asset::Column::SlotUpdatedMetadataAccount, + ) + .lte(slot as i64), + ), + ) + .add( + Expr::tbl(asset::Entity, asset::Column::SlotUpdatedMetadataAccount) + .is_null(), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - - query.sql = format!( - "{} WHERE excluded.slot_updated_metadata_account >= asset.slot_updated_metadata_account OR asset.slot_updated_metadata_account IS NULL", - query.sql - ); - - txn.execute(query).await?; + .exec_without_returning(&txn) + .await?; //----------------------- // asset_grouping table @@ -397,7 +683,8 @@ pub async fn save_v1_asset( slot_updated: ActiveValue::Set(Some(slot_i)), ..Default::default() }; - let mut query = asset_grouping::Entity::insert(model) + + asset_grouping::Entity::insert(model) .on_conflict( OnConflict::columns([ asset_grouping::Column::AssetId, @@ -405,18 +692,25 @@ pub async fn save_v1_asset( ]) .update_columns([ asset_grouping::Column::GroupValue, - asset_grouping::Column::Verified, asset_grouping::Column::SlotUpdated, - asset_grouping::Column::GroupInfoSeq, ]) + .action_cond_where( + Condition::all() + .add( + Expr::tbl(Alias::new("excluded"), asset_grouping::Column::GroupValue) + .ne(Expr::tbl( + asset_grouping::Entity, + asset_grouping::Column::GroupValue, + )), + ) + .add( + Expr::tbl(asset_grouping::Entity, asset_grouping::Column::SlotUpdated) + .lte(slot_i), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated >= asset_grouping.slot_updated", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; } @@ -442,7 +736,7 @@ pub async fn save_v1_asset( .collect::>(); if !creators.is_empty() { - let mut query = asset_creators::Entity::insert_many(creators) + asset_creators::Entity::insert_many(creators) .on_conflict( OnConflict::columns([ asset_creators::Column::AssetId, @@ -451,18 +745,55 @@ pub async fn save_v1_asset( .update_columns([ asset_creators::Column::Creator, asset_creators::Column::Share, - asset_creators::Column::Verified, - asset_creators::Column::Seq, asset_creators::Column::SlotUpdated, ]) + .action_cond_where( + Condition::any() + .add( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + asset_creators::Column::Creator, + ) + .ne( + Expr::tbl( + asset_creators::Entity, + asset_creators::Column::Creator, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_creators::Column::Share, + ) + .ne( + Expr::tbl( + asset_creators::Entity, + asset_creators::Column::Share, + ), + ), + ), + ) + .add( + Expr::tbl( + asset_creators::Entity, + asset_creators::Column::SlotUpdated, + ) + .lte(slot_i), + ), + ) + .add( + Expr::tbl(asset_creators::Entity, asset_creators::Column::SlotUpdated) + .is_null(), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated >= asset_creators.slot_updated OR asset_creators.slot_updated is NULL", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; } diff --git a/program_transformers/src/token/mod.rs b/program_transformers/src/token/mod.rs index 64a83d300..3b864c04c 100644 --- a/program_transformers/src/token/mod.rs +++ b/program_transformers/src/token/mod.rs @@ -10,8 +10,10 @@ use { blockbuster::programs::token_account::TokenProgramAccount, digital_asset_types::dao::{token_accounts, tokens}, sea_orm::{ - entity::ActiveValue, query::QueryTrait, sea_query::query::OnConflict, ConnectionTrait, - DatabaseConnection, DbBackend, EntityTrait, TransactionTrait, + entity::ActiveValue, + sea_query::query::OnConflict, + sea_query::{Alias, Condition, Expr}, + DatabaseConnection, EntityTrait, TransactionTrait, }, solana_sdk::program_option::COption, spl_token::state::AccountState, @@ -49,9 +51,9 @@ pub async fn handle_token_program_account<'a, 'b>( let txn = db.begin().await?; - let mut query = token_accounts::Entity::insert(model) + token_accounts::Entity::insert(model) .on_conflict( - OnConflict::columns([token_accounts::Column::Pubkey]) + OnConflict::column(token_accounts::Column::Pubkey) .update_columns([ token_accounts::Column::Mint, token_accounts::Column::DelegatedAmount, @@ -60,17 +62,109 @@ pub async fn handle_token_program_account<'a, 'b>( token_accounts::Column::Frozen, token_accounts::Column::TokenProgram, token_accounts::Column::Owner, - token_accounts::Column::CloseAuthority, token_accounts::Column::SlotUpdated, ]) + .action_cond_where( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + token_accounts::Column::Mint, + ) + .ne( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::Mint, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + token_accounts::Column::DelegatedAmount, + ) + .ne( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::DelegatedAmount, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + token_accounts::Column::Delegate, + ) + .ne( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::Delegate, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + token_accounts::Column::Amount, + ) + .ne( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::Amount, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + token_accounts::Column::Frozen, + ) + .ne( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::Frozen, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + token_accounts::Column::TokenProgram, + ) + .ne( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::TokenProgram, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + token_accounts::Column::Owner, + ) + .ne( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::Owner, + ), + ), + ), + ) + .add( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::SlotUpdated, + ) + .lte(account_info.slot as i64), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > token_accounts.slot_updated", - query.sql - ); - txn.execute(query).await?; + .exec_without_returning(&txn) + .await?; if ta.amount == 1 { upsert_assets_token_account_columns( @@ -112,29 +206,83 @@ pub async fn handle_token_program_account<'a, 'b>( let txn = db.begin().await?; - let mut query = tokens::Entity::insert(model) + tokens::Entity::insert(model) .on_conflict( OnConflict::columns([tokens::Column::Mint]) .update_columns([ tokens::Column::Supply, tokens::Column::TokenProgram, tokens::Column::MintAuthority, - tokens::Column::CloseAuthority, - tokens::Column::ExtensionData, tokens::Column::SlotUpdated, tokens::Column::Decimals, tokens::Column::FreezeAuthority, ]) + .action_cond_where( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + tokens::Column::Supply, + ) + .ne(Expr::tbl(tokens::Entity, tokens::Column::Supply)), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + tokens::Column::TokenProgram, + ) + .ne( + Expr::tbl( + tokens::Entity, + tokens::Column::TokenProgram, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + tokens::Column::MintAuthority, + ) + .ne( + Expr::tbl( + tokens::Entity, + tokens::Column::MintAuthority, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + tokens::Column::Decimals, + ) + .ne( + Expr::tbl(tokens::Entity, tokens::Column::Decimals), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + tokens::Column::FreezeAuthority, + ) + .ne( + Expr::tbl( + tokens::Entity, + tokens::Column::FreezeAuthority, + ), + ), + ), + ) + .add( + Expr::tbl(tokens::Entity, tokens::Column::SlotUpdated) + .lte(account_info.slot as i64), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - - query.sql = format!( - "{} WHERE excluded.slot_updated >= tokens.slot_updated", - query.sql - ); - - txn.execute(query).await?; + .exec_without_returning(&txn) + .await?; upsert_assets_mint_account_columns( AssetMintAccountColumns { diff --git a/program_transformers/src/token_metadata/master_edition.rs b/program_transformers/src/token_metadata/master_edition.rs index a95304e5f..df8872dfd 100644 --- a/program_transformers/src/token_metadata/master_edition.rs +++ b/program_transformers/src/token_metadata/master_edition.rs @@ -9,9 +9,8 @@ use { }, sea_orm::{ entity::{ActiveValue, EntityTrait}, - query::QueryTrait, - sea_query::query::OnConflict, - ConnectionTrait, DatabaseTransaction, DbBackend, + sea_query::{query::OnConflict, Alias, Condition, Expr}, + DatabaseTransaction, }, solana_sdk::pubkey::Pubkey, }; @@ -74,7 +73,7 @@ pub async fn save_master_edition( ..Default::default() }; - let query = asset_v1_account_attachments::Entity::insert(model) + asset_v1_account_attachments::Entity::insert(model) .on_conflict( OnConflict::columns([asset_v1_account_attachments::Column::Id]) .update_columns([ @@ -82,9 +81,43 @@ pub async fn save_master_edition( asset_v1_account_attachments::Column::Data, asset_v1_account_attachments::Column::SlotUpdated, ]) + .action_cond_where( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + asset_v1_account_attachments::Column::AttachmentType, + ) + .ne(Expr::tbl( + asset_v1_account_attachments::Entity, + asset_v1_account_attachments::Column::AttachmentType, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_v1_account_attachments::Column::Data, + ) + .ne(Expr::tbl( + asset_v1_account_attachments::Entity, + asset_v1_account_attachments::Column::Data, + )), + ), + ) + .add( + Expr::tbl( + asset_v1_account_attachments::Entity, + asset_v1_account_attachments::Column::SlotUpdated, + ) + .lte(slot as i64), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - txn.execute(query).await?; + .exec_without_returning(txn) + .await?; + Ok(()) } diff --git a/program_transformers/src/token_metadata/v1_asset.rs b/program_transformers/src/token_metadata/v1_asset.rs index 13b6e3c32..c1e8dc5fc 100644 --- a/program_transformers/src/token_metadata/v1_asset.rs +++ b/program_transformers/src/token_metadata/v1_asset.rs @@ -21,9 +21,9 @@ use { }, sea_orm::{ entity::{ActiveValue, EntityTrait}, - query::{JsonValue, QueryTrait}, - sea_query::query::OnConflict, - ConnectionTrait, DbBackend, Statement, TransactionTrait, + query::JsonValue, + sea_query::{query::OnConflict, Alias, Expr}, + Condition, ConnectionTrait, Statement, TransactionTrait, }, solana_sdk::pubkey, tracing::warn, @@ -41,18 +41,24 @@ pub async fn burn_v1_asset( burnt: ActiveValue::Set(true), ..Default::default() }; - let mut query = asset::Entity::insert(model) + + asset::Entity::insert(model) .on_conflict( OnConflict::columns([asset::Column::Id]) .update_columns([asset::Column::SlotUpdated, asset::Column::Burnt]) + .action_cond_where( + Condition::all() + .add( + Expr::tbl(Alias::new("excluded"), asset::Column::Burnt) + .ne(Expr::tbl(asset::Entity, asset::Column::Burnt)), + ) + .add(Expr::tbl(asset::Entity, asset::Column::SlotUpdated).lte(slot_i)), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset.slot_updated", - query.sql - ); - conn.execute(query).await?; + .exec_without_returning(conn) + .await?; + Ok(()) } @@ -134,7 +140,7 @@ pub async fn save_v1_asset( txn.execute(set_lock_timeout_stmt).await?; txn.execute(set_local_app_name_stmt).await?; - let mut query = asset_data::Entity::insert(asset_data_model) + asset_data::Entity::insert(asset_data_model) .on_conflict( OnConflict::columns([asset_data::Column::Id]) .update_columns([ @@ -148,14 +154,93 @@ pub async fn save_v1_asset( asset_data::Column::RawSymbol, asset_data::Column::BaseInfoSeq, ]) + .action_cond_where( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::ChainDataMutability, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::ChainDataMutability, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::ChainData, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::ChainData, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::MetadataUrl, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::MetadataUrl, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::MetadataMutability, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::MetadataMutability, + )), + ) + .add( + Expr::tbl(Alias::new("excluded"), asset_data::Column::Reindex) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::Reindex, + )), + ) + .add( + Expr::tbl(Alias::new("excluded"), asset_data::Column::RawName) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::RawName, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::RawSymbol, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::RawSymbol, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::BaseInfoSeq, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::BaseInfoSeq, + )), + ), + ) + .add( + Expr::tbl(asset_data::Entity, asset_data::Column::SlotUpdated) + .lte(slot_i), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_data.slot_updated", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; @@ -184,14 +269,14 @@ pub async fn save_v1_asset( attachment_type: ActiveValue::Set(V1AccountAttachments::MasterEditionV2), ..Default::default() }; - let query = asset_v1_account_attachments::Entity::insert(attachment) + + asset_v1_account_attachments::Entity::insert(attachment) .on_conflict( OnConflict::columns([asset_v1_account_attachments::Column::Id]) .do_nothing() .to_owned(), ) - .build(DbBackend::Postgres); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; @@ -202,21 +287,34 @@ pub async fn save_v1_asset( slot_updated: ActiveValue::Set(slot_i), ..Default::default() }; - let mut query = asset_authority::Entity::insert(model) + + asset_authority::Entity::insert(model) .on_conflict( - OnConflict::columns([asset_authority::Column::AssetId]) + OnConflict::column(asset_authority::Column::AssetId) .update_columns([ asset_authority::Column::Authority, asset_authority::Column::SlotUpdated, ]) + .action_cond_where( + Condition::all() + .add( + Expr::tbl(Alias::new("excluded"), asset_authority::Column::Authority) + .ne(Expr::tbl( + asset_authority::Entity, + asset_authority::Column::Authority, + )), + ) + .add( + Expr::tbl( + asset_authority::Entity, + asset_authority::Column::SlotUpdated, + ) + .lte(slot_i), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_authority.slot_updated AND excluded.authority != asset_authority.authority", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; @@ -230,7 +328,8 @@ pub async fn save_v1_asset( slot_updated: ActiveValue::Set(Some(slot_i)), ..Default::default() }; - let mut query = asset_grouping::Entity::insert(model) + + asset_grouping::Entity::insert(model) .on_conflict( OnConflict::columns([ asset_grouping::Column::AssetId, @@ -240,16 +339,40 @@ pub async fn save_v1_asset( asset_grouping::Column::GroupValue, asset_grouping::Column::Verified, asset_grouping::Column::SlotUpdated, - asset_grouping::Column::GroupInfoSeq, ]) + .action_cond_where( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + asset_grouping::Column::GroupValue, + ) + .ne(Expr::tbl( + asset_grouping::Entity, + asset_grouping::Column::GroupValue, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_grouping::Column::Verified, + ) + .ne(Expr::tbl( + asset_grouping::Entity, + asset_grouping::Column::Verified, + )), + ), + ) + .add( + Expr::tbl(asset_grouping::Entity, asset_grouping::Column::SlotUpdated) + .lte(slot_i), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_grouping.slot_updated", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; } @@ -272,7 +395,7 @@ pub async fn save_v1_asset( .collect::>(); if !creators.is_empty() { - let mut query = asset_creators::Entity::insert_many(creators) + asset_creators::Entity::insert_many(creators) .on_conflict( OnConflict::columns([ asset_creators::Column::AssetId, @@ -281,18 +404,78 @@ pub async fn save_v1_asset( .update_columns([ asset_creators::Column::Creator, asset_creators::Column::Share, - asset_creators::Column::Verified, asset_creators::Column::Seq, + asset_creators::Column::Verified, asset_creators::Column::SlotUpdated, ]) + .action_cond_where( + Condition::any() + .add( + Condition::all().add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + asset_creators::Column::Creator, + ) + .ne(Expr::tbl( + asset_creators::Entity, + asset_creators::Column::Creator, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_creators::Column::Share, + ) + .ne(Expr::tbl( + asset_creators::Entity, + asset_creators::Column::Share, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_creators::Column::Verified, + ) + .ne(Expr::tbl( + asset_creators::Entity, + asset_creators::Column::Verified, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_creators::Column::Seq, + ) + .ne(Expr::tbl( + asset_creators::Entity, + asset_creators::Column::Seq, + )), + ), + ), + ) + .add( + Condition::any() + .add( + Expr::tbl( + asset_creators::Entity, + asset_creators::Column::SlotUpdated, + ) + .is_null(), + ) + .add( + Expr::tbl( + asset_creators::Entity, + asset_creators::Column::SlotUpdated, + ) + .lte(slot_i), + ), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated >= asset_creators.slot_updated OR asset_creators.slot_updated is NULL", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; }