From 3c71f859e18c50bfdd997871a157b5cf97d45d1f Mon Sep 17 00:00:00 2001 From: Brooks Date: Fri, 10 Nov 2023 14:32:24 -0500 Subject: [PATCH] Uses fold+reduce for handling duplicate pubkeys during index generation (#34011) --- accounts-db/src/accounts_db.rs | 95 +++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 31 deletions(-) diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index 6942f9adad19c7..e8435ff2218edb 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -9397,51 +9397,84 @@ impl AccountsDb { ..GenerateIndexTimings::default() }; - // subtract data.len() from accounts_data_len for all old accounts that are in the index twice - let mut accounts_data_len_dedup_timer = - Measure::start("handle accounts data len duplicates"); - let uncleaned_roots = Mutex::new(IntSet::default()); if pass == 0 { - let accounts_data_len_from_duplicates = unique_pubkeys_by_bin + #[derive(Debug, Default)] + struct DuplicatePubkeysVisitedInfo { + accounts_data_len_from_duplicates: u64, + uncleaned_roots: IntSet<Slot>, + } + impl DuplicatePubkeysVisitedInfo { + fn reduce(mut a: Self, mut b: Self) -> Self { + if a.uncleaned_roots.len() >= b.uncleaned_roots.len() { + a.merge(b); + a + } else { + b.merge(a); + b + } + } + fn merge(&mut self, other: Self) { + self.accounts_data_len_from_duplicates += + other.accounts_data_len_from_duplicates; + self.uncleaned_roots.extend(other.uncleaned_roots); + } + } + + // subtract data.len() from accounts_data_len for all old accounts that are in the index twice + let mut accounts_data_len_dedup_timer = + Measure::start("handle accounts data len duplicates"); + let DuplicatePubkeysVisitedInfo { + accounts_data_len_from_duplicates, + uncleaned_roots, + } = unique_pubkeys_by_bin .par_iter() - .map(|unique_keys| { - unique_keys - .par_chunks(4096) - .map(|pubkeys| { - let (count, uncleaned_roots_this_group) = self - .visit_duplicate_pubkeys_during_startup( - pubkeys, - 
&rent_collector, - &timings, - ); - let mut uncleaned_roots = uncleaned_roots.lock().unwrap(); - uncleaned_roots_this_group.into_iter().for_each(|slot| { - uncleaned_roots.insert(slot); - }); - count - }) - .sum::<u64>() - }) - .sum(); + .fold( + DuplicatePubkeysVisitedInfo::default, + |accum, pubkeys_by_bin| { + let intermediate = pubkeys_by_bin + .par_chunks(4096) + .fold(DuplicatePubkeysVisitedInfo::default, |accum, pubkeys| { + let (accounts_data_len_from_duplicates, uncleaned_roots) = self + .visit_duplicate_pubkeys_during_startup( + pubkeys, + &rent_collector, + &timings, + ); + let intermediate = DuplicatePubkeysVisitedInfo { + accounts_data_len_from_duplicates, + uncleaned_roots, + }; + DuplicatePubkeysVisitedInfo::reduce(accum, intermediate) + }) + .reduce( + DuplicatePubkeysVisitedInfo::default, + DuplicatePubkeysVisitedInfo::reduce, + ); + DuplicatePubkeysVisitedInfo::reduce(accum, intermediate) + }, + ) + .reduce( + DuplicatePubkeysVisitedInfo::default, + DuplicatePubkeysVisitedInfo::reduce, + ); + accounts_data_len_dedup_timer.stop(); + timings.accounts_data_len_dedup_time_us = accounts_data_len_dedup_timer.as_us(); + timings.slots_to_clean = uncleaned_roots.len() as u64; + + self.accounts_index + .add_uncleaned_roots(uncleaned_roots.into_iter()); accounts_data_len.fetch_sub(accounts_data_len_from_duplicates, Ordering::Relaxed); info!( "accounts data len: {}", accounts_data_len.load(Ordering::Relaxed) ); } - accounts_data_len_dedup_timer.stop(); - timings.accounts_data_len_dedup_time_us = accounts_data_len_dedup_timer.as_us(); - - let uncleaned_roots = uncleaned_roots.into_inner().unwrap(); - timings.slots_to_clean = uncleaned_roots.len() as u64; if pass == 0 { // Need to add these last, otherwise older updates will be cleaned for root in &slots { self.accounts_index.add_root(*root); } - self.accounts_index - .add_uncleaned_roots(uncleaned_roots.into_iter()); self.set_storage_count_and_alive_bytes(storage_info, &mut timings); }