refactor(statedb): refactor utxo:inscription_obj_id's map updates (#2016)

Refactor sled database usage in genesis_ord.rs:

1. Simplify the sled database usage in genesis_ord.rs: the temporary databases for utxo_ord_map and id_ord_map are now created on the fly instead of at a static path, eliminating the need for cleanup after use. Redundant serialization of the same data is also avoided.
2. Introduce a merge operator on the database maps to improve the key-value update mechanism (see the sketch after this list). The batch-size parameters for data import have also been moved into a clearer context.
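
A minimal, self-contained sketch of the two ideas above — a temporary sled database and a concatenating merge operator — assuming only the sled crate; concat_merge and the example keys/values are illustrative names, not identifiers from this commit:

use sled::Config;

// Merge operator: append the newly merged bytes to whatever is already stored under the key.
fn concat_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
    let mut buf = old.map(|v| v.to_vec()).unwrap_or_default();
    buf.extend_from_slice(new);
    Some(buf)
}

fn main() -> sled::Result<()> {
    // temporary(true): sled removes the backing files when the Db is dropped,
    // so no explicit remove_dir_all is needed after the import finishes.
    let db = Config::new().temporary(true).open()?;
    db.set_merge_operator(concat_merge);

    // merge() applies the operator inside sled instead of a get-modify-insert round trip.
    let _ = db.merge("outpoint", b"id1".to_vec())?;
    let _ = db.merge("outpoint", b"id2".to_vec())?;
    assert_eq!(db.get("outpoint")?.unwrap().as_ref(), b"id1id2".as_ref());
    Ok(())
}

The commit's concatenate_object_id_merge in the diff below has the same shape, accumulating BCS-serialized ObjectIDs under each outpoint key via utxo_ord_map.merge().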
popcnt1 authored Jun 27, 2024
1 parent 97c4357 commit 11695a4
Showing 1 changed file with 55 additions and 54 deletions.
109 changes: 55 additions & 54 deletions crates/rooch/src/commands/statedb/commands/genesis_ord.rs
@@ -8,8 +8,8 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::sync::mpsc::{Receiver, SyncSender};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::SystemTime;
use std::{fs, thread};

use anyhow::Result;
use bitcoin::hashes::Hash;
@@ -18,7 +18,6 @@ use bitcoin_move::natives::ord::inscription_id::InscriptionId;
use clap::Parser;
use move_core_types::account_address::AccountAddress;
use serde::{Deserialize, Serialize};
use sled::Db;
use tempfile::TempDir;

use moveos_store::MoveOSStore;
@@ -74,8 +73,8 @@ pub struct InscriptionSource {
pub parent: Option<Vec<InscriptionId>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub pointer: Option<u64>,
pub is_p2pk: bool, // If true, address field is script
pub address: String,
pub is_p2pk: bool, // If true, address field is script
pub address: String, // <address>, "unbound", "non-standard", <script(p2pk)>
#[serde(skip_serializing_if = "Option::is_none")]
pub rune: Option<u128>,
}
@@ -87,14 +86,24 @@ pub struct GenesisOrdCommand {
/// utxo source data file. like ~/.rooch/local/utxo.csv or utxo.csv
/// The file format is csv, and the first line is the header, the header is as follows:
/// count,txid,vout,height,coinbase,amount,script,type,address
pub utxo_input: PathBuf,
pub utxo_source: PathBuf,
#[clap(long)]
/// ord source data file. like ~/.rooch/local/ord or ord
/// The file format is json, and the first line is block height info: # export at block height <N>
/// ord range: [0, N).
/// ord source data file. like ~/.rooch/local/ord or ord, ord_input must be sorted by sequence_number
/// The file format is json, and the first line is block height info: # export at block height <N>, ord range: [0, N).
/// ord_input & utxo_input must be in the same height
/// ord_input must be sorted by sequence_number
pub ord_input: PathBuf,
pub ord_source: PathBuf,
#[clap(
long,
default_value = "2097152",
help = "batch size submited to state db, default 2M. Set it smaller if memory is limited."
)]
pub utxo_batch_size: Option<usize>,
#[clap(
long,
default_value = "1048576",
help = "batch size submited to state db, default 1M. Set it smaller if memory is limited."
)] // ord may have large body, so set a smaller batch
pub ord_batch_size: Option<usize>,

#[clap(long = "data-dir", short = 'd')]
/// Path to data dir, this dir is base dir, the final data_dir is base_dir/chain_network_name
@@ -105,11 +114,6 @@ pub struct GenesisOrdCommand {
#[clap(long, short = 'n', help = R_OPT_NET_HELP)]
pub chain_id: Option<RoochChainID>,

#[clap(long, default_value = "2097152")]
pub utxo_batch_size: Option<usize>,
#[clap(long, default_value = "1048576")] // ord may have large body, so set a smaller batch
pub ord_batch_size: Option<usize>,

#[clap(flatten)]
pub context_options: WalletContextOptions,
}
@@ -122,23 +126,26 @@ impl GenesisOrdCommand {
// 5. print job stats, clean env
pub async fn execute(self) -> RoochResult<()> {
// 1. init import job
let utxo_ord_map_db_path = TempDir::new().unwrap().into_path();
let id_ord_map_db_path = TempDir::new().unwrap().into_path();
let (root, moveos_store, start_time) =
init_genesis_job(self.base_data_dir.clone(), self.chain_id.clone());
let pre_root_state_root = H256::from(root.state_root.into_bytes());
let utxo_ord_map = Arc::new(sled::open(utxo_ord_map_db_path.clone()).unwrap());
let db_config = sled::Config::new()
.temporary(true)
.path(TempDir::new().unwrap());
let utxo_ord_map_db = db_config.open().unwrap();
utxo_ord_map_db.set_merge_operator(concatenate_object_id_merge);
let utxo_ord_map = Arc::new(utxo_ord_map_db);

let moveos_store = Arc::new(moveos_store);
let startup_update_set = UpdateSet::new();

let utxo_input_path = self.utxo_input.clone();
let utxo_input_path = self.utxo_source.clone();
let utxo_batch_size = self.utxo_batch_size.unwrap();

// 2. import ord
self.import_ord(
utxo_ord_map.clone(),
moveos_store.clone(),
id_ord_map_db_path.clone(),
startup_update_set.clone(),
);

@@ -153,11 +160,6 @@ impl GenesisOrdCommand {
pre_root_state_root,
start_time,
);
let utxo_ord_map_db = utxo_ord_map.clone();
drop(utxo_ord_map_db);

fs::remove_dir_all(utxo_ord_map_db_path.clone())?;
fs::remove_dir_all(id_ord_map_db_path.clone())?;

Ok(())
}
@@ -166,22 +168,14 @@ impl GenesisOrdCommand {
self,
utxo_ord_map: Arc<sled::Db>,
moveos_store: Arc<MoveOSStore>,
id_ord_map_db_path: PathBuf,
startup_update_set: UpdateSet<KeyState, State>,
) {
let input_path = self.ord_input.clone();
let input_path = self.ord_source.clone();
let batch_size = self.ord_batch_size.unwrap();

let (tx, rx) = mpsc::sync_channel(2);
let produce_updates_thread = thread::spawn(move || {
produce_ord_updates(
tx,
input_path,
batch_size,
utxo_ord_map,
id_ord_map_db_path.clone(),
)
});
let produce_updates_thread =
thread::spawn(move || produce_ord_updates(tx, input_path, batch_size, utxo_ord_map));
let apply_updates_thread = thread::spawn(move || {
apply_ord_updates_to_state(rx, moveos_store, startup_update_set);
});
@@ -311,9 +305,12 @@ fn produce_ord_updates(
input: PathBuf,
batch_size: usize,
utxo_ord_map: Arc<sled::Db>,
id_ord_map_db_path: PathBuf,
) {
let id_ord_map: Db = sled::open(id_ord_map_db_path).unwrap();
let db_config = sled::Config::new()
.temporary(true)
.path(TempDir::new().unwrap());
let id_ord_map = db_config.open().unwrap();

let mut reader = BufReader::new(File::open(input).unwrap());
let mut is_title_line = true;
loop {
@@ -453,32 +450,36 @@ fn update_ord_map(
outpoint: OutPoint,
obj_id: ObjectID,
) -> bool {
let obj_id_bytes = bcs::to_bytes(&obj_id).unwrap();
// update inscription_id:ord
let id_key = bcs::to_bytes(&id).unwrap();
id_ord_map
.insert(id_key, bcs::to_bytes(&obj_id).unwrap())
.unwrap();
id_ord_map.insert(id_key, obj_id_bytes.clone()).unwrap();

let is_unbound = outpoint.txid == Hash::all_zeros() && outpoint.vout == 0;
if is_unbound {
return is_unbound; // unbound has no output
return is_unbound; // unbound has no outpoint
}

// update outpoint:ord
let key = bcs::to_bytes(&outpoint).unwrap();
let value = utxo_ord_map.get(&key).unwrap();
if let Some(value) = value {
let mut ord_ids: Vec<ObjectID> = bcs::from_bytes(&value).unwrap();
ord_ids.push(obj_id);
utxo_ord_map
.insert(&key, bcs::to_bytes(&ord_ids).unwrap())
.unwrap();
} else {
utxo_ord_map
.insert(&key, bcs::to_bytes(&vec![obj_id]).unwrap())
.unwrap();
}
utxo_ord_map.merge(key, obj_id_bytes).unwrap();

is_unbound
}

fn concatenate_object_id_merge(
_key: &[u8], // the key being merged
old_value: Option<&[u8]>, // the previous value, if one existed
merged_bytes: &[u8], // the new bytes being merged in
) -> Option<Vec<u8>> {
// set the new value, return None to delete
let mut ret = old_value.map(|ov| ov.to_vec()).unwrap_or_default();

ret.extend_from_slice(merged_bytes);

Some(ret)
}

fn get_ords_by_ids(id_ord_map: &sled::Db, ids: Option<Vec<InscriptionId>>) -> Vec<ObjectID> {
if let Some(ids) = ids {
let mut obj_ids = Vec::new();