From e83abbfe4a47379710b3efb42daeb5b6bce69014 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Mon, 17 Apr 2023 15:07:33 +0800 Subject: [PATCH] perf: faster term hash map (#1940) * add term hashmap benchmark * refactor arena hashmap add inlines remove occupied array and use table_entry.is_empty instead (saves 4 bytes per entry) reduce saturation threshold from 1/3 to 1/2 to reduce memory use u32 for UnorderedId (we have the 4billion limit anyways on the Columnar stuff) fix naming LinearProbing remove byteorder dependency memory consumption went down from 2Gb to 1.8GB on indexing wikipedia dataset in tantivy * Update stacker/src/arena_hashmap.rs Co-authored-by: Paul Masurel --------- Co-authored-by: Paul Masurel --- src/indexer/segment_writer.rs | 6 +- src/postings/mod.rs | 2 +- stacker/Cargo.toml | 16 +++++- stacker/benches/crit_bench.rs | 70 +++++++++++++++++++++++ stacker/example/hashmap.rs | 27 +++++++++ stacker/src/arena_hashmap.rs | 104 ++++++++++++++++++++-------------- stacker/src/lib.rs | 4 +- stacker/src/memory_arena.rs | 19 ++++++- 8 files changed, 195 insertions(+), 53 deletions(-) create mode 100644 stacker/benches/crit_bench.rs create mode 100644 stacker/example/hashmap.rs diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index f97908cff..82e786b48 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -9,7 +9,7 @@ use crate::fastfield::FastFieldsWriter; use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter}; use crate::indexer::segment_serializer::SegmentSerializer; use crate::postings::{ - compute_table_size, serialize_postings, IndexingContext, IndexingPosition, + compute_table_memory_size, serialize_postings, IndexingContext, IndexingPosition, PerFieldPostingsWriter, PostingsWriter, }; use crate::schema::{FieldEntry, FieldType, Schema, Term, Value}; @@ -26,7 +26,7 @@ fn compute_initial_table_size(per_thread_memory_budget: usize) -> crate::Result< let table_memory_upper_bound = per_thread_memory_budget / 3; (10..20) // We cap it at 2^19 = 512K capacity. .map(|power| 1 << power) - .take_while(|capacity| compute_table_size(*capacity) < table_memory_upper_bound) + .take_while(|capacity| compute_table_memory_size(*capacity) < table_memory_upper_bound) .last() .ok_or_else(|| { crate::TantivyError::InvalidArgument(format!( @@ -455,7 +455,7 @@ mod tests { fn test_hashmap_size() { assert_eq!(compute_initial_table_size(100_000).unwrap(), 1 << 11); assert_eq!(compute_initial_table_size(1_000_000).unwrap(), 1 << 14); - assert_eq!(compute_initial_table_size(10_000_000).unwrap(), 1 << 17); + assert_eq!(compute_initial_table_size(10_000_000).unwrap(), 1 << 18); assert_eq!(compute_initial_table_size(1_000_000_000).unwrap(), 1 << 19); assert_eq!(compute_initial_table_size(4_000_000_000).unwrap(), 1 << 19); } diff --git a/src/postings/mod.rs b/src/postings/mod.rs index aa24b4ca7..e323a29f8 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -17,7 +17,7 @@ mod serializer; mod skip; mod term_info; -pub(crate) use stacker::compute_table_size; +pub(crate) use stacker::compute_table_memory_size; pub use self::block_segment_postings::BlockSegmentPostings; pub(crate) use self::indexing_context::IndexingContext; diff --git a/stacker/Cargo.toml b/stacker/Cargo.toml index 3c51ccbaa..51b7d2350 100644 --- a/stacker/Cargo.toml +++ b/stacker/Cargo.toml @@ -6,5 +6,19 @@ license = "MIT" [dependencies] murmurhash32 = "0.3" -byteorder = "1" common = { version = "0.5", path = "../common/", package = "tantivy-common" } +criterion = "0.4.0" + +[[bench]] +harness = false +name = "crit_bench" +path = "benches/crit_bench.rs" + +[[example]] +name = "hashmap" +path = "example/hashmap.rs" + +[dev-dependencies] +rand = "0.8.5" +zipf = "7.0.0" + diff --git a/stacker/benches/crit_bench.rs b/stacker/benches/crit_bench.rs new file mode 100644 index 000000000..1ffe47688 --- /dev/null +++ b/stacker/benches/crit_bench.rs @@ -0,0 +1,70 @@ +#![allow(dead_code)] +extern crate criterion; + +use criterion::*; +use rand::SeedableRng; +use tantivy_stacker::ArenaHashMap; + +const ALICE: &str = include_str!("../../benches/alice.txt"); + +fn bench_hashmap_throughput(c: &mut Criterion) { + let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Linear); + + let mut group = c.benchmark_group("CreateHashMap"); + group.plot_config(plot_config); + + let input_name = "alice"; + let input_bytes = ALICE.len() as u64; + group.throughput(Throughput::Bytes(input_bytes)); + + group.bench_with_input( + BenchmarkId::new(input_name.to_string(), input_bytes), + &ALICE, + |b, i| b.iter(|| create_hash_map(i.split_whitespace().map(|el| el.as_bytes()))), + ); + // numbers + let input_bytes = 1_000_000 * 8 as u64; + group.throughput(Throughput::Bytes(input_bytes)); + + group.bench_with_input( + BenchmarkId::new("numbers".to_string(), input_bytes), + &(0..1_000_000u64), + |b, i| b.iter(|| create_hash_map(i.clone().map(|el| el.to_le_bytes()))), + ); + + // numbers zipf + use rand::distributions::Distribution; + use rand::rngs::StdRng; + let mut rng = StdRng::from_seed([3u8; 32]); + let zipf = zipf::ZipfDistribution::new(10_000, 1.03).unwrap(); + + let input_bytes = 1_000_000 * 8 as u64; + group.throughput(Throughput::Bytes(input_bytes)); + + group.bench_with_input( + BenchmarkId::new("numbers_zipf".to_string(), input_bytes), + &(0..1_000_000u64), + |b, i| b.iter(|| create_hash_map(i.clone().map(|_el| zipf.sample(&mut rng).to_le_bytes()))), + ); + + group.finish(); +} + +fn create_hash_map<'a, T: AsRef<[u8]>>(terms: impl Iterator) -> ArenaHashMap { + let mut map = ArenaHashMap::with_capacity(4); + for term in terms { + map.mutate_or_create(term.as_ref(), |val| { + if let Some(mut val) = val { + val += 1; + val + } else { + 1u64 + } + }); + } + + map +} + +criterion_group!(block_benches, bench_hashmap_throughput,); +criterion_main!(block_benches); diff --git a/stacker/example/hashmap.rs b/stacker/example/hashmap.rs new file mode 100644 index 000000000..568392cda --- /dev/null +++ b/stacker/example/hashmap.rs @@ -0,0 +1,27 @@ +use tantivy_stacker::ArenaHashMap; + +const ALICE: &str = include_str!("../../benches/alice.txt"); + +fn main() { + create_hash_map((0..100_000_000).map(|el| el.to_string())); + + for _ in 0..1000 { + create_hash_map(ALICE.split_whitespace()); + } +} + +fn create_hash_map<'a, T: AsRef>(terms: impl Iterator) -> ArenaHashMap { + let mut map = ArenaHashMap::with_capacity(4); + for term in terms { + map.mutate_or_create(term.as_ref().as_bytes(), |val| { + if let Some(mut val) = val { + val += 1; + val + } else { + 1u64 + } + }); + } + + map +} diff --git a/stacker/src/arena_hashmap.rs b/stacker/src/arena_hashmap.rs index 91695d178..43bf37c84 100644 --- a/stacker/src/arena_hashmap.rs +++ b/stacker/src/arena_hashmap.rs @@ -1,7 +1,5 @@ -use std::{iter, mem, slice}; - -use byteorder::{ByteOrder, NativeEndian}; -use murmurhash32::murmurhash2; +use std::iter::{Cloned, Filter}; +use std::mem; use super::{Addr, MemoryArena}; use crate::memory_arena::store; @@ -10,17 +8,19 @@ use crate::UnorderedId; /// Returns the actual memory size in bytes /// required to create a table with a given capacity. /// required to create a table of size -pub fn compute_table_size(capacity: usize) -> usize { +pub fn compute_table_memory_size(capacity: usize) -> usize { capacity * mem::size_of::() } +type HashType = u32; + /// `KeyValue` is the item stored in the hash table. /// The key is actually a `BytesRef` object stored in an external memory arena. /// The `value_addr` also points to an address in the memory arena. #[derive(Copy, Clone)] struct KeyValue { key_value_addr: Addr, - hash: u32, + hash: HashType, unordered_id: UnorderedId, } @@ -28,16 +28,21 @@ impl Default for KeyValue { fn default() -> Self { KeyValue { key_value_addr: Addr::null_pointer(), - hash: 0u32, + hash: 0, unordered_id: UnorderedId::default(), } } } impl KeyValue { + #[inline] fn is_empty(self) -> bool { self.key_value_addr.is_null() } + #[inline] + fn is_not_empty_ref(&self) -> bool { + !self.key_value_addr.is_null() + } } /// Customized `HashMap` with `&[u8]` keys @@ -50,43 +55,47 @@ impl KeyValue { /// the computation of the hash of the key twice, /// or copying the key as long as there is no insert. pub struct ArenaHashMap { - table: Box<[KeyValue]>, + table: Vec, memory_arena: MemoryArena, mask: usize, - occupied: Vec, len: usize, } -struct QuadraticProbing { - hash: usize, - i: usize, - mask: usize, +struct LinearProbing { + hash: HashType, + i: u32, + mask: u32, } -impl QuadraticProbing { +impl LinearProbing { #[inline] - fn compute(hash: usize, mask: usize) -> QuadraticProbing { - QuadraticProbing { hash, i: 0, mask } + fn compute(hash: HashType, mask: usize) -> LinearProbing { + LinearProbing { + hash, + i: 0, + mask: mask as u32, + } } #[inline] fn next_probe(&mut self) -> usize { self.i += 1; - (self.hash + self.i) & self.mask + ((self.hash + self.i) & self.mask) as usize } } +type IterNonEmpty<'a> = Filter>, fn(&KeyValue) -> bool>; + pub struct Iter<'a> { hashmap: &'a ArenaHashMap, - inner: slice::Iter<'a, usize>, + inner: IterNonEmpty<'a>, } impl<'a> Iterator for Iter<'a> { type Item = (&'a [u8], Addr, UnorderedId); fn next(&mut self) -> Option { - self.inner.next().cloned().map(move |bucket: usize| { - let kv = self.hashmap.table[bucket]; + self.inner.next().map(move |kv| { let (key, offset): (&'a [u8], Addr) = self.hashmap.get_key_value(kv.key_value_addr); (key, offset, kv.unordered_id) }) @@ -107,10 +116,9 @@ impl Default for ArenaHashMap { fn default() -> Self { let memory_arena = MemoryArena::default(); ArenaHashMap { - table: Box::new([]), + table: Vec::new(), memory_arena, mask: 0, - occupied: Vec::new(), len: 0, } } @@ -120,26 +128,29 @@ impl ArenaHashMap { pub fn with_capacity(table_size: usize) -> ArenaHashMap { let table_size_power_of_2 = compute_previous_power_of_two(table_size); let memory_arena = MemoryArena::default(); - let table: Vec = iter::repeat(KeyValue::default()) - .take(table_size_power_of_2) - .collect(); + let table = vec![KeyValue::default(); table_size_power_of_2]; + ArenaHashMap { - table: table.into_boxed_slice(), + table, memory_arena, mask: table_size_power_of_2 - 1, - occupied: Vec::with_capacity(table_size_power_of_2 / 2), len: 0, } } + #[inline] + fn get_hash(&self, key: &[u8]) -> HashType { + murmurhash32::murmurhash2(key) + } + #[inline] pub fn read(&self, addr: Addr) -> Item { self.memory_arena.read(addr) } #[inline] - fn probe(&self, hash: u32) -> QuadraticProbing { - QuadraticProbing::compute(hash as usize, self.mask) + fn probe(&self, hash: HashType) -> LinearProbing { + LinearProbing::compute(hash, self.mask) } #[inline] @@ -149,15 +160,16 @@ impl ArenaHashMap { #[inline] fn is_saturated(&self) -> bool { - self.table.len() <= self.occupied.len() * 3 + self.table.len() <= self.len * 2 } #[inline] fn get_key_value(&self, addr: Addr) -> (&[u8], Addr) { let data = self.memory_arena.slice_from(addr); - let key_bytes_len = NativeEndian::read_u16(data) as usize; - let key_bytes: &[u8] = &data[2..][..key_bytes_len]; - (key_bytes, addr.offset(2u32 + key_bytes_len as u32)) + let (key_bytes_len_bytes, data) = data.split_at(2); + let key_bytes_len = u16::from_le_bytes(key_bytes_len_bytes.try_into().unwrap()); + let key_bytes: &[u8] = &data[..key_bytes_len as usize]; + (key_bytes, addr.offset(2 + key_bytes_len as u32)) } #[inline] @@ -171,10 +183,10 @@ impl ArenaHashMap { } #[inline] - fn set_bucket(&mut self, hash: u32, key_value_addr: Addr, bucket: usize) -> UnorderedId { - self.occupied.push(bucket); + fn set_bucket(&mut self, hash: HashType, key_value_addr: Addr, bucket: usize) -> UnorderedId { let unordered_id = self.len as UnorderedId; self.len += 1; + self.table[bucket] = KeyValue { key_value_addr, hash, @@ -196,7 +208,11 @@ impl ArenaHashMap { #[inline] pub fn iter(&self) -> Iter<'_> { Iter { - inner: self.occupied.iter(), + inner: self + .table + .iter() + .cloned() + .filter(KeyValue::is_not_empty_ref), hashmap: self, } } @@ -205,15 +221,13 @@ impl ArenaHashMap { let new_len = (self.table.len() * 2).max(1 << 13); let mask = new_len - 1; self.mask = mask; - let new_table = vec![KeyValue::default(); new_len].into_boxed_slice(); + let new_table = vec![KeyValue::default(); new_len]; let old_table = mem::replace(&mut self.table, new_table); - for old_pos in self.occupied.iter_mut() { - let key_value: KeyValue = old_table[*old_pos]; - let mut probe = QuadraticProbing::compute(key_value.hash as usize, mask); + for key_value in old_table.into_iter().filter(KeyValue::is_not_empty_ref) { + let mut probe = LinearProbing::compute(key_value.hash, mask); loop { let bucket = probe.next_probe(); if self.table[bucket].is_empty() { - *old_pos = bucket; self.table[bucket] = key_value; break; } @@ -222,9 +236,10 @@ impl ArenaHashMap { } /// Get a value associated to a key. + #[inline] pub fn get(&self, key: &[u8]) -> Option where V: Copy + 'static { - let hash = murmurhash2(key); + let hash = self.get_hash(key); let mut probe = self.probe(hash); loop { let bucket = probe.next_probe(); @@ -261,7 +276,7 @@ impl ArenaHashMap { if self.is_saturated() { self.resize(); } - let hash = murmurhash2(key); + let hash = self.get_hash(key); let mut probe = self.probe(hash); loop { let bucket = probe.next_probe(); @@ -273,11 +288,12 @@ impl ArenaHashMap { let key_addr = self.memory_arena.allocate_space(num_bytes); { let data = self.memory_arena.slice_mut(key_addr, num_bytes); - NativeEndian::write_u16(data, key.len() as u16); + data[..2].copy_from_slice(&(key.len() as u16).to_le_bytes()); let stop = 2 + key.len(); data[2..stop].copy_from_slice(key); store(&mut data[stop..], val); } + return self.set_bucket(hash, key_addr, bucket); } else if kv.hash == hash { if let Some(val_addr) = self.get_value_addr_if_key_match(key, kv.key_value_addr) { diff --git a/stacker/src/lib.rs b/stacker/src/lib.rs index add06359a..be44f2627 100644 --- a/stacker/src/lib.rs +++ b/stacker/src/lib.rs @@ -2,9 +2,9 @@ mod arena_hashmap; mod expull; mod memory_arena; -pub use self::arena_hashmap::{compute_table_size, ArenaHashMap}; +pub use self::arena_hashmap::{compute_table_memory_size, ArenaHashMap}; pub use self::expull::ExpUnrolledLinkedList; pub use self::memory_arena::{Addr, MemoryArena}; /// When adding an element in a `ArenaHashMap`, we get a unique id associated to the given key. -pub type UnorderedId = u64; +pub type UnorderedId = u32; diff --git a/stacker/src/memory_arena.rs b/stacker/src/memory_arena.rs index 2155d4a0f..b9e9d3770 100644 --- a/stacker/src/memory_arena.rs +++ b/stacker/src/memory_arena.rs @@ -41,42 +41,50 @@ pub struct Addr(u32); impl Addr { /// Creates a null pointer. + #[inline] pub fn null_pointer() -> Addr { Addr(u32::MAX) } /// Returns the `Addr` object for `addr + offset` + #[inline] pub fn offset(self, offset: u32) -> Addr { Addr(self.0.wrapping_add(offset)) } + #[inline] fn new(page_id: usize, local_addr: usize) -> Addr { Addr((page_id << NUM_BITS_PAGE_ADDR | local_addr) as u32) } + #[inline] fn page_id(self) -> usize { (self.0 as usize) >> NUM_BITS_PAGE_ADDR } + #[inline] fn page_local_addr(self) -> usize { (self.0 as usize) & (PAGE_SIZE - 1) } /// Returns true if and only if the `Addr` is null. + #[inline] pub fn is_null(self) -> bool { self.0 == u32::MAX } } +#[inline] pub fn store(dest: &mut [u8], val: Item) { - assert_eq!(dest.len(), std::mem::size_of::()); + debug_assert_eq!(dest.len(), std::mem::size_of::()); unsafe { ptr::write_unaligned(dest.as_mut_ptr() as *mut Item, val); } } +#[inline] pub fn load(data: &[u8]) -> Item { - assert_eq!(data.len(), std::mem::size_of::()); + debug_assert_eq!(data.len(), std::mem::size_of::()); unsafe { ptr::read_unaligned(data.as_ptr() as *const Item) } } @@ -111,6 +119,7 @@ impl MemoryArena { self.pages.len() * PAGE_SIZE } + #[inline] pub fn write_at(&mut self, addr: Addr, val: Item) { let dest = self.slice_mut(addr, std::mem::size_of::()); store(dest, val); @@ -121,14 +130,17 @@ impl MemoryArena { /// # Panics /// /// If the address is erroneous + #[inline] pub fn read(&self, addr: Addr) -> Item { load(self.slice(addr, mem::size_of::())) } + #[inline] pub fn slice(&self, addr: Addr, len: usize) -> &[u8] { self.pages[addr.page_id()].slice(addr.page_local_addr(), len) } + #[inline] pub fn slice_from(&self, addr: Addr) -> &[u8] { self.pages[addr.page_id()].slice_from(addr.page_local_addr()) } @@ -168,14 +180,17 @@ impl Page { len + self.len <= PAGE_SIZE } + #[inline] fn slice(&self, local_addr: usize, len: usize) -> &[u8] { &self.slice_from(local_addr)[..len] } + #[inline] fn slice_from(&self, local_addr: usize) -> &[u8] { &self.data[local_addr..] } + #[inline] fn slice_mut(&mut self, local_addr: usize, len: usize) -> &mut [u8] { &mut self.data[local_addr..][..len] }