diff --git a/program_transformers/src/asset_upserts.rs b/program_transformers/src/asset_upserts.rs index 680ee50ea..a8e155031 100644 --- a/program_transformers/src/asset_upserts.rs +++ b/program_transformers/src/asset_upserts.rs @@ -7,7 +7,7 @@ use { }, sea_orm::{ sea_query::{Alias, Condition, Expr, OnConflict}, - ConnectionTrait, DbBackend, DbErr, EntityTrait, QueryTrait, Set, TransactionTrait, + ConnectionTrait, DbErr, EntityTrait, Set, TransactionTrait, }, serde_json::value::Value, sqlx::types::Decimal, @@ -138,7 +138,7 @@ pub async fn upsert_assets_mint_account_columns= asset.slot_updated_mint_account OR asset.slot_updated_mint_account IS NULL", - query.sql); - txn_or_conn.execute(query).await?; Ok(()) } @@ -204,7 +241,7 @@ pub async fn upsert_assets_metadata_account_columns= asset.slot_updated_metadata_account OR asset.slot_updated_metadata_account IS NULL", - query.sql); - txn_or_conn.execute(query).await?; + .exec_without_returning(txn_or_conn) + .await?; Ok(()) } diff --git a/program_transformers/src/mpl_core_program/v1_asset.rs b/program_transformers/src/mpl_core_program/v1_asset.rs index a1dab3369..8886e5bb8 100644 --- a/program_transformers/src/mpl_core_program/v1_asset.rs +++ b/program_transformers/src/mpl_core_program/v1_asset.rs @@ -21,10 +21,9 @@ use { sea_orm::{ entity::{ActiveValue, ColumnTrait, EntityTrait}, prelude::*, - query::{JsonValue, QueryFilter, QueryTrait}, - sea_query::query::OnConflict, - sea_query::Expr, - ConnectionTrait, CursorTrait, DbBackend, Statement, TransactionTrait, + query::{JsonValue, QueryFilter}, + sea_query::{query::OnConflict, Alias, Condition, Expr}, + ConnectionTrait, CursorTrait, Statement, TransactionTrait, }, serde_json::{value::Value, Map}, solana_sdk::pubkey::Pubkey, @@ -43,18 +42,22 @@ pub async fn burn_v1_asset( burnt: ActiveValue::Set(true), ..Default::default() }; - let mut query = asset::Entity::insert(model) + asset::Entity::insert(model) .on_conflict( - OnConflict::columns([asset::Column::Id]) + OnConflict::column(asset::Column::Id) .update_columns([asset::Column::SlotUpdated, asset::Column::Burnt]) + .action_cond_where( + Condition::all() + .add( + Expr::tbl(Alias::new("excluded"), asset::Column::Burnt) + .ne(Expr::tbl(asset::Entity, asset::Column::Burnt)), + ) + .add(Expr::tbl(asset::Entity, asset::Column::SlotUpdated).lte(slot_i)), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset.slot_updated", - query.sql - ); - conn.execute(query).await?; + .exec_without_returning(conn) + .await?; Ok(()) } @@ -122,22 +125,33 @@ pub async fn save_v1_asset( ..Default::default() }; - let mut query = asset_authority::Entity::insert(model) + asset_authority::Entity::insert(model) .on_conflict( - OnConflict::columns([asset_authority::Column::AssetId]) + OnConflict::column(asset_authority::Column::AssetId) .update_columns([ asset_authority::Column::Authority, - asset_authority::Column::Seq, asset_authority::Column::SlotUpdated, ]) + .action_cond_where( + Condition::all() + .add( + Expr::tbl(Alias::new("excluded"), asset_authority::Column::Authority) + .ne(Expr::tbl( + asset_authority::Entity, + asset_authority::Column::Authority, + )), + ) + .add( + Expr::tbl( + asset_authority::Entity, + asset_authority::Column::SlotUpdated, + ) + .lte(slot_i), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_authority.slot_updated", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; @@ -192,8 +206,7 @@ pub async fn save_v1_asset( raw_symbol: ActiveValue::Set(None), base_info_seq: ActiveValue::Set(Some(0)), }; - - let mut query = asset_data::Entity::insert(asset_data_model) + asset_data::Entity::insert(asset_data_model) .on_conflict( OnConflict::columns([asset_data::Column::Id]) .update_columns([ @@ -207,14 +220,93 @@ pub async fn save_v1_asset( asset_data::Column::RawSymbol, asset_data::Column::BaseInfoSeq, ]) + .action_cond_where( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::ChainDataMutability, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::ChainDataMutability, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::ChainData, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::ChainData, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::MetadataUrl, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::MetadataUrl, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::MetadataMutability, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::MetadataMutability, + )), + ) + .add( + Expr::tbl(Alias::new("excluded"), asset_data::Column::Reindex) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::Reindex, + )), + ) + .add( + Expr::tbl(Alias::new("excluded"), asset_data::Column::RawName) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::RawName, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::RawSymbol, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::RawSymbol, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::BaseInfoSeq, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::BaseInfoSeq, + )), + ), + ) + .add( + Expr::tbl(asset_data::Entity, asset_data::Column::SlotUpdated) + .lte(slot_i), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_data.slot_updated", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; @@ -346,7 +438,7 @@ pub async fn save_v1_asset( ..Default::default() }; - let mut query = asset::Entity::insert(asset_model) + asset::Entity::insert(asset_model) .on_conflict( OnConflict::columns([asset::Column::Id]) .update_columns([ @@ -371,16 +463,210 @@ pub async fn save_v1_asset( asset::Column::Delegate, asset::Column::SlotUpdatedTokenAccount, ]) + .action_cond_where( + Condition::any() + .add( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::OwnerType, + ) + .ne(Expr::tbl(asset::Entity, asset::Column::OwnerType)), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::Supply, + ) + .ne(Expr::tbl(asset::Entity, asset::Column::Supply)), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::SupplyMint, + ) + .ne( + Expr::tbl(asset::Entity, asset::Column::SupplyMint), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::SlotUpdatedMintAccount, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::SlotUpdatedMintAccount, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::SpecificationVersion, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::SpecificationVersion, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::SpecificationAssetClass, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::SpecificationAssetClass, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::RoyaltyAmount, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::RoyaltyAmount, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::AssetData, + ) + .ne(Expr::tbl(asset::Entity, asset::Column::AssetData)), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::MplCorePlugins, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::MplCorePlugins, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::MplCoreUnknownPlugins, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::MplCoreUnknownPlugins, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::MplCoreCollectionNumMinted, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::MplCoreCollectionNumMinted, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::MplCoreCollectionCurrentSize, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::MplCoreCollectionCurrentSize, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::MplCorePluginsJsonVersion, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::MplCorePluginsJsonVersion, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::MplCoreExternalPlugins, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::MplCoreExternalPlugins, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::MplCoreUnknownExternalPlugins, + ) + .ne( + Expr::tbl( + asset::Entity, + asset::Column::MplCoreUnknownExternalPlugins, + ), + ), + ) + .add( + Expr::tbl(Alias::new("excluded"), asset::Column::Owner) + .ne(Expr::tbl(asset::Entity, asset::Column::Owner)), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::Frozen, + ) + .ne(Expr::tbl(asset::Entity, asset::Column::Frozen)), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset::Column::Delegate, + ) + .ne(Expr::tbl(asset::Entity, asset::Column::Delegate)), + ), + ) + .add( + Expr::tbl( + asset::Entity, + asset::Column::SlotUpdatedMetadataAccount, + ) + .lte(slot as i64), + ), + ) + .add( + Expr::tbl(asset::Entity, asset::Column::SlotUpdatedMetadataAccount) + .is_null(), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - - query.sql = format!( - "{} WHERE excluded.slot_updated_metadata_account >= asset.slot_updated_metadata_account OR asset.slot_updated_metadata_account IS NULL", - query.sql - ); - - txn.execute(query).await?; + .exec_without_returning(&txn) + .await?; //----------------------- // asset_grouping table @@ -397,7 +683,8 @@ pub async fn save_v1_asset( slot_updated: ActiveValue::Set(Some(slot_i)), ..Default::default() }; - let mut query = asset_grouping::Entity::insert(model) + + asset_grouping::Entity::insert(model) .on_conflict( OnConflict::columns([ asset_grouping::Column::AssetId, @@ -405,18 +692,25 @@ pub async fn save_v1_asset( ]) .update_columns([ asset_grouping::Column::GroupValue, - asset_grouping::Column::Verified, asset_grouping::Column::SlotUpdated, - asset_grouping::Column::GroupInfoSeq, ]) + .action_cond_where( + Condition::all() + .add( + Expr::tbl(Alias::new("excluded"), asset_grouping::Column::GroupValue) + .ne(Expr::tbl( + asset_grouping::Entity, + asset_grouping::Column::GroupValue, + )), + ) + .add( + Expr::tbl(asset_grouping::Entity, asset_grouping::Column::SlotUpdated) + .lte(slot_i), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated >= asset_grouping.slot_updated", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; } @@ -442,7 +736,7 @@ pub async fn save_v1_asset( .collect::>(); if !creators.is_empty() { - let mut query = asset_creators::Entity::insert_many(creators) + asset_creators::Entity::insert_many(creators) .on_conflict( OnConflict::columns([ asset_creators::Column::AssetId, @@ -451,18 +745,55 @@ pub async fn save_v1_asset( .update_columns([ asset_creators::Column::Creator, asset_creators::Column::Share, - asset_creators::Column::Verified, - asset_creators::Column::Seq, asset_creators::Column::SlotUpdated, ]) + .action_cond_where( + Condition::any() + .add( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + asset_creators::Column::Creator, + ) + .ne( + Expr::tbl( + asset_creators::Entity, + asset_creators::Column::Creator, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_creators::Column::Share, + ) + .ne( + Expr::tbl( + asset_creators::Entity, + asset_creators::Column::Share, + ), + ), + ), + ) + .add( + Expr::tbl( + asset_creators::Entity, + asset_creators::Column::SlotUpdated, + ) + .lte(slot_i), + ), + ) + .add( + Expr::tbl(asset_creators::Entity, asset_creators::Column::SlotUpdated) + .is_null(), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated >= asset_creators.slot_updated OR asset_creators.slot_updated is NULL", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; } diff --git a/program_transformers/src/token_metadata/master_edition.rs b/program_transformers/src/token_metadata/master_edition.rs index a617e7c2f..df8872dfd 100644 --- a/program_transformers/src/token_metadata/master_edition.rs +++ b/program_transformers/src/token_metadata/master_edition.rs @@ -84,24 +84,27 @@ pub async fn save_master_edition( .action_cond_where( Condition::all() .add( - Expr::tbl( - Alias::new("excluded"), - asset_v1_account_attachments::Column::AttachmentType, - ) - .ne(Expr::tbl( - asset_v1_account_attachments::Entity, - asset_v1_account_attachments::Column::AttachmentType, - )), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - asset_v1_account_attachments::Column::Data, - ) - .ne(Expr::tbl( - asset_v1_account_attachments::Entity, - asset_v1_account_attachments::Column::Data, - )), + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + asset_v1_account_attachments::Column::AttachmentType, + ) + .ne(Expr::tbl( + asset_v1_account_attachments::Entity, + asset_v1_account_attachments::Column::AttachmentType, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_v1_account_attachments::Column::Data, + ) + .ne(Expr::tbl( + asset_v1_account_attachments::Entity, + asset_v1_account_attachments::Column::Data, + )), + ), ) .add( Expr::tbl( @@ -114,7 +117,7 @@ pub async fn save_master_edition( .to_owned(), ) .exec_without_returning(txn) - .await - .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; + .await?; + Ok(()) }