Skip to content

Commit

Permalink
check with u32 + dump to csv
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Nov 19, 2023
1 parent e4fe1b4 commit 9728eef
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 27 deletions.
2 changes: 2 additions & 0 deletions lite-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,5 @@ solana-lite-rpc-history = { workspace = true }

[dev-dependencies]
bench = { path = "../bench" }
fxhash = "0.2.1"
csv = "1.3.0"
96 changes: 69 additions & 27 deletions lite-rpc/tests/storage_integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::backtrace::Backtrace;
use std::cell::OnceCell;
use std::collections::{HashMap, HashSet};
use std::hash::Hasher;
use std::panic::PanicInfo;
use std::process;
use std::str::FromStr;
Expand All @@ -16,12 +17,16 @@ use solana_lite_rpc_history::block_stores::postgres_block_store::PostgresBlockSt
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use std::sync::Arc;
use std::time::{Duration, Instant};
use chrono::format::format_item;
use csv::WriterBuilder;
use fxhash::FxHasher32;
use serde::{Deserialize, Serialize};
use solana_sdk::blake3::{hash, Hash, HASH_BYTES};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::message::VersionedMessage;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::shred_version::compute_shred_version;
use tokio::sync::broadcast::error::RecvError;
use tokio::task::JoinHandle;
use tokio::time::sleep;
Expand Down Expand Up @@ -58,7 +63,7 @@ async fn storage_test() {
drop(blocks_notifier);

info!("Run tests for some time ...");
sleep(Duration::from_secs(20)).await;
sleep(Duration::from_secs(1200)).await;

// jh1.abort();
// jh2.abort();
Expand Down Expand Up @@ -132,18 +137,28 @@ struct BlockDebugDetails {
}


#[derive(serde::Serialize)]
struct CsvRow<'a> {
account_pubkey: &'a str,
hash: u32,
hash2: u32,
}

fn compress_account_ids(block_notifier: BlockStream) -> JoinHandle<()> {
tokio::spawn(async move {
let mut block_notifier = block_notifier;

// 1 GB RAM for the two arrays
let mut collisions = vec![0_u8; u32::MAX as usize / 8];
let mut collisions2 = vec![0_u8; u32::MAX as usize / 8];

info!("ALLOCATED ALOT OF MEM");

const USE_FULL_COLLISSION_MAP: bool = false;
let mut seen = HashMap::<u32, Pubkey>::new();
let mut count = 0;

let mut csv_writer = WriterBuilder::new().from_path(format!("collissions.csv")).unwrap();
let mut slots = HashSet::new();

loop {
match block_notifier.recv().await {
Expand All @@ -155,23 +170,33 @@ fn compress_account_ids(block_notifier: BlockStream) -> JoinHandle<()> {
block.transactions.len()
);

slots.insert(block.slot);

let started = Instant::now();
let mut count = 0;
for tx in block.transactions {
// info!("tx {}", tx.signature);

for acc in tx.static_account_keys {
// info!("- {}", acc);
let hash: u32 = hash32(acc);
let hash2: u32 = hash32_check(acc);
let hash: u32 = hash32(&acc);
let hash2: u32 = hash32_check(&acc);


let ptr = &mut collisions[hash as usize / 8];
csv_writer.serialize(CsvRow {
account_pubkey: acc.to_string().as_str(),
hash,
hash2,
}).unwrap();

if *ptr & (1 << (hash % 8)) != 0 {
let hash_used_before = collisions[hash as usize / 8] & (1 << (hash % 8)) != 0;
if hash_used_before {
if collisions2[hash2 as usize / 8] & (1 << (hash2 % 8)) == 0 {
info!("optimistic collision check for {}", acc);
// panic!("collision for {}", acc);
info!("slots mapped without collision: {:?}", slots.len());
info!("slots mapped without collision: {:?}", slots);
csv_writer.flush().unwrap();
panic!("collision for {}", acc);
}
}

Expand All @@ -185,7 +210,7 @@ fn compress_account_ids(block_notifier: BlockStream) -> JoinHandle<()> {
}

collisions2[hash2 as usize / 8] |= 1 << (hash2 % 8);
*ptr |= 1 << (hash % 8);
collisions[hash as usize / 8] |= 1 << (hash % 8);


// failes with 436/524 (at 16 bit)
Expand All @@ -194,12 +219,14 @@ fn compress_account_ids(block_notifier: BlockStream) -> JoinHandle<()> {
// info!("ones2: {}", count_ones(collisions2.as_ref()));
// }

count += 1;

}
} // -- all txs in block

info!("hashing took {}ms", started.elapsed().as_secs_f64() * 1000.0);
info!("hashing took {:.2}ms for {} keys", started.elapsed().as_secs_f64() * 1000.0, count);
// info!("ones: {}", count_ones(collisions.as_ref()));

count += 1;
} // -- Ok
Err(RecvError::Lagged(missed_blocks)) => {
warn!(
Expand All @@ -217,28 +244,30 @@ fn compress_account_ids(block_notifier: BlockStream) -> JoinHandle<()> {
})
}

fn hash16(p0: Pubkey) -> u16 {
let hash1: Hash = hash(p0.as_ref());
hash1.to_bytes()[0] as u16 + (hash1.to_bytes()[1] as u16) * 256
#[inline]
fn hash16(p0: &Pubkey) -> u16 {
fxhash::hash32(p0.as_ref()) as u16
}

fn hash16_check(p0: Pubkey) -> u16 {
let hash1: Hash = hash(p0.as_ref());
hash1.to_bytes()[HASH_BYTES - 1] as u16 + (hash1.to_bytes()[HASH_BYTES - 2] as u16) * 256
#[inline]
fn hash16_check(p0: &Pubkey) -> u16 {
fxhash::hash32(p0.as_ref()) as u16
}

fn hash32(p0: Pubkey) -> u32 {
let hash1: Hash = hash(p0.as_ref());
let mut dst = [0u8; 4];
dst.clone_from_slice(&hash1.to_bytes()[0..4]);
u32::from_be_bytes(dst)
#[inline]
fn hash32(p0: &Pubkey) -> u32 {
let mut hasher = FxHasher32::default();
hasher.write_u64(0x9911223344001133);
hasher.write(p0.as_ref());
hasher.finish() as u32
}

fn hash32_check(p0: Pubkey) -> u32 {
let hash1: Hash = hash(p0.as_ref());
let mut dst = [0u8; 4];
dst.clone_from_slice(&hash1.to_bytes()[4..8]);
u32::from_be_bytes(dst)
#[inline]
fn hash32_check(p0: &Pubkey) -> u32 {
let mut hasher = FxHasher32::default();
hasher.write_u64(0x2348012701123212);
hasher.write(p0.as_ref());
hasher.finish() as u32
}

fn count_ones(data: &[u8]) -> u32 {
Expand All @@ -254,9 +283,22 @@ fn count_ones(data: &[u8]) -> u32 {
pub fn hash_to_16bit() {
let account_key = Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ").unwrap();

assert_eq!(28038u16, hash16(account_key));
assert_eq!(28038u16, hash16(&account_key));

}

#[test]
pub fn hash_to_32bit() {
let account_key = Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ").unwrap();

assert_eq!(1779384094, hash32(&account_key));
assert_eq!(1260434961u32, hash32_check(&account_key));

}

#[test]
fn cast16() {
assert_eq!(34464, 100000u32 as u16);
}

fn block_debug_listen(block_notifier: BlockStream) -> JoinHandle<()> {
Expand Down

0 comments on commit 9728eef

Please sign in to comment.