From 03ef0f00d6354bc8585c95d11619e0d7725ec08d Mon Sep 17 00:00:00 2001 From: Andy Kurnia Date: Sun, 21 Apr 2024 22:45:49 +0800 Subject: [PATCH] assign racks to different threads for breadth-first sampling --- src/main_leave.rs | 109 ++++++++++++++++++++++++++++------------------ 1 file changed, 66 insertions(+), 43 deletions(-) diff --git a/src/main_leave.rs b/src/main_leave.rs index 682f951..11057d3 100644 --- a/src/main_leave.rs +++ b/src/main_leave.rs @@ -566,7 +566,10 @@ fn generate_autoplay_logs 0 { - // since the iteration order is deterministic, this can be undone. - generate_exchanges_abortable( - &mut ExchangeAbortableEnv { - found_exchange_move: |rack_bytes: &[u8]| -> bool { - if thread_full_rack_map.len() >= thread_skip { - return false; - } + thread_lo = num_racks / num_threads; // floor division. + thread_hi = (thread_lo * (thread_id + 1)) + + (num_racks - thread_lo * num_threads) * (thread_id + 1) / num_threads; + thread_lo = (thread_lo * thread_id) + + (num_racks - thread_lo * num_threads) * thread_id / num_threads; + // temporarily prefill racks as follows. (these are exclusive ranges.) + // racks 0..thread_lo: big + 1. + // racks thread_lo..thread_hi: 0. + // racks thread_hi..num_racks: big. + // this thread would focus on filling in its assigned region first. + // if it's all unavailable, it would pick the next rack (wraps around). 
+ let mut rack_idx = 0; + generate_exchanges( + &mut ExchangeEnv { + found_exchange_move: |rack_bytes: &[u8]| { + if rack_idx < thread_lo { thread_full_rack_map.insert( rack_bytes.into(), Cumulate { equity: 0.0, - count: 1, + count: big_plus_one, }, ); - true - }, - rack_tally: &mut alphabet_freqs, - min_len: game_config.rack_size(), - max_len: game_config.rack_size(), - exchange_buffer: &mut exchange_buffer, + } else if rack_idx >= thread_hi { + thread_full_rack_map.insert( + rack_bytes.into(), + Cumulate { + equity: 0.0, + count: big, + }, + ); + } + rack_idx += 1; }, - 0, - ); + rack_tally: &mut alphabet_freqs, + min_len: game_config.rack_size(), + max_len: game_config.rack_size(), + exchange_buffer: &mut exchange_buffer, + }, + 0, + ); + let num_thread_hi = num_racks - thread_hi; + num_racks = thread_hi - thread_lo; + if thread_lo > 0 { + thread_sample_count_map.insert(big_plus_one, thread_lo); } - assert_eq!(thread_skip, thread_full_rack_map.len()); - num_racks -= thread_skip; if num_racks > 0 { thread_sample_count_map.insert(0, num_racks); } - if thread_skip > 0 { - thread_sample_count_map.insert(1, thread_skip); + if num_thread_hi > 0 { + thread_sample_count_map.insert(big, num_thread_hi); } } loop { @@ -974,31 +992,36 @@ fn generate_autoplay_logs 0 { - generate_exchanges_abortable( - &mut ExchangeAbortableEnv { - found_exchange_move: |rack_bytes: &[u8]| -> bool { - if thread_skip == 0 { - return false; + let mut rack_idx = 0; + generate_exchanges( + &mut ExchangeEnv { + found_exchange_move: |rack_bytes: &[u8]| { + if rack_idx < thread_lo { + if let std::collections::hash_map::Entry::Occupied(mut entry) = + thread_full_rack_map.entry(rack_bytes.into()) + { + entry.get_mut().count -= big_plus_one; + } else { + unreachable!(); } + } else if rack_idx >= thread_hi { if let std::collections::hash_map::Entry::Occupied(mut entry) = thread_full_rack_map.entry(rack_bytes.into()) { - entry.get_mut().count -= 1; - thread_skip -= 1; + entry.get_mut().count -= big; } 
else { unreachable!(); } - true - }, - rack_tally: &mut alphabet_freqs, - min_len: game_config.rack_size(), - max_len: game_config.rack_size(), - exchange_buffer: &mut exchange_buffer, + } + rack_idx += 1; }, - 0, - ); - } + rack_tally: &mut alphabet_freqs, + min_len: game_config.rack_size(), + max_len: game_config.rack_size(), + exchange_buffer: &mut exchange_buffer, + }, + 0, + ); } let batched_csv_log_buf = batched_csv_log.into_inner().unwrap();