mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-23 02:29:57 +00:00
Perf: use term hashmap in fastfield (#2243)
* add shared arena hashmap * bench fastfield indexing * use shared arena hashmap in columnar lower minimum resize in hashtable * clippy * add comments
This commit is contained in:
@@ -82,8 +82,8 @@ more-asserts = "0.3.1"
|
||||
rand_distr = "0.4.3"
|
||||
|
||||
[target.'cfg(not(windows))'.dev-dependencies]
|
||||
criterion = "0.5"
|
||||
pprof = { git = "https://github.com/PSeitz/pprof-rs/", rev = "53af24b", features = ["flamegraph", "criterion"] } # temp fork that works with criterion 0.5
|
||||
criterion = { version = "0.5" }
|
||||
pprof = { version= "0.13", features = ["flamegraph", "criterion"] }
|
||||
|
||||
[dev-dependencies.fail]
|
||||
version = "0.5.0"
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
|
||||
use pprof::criterion::{Output, PProfProfiler};
|
||||
use tantivy::schema::{TantivyDocument, FAST, INDEXED, STORED, STRING, TEXT};
|
||||
use tantivy::{Index, IndexWriter};
|
||||
use tantivy::{tokenizer, Index, IndexWriter};
|
||||
|
||||
const HDFS_LOGS: &str = include_str!("hdfs.json");
|
||||
const GH_LOGS: &str = include_str!("gh.json");
|
||||
@@ -19,6 +19,13 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) {
|
||||
schema_builder.add_text_field("severity", STRING);
|
||||
schema_builder.build()
|
||||
};
|
||||
let schema_only_fast = {
|
||||
let mut schema_builder = tantivy::schema::SchemaBuilder::new();
|
||||
schema_builder.add_u64_field("timestamp", FAST);
|
||||
schema_builder.add_text_field("body", FAST);
|
||||
schema_builder.add_text_field("severity", FAST);
|
||||
schema_builder.build()
|
||||
};
|
||||
let schema_with_store = {
|
||||
let mut schema_builder = tantivy::schema::SchemaBuilder::new();
|
||||
schema_builder.add_u64_field("timestamp", INDEXED | STORED);
|
||||
@@ -83,6 +90,30 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) {
|
||||
index_writer.commit().unwrap();
|
||||
})
|
||||
});
|
||||
group.bench_function("index-hdfs-no-commit-fastfield", |b| {
|
||||
let lines = get_lines(HDFS_LOGS);
|
||||
b.iter(|| {
|
||||
let index = Index::create_in_ram(schema_only_fast.clone());
|
||||
let index_writer: IndexWriter = index.writer_with_num_threads(1, 100_000_000).unwrap();
|
||||
for doc_json in &lines {
|
||||
let doc = TantivyDocument::parse_json(&schema, doc_json).unwrap();
|
||||
index_writer.add_document(doc).unwrap();
|
||||
}
|
||||
})
|
||||
});
|
||||
group.bench_function("index-hdfs-with-commit-fastfield", |b| {
|
||||
let lines = get_lines(HDFS_LOGS);
|
||||
b.iter(|| {
|
||||
let index = Index::create_in_ram(schema_only_fast.clone());
|
||||
let mut index_writer: IndexWriter =
|
||||
index.writer_with_num_threads(1, 100_000_000).unwrap();
|
||||
for doc_json in &lines {
|
||||
let doc = TantivyDocument::parse_json(&schema, doc_json).unwrap();
|
||||
index_writer.add_document(doc).unwrap();
|
||||
}
|
||||
index_writer.commit().unwrap();
|
||||
})
|
||||
});
|
||||
group.bench_function("index-hdfs-no-commit-json-without-docstore", |b| {
|
||||
let lines = get_lines(HDFS_LOGS);
|
||||
b.iter(|| {
|
||||
@@ -107,6 +138,18 @@ pub fn gh_index_benchmark(c: &mut Criterion) {
|
||||
schema_builder.add_json_field("json", TEXT | FAST);
|
||||
schema_builder.build()
|
||||
};
|
||||
let dynamic_schema_fast = {
|
||||
let mut schema_builder = tantivy::schema::SchemaBuilder::new();
|
||||
schema_builder.add_json_field("json", FAST);
|
||||
schema_builder.build()
|
||||
};
|
||||
let ff_tokenizer_manager = tokenizer::TokenizerManager::default();
|
||||
ff_tokenizer_manager.register(
|
||||
"raw",
|
||||
tokenizer::TextAnalyzer::builder(tokenizer::RawTokenizer::default())
|
||||
.filter(tokenizer::RemoveLongFilter::limit(255))
|
||||
.build(),
|
||||
);
|
||||
|
||||
let mut group = c.benchmark_group("index-gh");
|
||||
group.throughput(Throughput::Bytes(GH_LOGS.len() as u64));
|
||||
@@ -115,7 +158,8 @@ pub fn gh_index_benchmark(c: &mut Criterion) {
|
||||
let lines = get_lines(GH_LOGS);
|
||||
b.iter(|| {
|
||||
let json_field = dynamic_schema.get_field("json").unwrap();
|
||||
let index = Index::create_in_ram(dynamic_schema.clone());
|
||||
let mut index = Index::create_in_ram(dynamic_schema.clone());
|
||||
index.set_fast_field_tokenizers(ff_tokenizer_manager.clone());
|
||||
let index_writer: IndexWriter = index.writer_with_num_threads(1, 100_000_000).unwrap();
|
||||
for doc_json in &lines {
|
||||
let json_val: serde_json::Map<String, serde_json::Value> =
|
||||
@@ -125,11 +169,28 @@ pub fn gh_index_benchmark(c: &mut Criterion) {
|
||||
}
|
||||
})
|
||||
});
|
||||
group.bench_function("index-gh-fast", |b| {
|
||||
let lines = get_lines(GH_LOGS);
|
||||
b.iter(|| {
|
||||
let json_field = dynamic_schema_fast.get_field("json").unwrap();
|
||||
let mut index = Index::create_in_ram(dynamic_schema_fast.clone());
|
||||
index.set_fast_field_tokenizers(ff_tokenizer_manager.clone());
|
||||
let index_writer: IndexWriter = index.writer_with_num_threads(1, 100_000_000).unwrap();
|
||||
for doc_json in &lines {
|
||||
let json_val: serde_json::Map<String, serde_json::Value> =
|
||||
serde_json::from_str(doc_json).unwrap();
|
||||
let doc = tantivy::doc!(json_field=>json_val);
|
||||
index_writer.add_document(doc).unwrap();
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
group.bench_function("index-gh-with-commit", |b| {
|
||||
let lines = get_lines(GH_LOGS);
|
||||
b.iter(|| {
|
||||
let json_field = dynamic_schema.get_field("json").unwrap();
|
||||
let index = Index::create_in_ram(dynamic_schema.clone());
|
||||
let mut index = Index::create_in_ram(dynamic_schema.clone());
|
||||
index.set_fast_field_tokenizers(ff_tokenizer_manager.clone());
|
||||
let mut index_writer: IndexWriter =
|
||||
index.writer_with_num_threads(1, 100_000_000).unwrap();
|
||||
for doc_json in &lines {
|
||||
|
||||
@@ -269,7 +269,8 @@ impl StrOrBytesColumnWriter {
|
||||
dictionaries: &mut [DictionaryBuilder],
|
||||
arena: &mut MemoryArena,
|
||||
) {
|
||||
let unordered_id = dictionaries[self.dictionary_id as usize].get_or_allocate_id(bytes);
|
||||
let unordered_id =
|
||||
dictionaries[self.dictionary_id as usize].get_or_allocate_id(bytes, arena);
|
||||
self.column_writer.record(doc, unordered_id, arena);
|
||||
}
|
||||
|
||||
|
||||
@@ -437,6 +437,7 @@ impl ColumnarWriter {
|
||||
&mut symbol_byte_buffer,
|
||||
),
|
||||
buffers,
|
||||
&self.arena,
|
||||
&mut column_serializer,
|
||||
)?;
|
||||
column_serializer.finalize()?;
|
||||
@@ -490,6 +491,7 @@ impl ColumnarWriter {
|
||||
|
||||
// Serialize [Dictionary, Column, dictionary num bytes U32::LE]
|
||||
// Column: [Column Index, Column Values, column index num bytes U32::LE]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn serialize_bytes_or_str_column(
|
||||
cardinality: Cardinality,
|
||||
num_docs: RowId,
|
||||
@@ -497,6 +499,7 @@ fn serialize_bytes_or_str_column(
|
||||
dictionary_builder: &DictionaryBuilder,
|
||||
operation_it: impl Iterator<Item = ColumnOperation<UnorderedId>>,
|
||||
buffers: &mut SpareBuffers,
|
||||
arena: &MemoryArena,
|
||||
wrt: impl io::Write,
|
||||
) -> io::Result<()> {
|
||||
let SpareBuffers {
|
||||
@@ -505,7 +508,8 @@ fn serialize_bytes_or_str_column(
|
||||
..
|
||||
} = buffers;
|
||||
let mut counting_writer = CountingWriter::wrap(wrt);
|
||||
let term_id_mapping: TermIdMapping = dictionary_builder.serialize(&mut counting_writer)?;
|
||||
let term_id_mapping: TermIdMapping =
|
||||
dictionary_builder.serialize(arena, &mut counting_writer)?;
|
||||
let dictionary_num_bytes: u32 = counting_writer.written_bytes() as u32;
|
||||
let mut wrt = counting_writer.finish();
|
||||
let operation_iterator = operation_it.map(|symbol: ColumnOperation<UnorderedId>| {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::io;
|
||||
|
||||
use fnv::FnvHashMap;
|
||||
use sstable::SSTable;
|
||||
use stacker::{MemoryArena, SharedArenaHashMap};
|
||||
|
||||
pub(crate) struct TermIdMapping {
|
||||
unordered_to_ord: Vec<OrderedId>,
|
||||
@@ -31,29 +31,38 @@ pub struct OrderedId(pub u32);
|
||||
/// mapping.
|
||||
#[derive(Default)]
|
||||
pub(crate) struct DictionaryBuilder {
|
||||
dict: FnvHashMap<Vec<u8>, UnorderedId>,
|
||||
memory_consumption: usize,
|
||||
dict: SharedArenaHashMap,
|
||||
}
|
||||
|
||||
impl DictionaryBuilder {
|
||||
/// Get or allocate an unordered id.
|
||||
/// (This ID is simply an auto-incremented id.)
|
||||
pub fn get_or_allocate_id(&mut self, term: &[u8]) -> UnorderedId {
|
||||
if let Some(term_id) = self.dict.get(term) {
|
||||
return *term_id;
|
||||
}
|
||||
let new_id = UnorderedId(self.dict.len() as u32);
|
||||
self.dict.insert(term.to_vec(), new_id);
|
||||
self.memory_consumption += term.len();
|
||||
self.memory_consumption += 40; // Term Metadata + HashMap overhead
|
||||
new_id
|
||||
pub fn get_or_allocate_id(&mut self, term: &[u8], arena: &mut MemoryArena) -> UnorderedId {
|
||||
let next_id = self.dict.len() as u32;
|
||||
let unordered_id = self
|
||||
.dict
|
||||
.mutate_or_create(term, arena, |unordered_id: Option<u32>| {
|
||||
if let Some(unordered_id) = unordered_id {
|
||||
unordered_id
|
||||
} else {
|
||||
next_id
|
||||
}
|
||||
});
|
||||
UnorderedId(unordered_id)
|
||||
}
|
||||
|
||||
/// Serialize the dictionary into an fst, and returns the
|
||||
/// `UnorderedId -> TermOrdinal` map.
|
||||
pub fn serialize<'a, W: io::Write + 'a>(&self, wrt: &mut W) -> io::Result<TermIdMapping> {
|
||||
let mut terms: Vec<(&[u8], UnorderedId)> =
|
||||
self.dict.iter().map(|(k, v)| (k.as_slice(), *v)).collect();
|
||||
pub fn serialize<'a, W: io::Write + 'a>(
|
||||
&self,
|
||||
arena: &MemoryArena,
|
||||
wrt: &mut W,
|
||||
) -> io::Result<TermIdMapping> {
|
||||
let mut terms: Vec<(&[u8], UnorderedId)> = self
|
||||
.dict
|
||||
.iter(arena)
|
||||
.map(|(k, v)| (k, arena.read(v)))
|
||||
.collect();
|
||||
terms.sort_unstable_by_key(|(key, _)| *key);
|
||||
// TODO Remove the allocation.
|
||||
let mut unordered_to_ord: Vec<OrderedId> = vec![OrderedId(0u32); terms.len()];
|
||||
@@ -68,7 +77,7 @@ impl DictionaryBuilder {
|
||||
}
|
||||
|
||||
pub(crate) fn mem_usage(&self) -> usize {
|
||||
self.memory_consumption
|
||||
self.dict.mem_usage()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,12 +87,13 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_dictionary_builder() {
|
||||
let mut arena = MemoryArena::default();
|
||||
let mut dictionary_builder = DictionaryBuilder::default();
|
||||
let hello_uid = dictionary_builder.get_or_allocate_id(b"hello");
|
||||
let happy_uid = dictionary_builder.get_or_allocate_id(b"happy");
|
||||
let tax_uid = dictionary_builder.get_or_allocate_id(b"tax");
|
||||
let hello_uid = dictionary_builder.get_or_allocate_id(b"hello", &mut arena);
|
||||
let happy_uid = dictionary_builder.get_or_allocate_id(b"happy", &mut arena);
|
||||
let tax_uid = dictionary_builder.get_or_allocate_id(b"tax", &mut arena);
|
||||
let mut buffer = Vec::new();
|
||||
let id_mapping = dictionary_builder.serialize(&mut buffer).unwrap();
|
||||
let id_mapping = dictionary_builder.serialize(&arena, &mut buffer).unwrap();
|
||||
assert_eq!(id_mapping.to_ord(hello_uid), OrderedId(1));
|
||||
assert_eq!(id_mapping.to_ord(happy_uid), OrderedId(0));
|
||||
assert_eq!(id_mapping.to_ord(tax_uid), OrderedId(2));
|
||||
|
||||
@@ -1,51 +1,5 @@
|
||||
use std::iter::{Cloned, Filter};
|
||||
use std::mem;
|
||||
|
||||
use super::{Addr, MemoryArena};
|
||||
use crate::fastcpy::fast_short_slice_copy;
|
||||
use crate::memory_arena::store;
|
||||
|
||||
/// Returns the actual memory size in bytes
|
||||
/// required to create a table with a given capacity.
|
||||
/// required to create a table of size
|
||||
pub fn compute_table_memory_size(capacity: usize) -> usize {
|
||||
capacity * mem::size_of::<KeyValue>()
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "compare_hash_only"))]
|
||||
type HashType = u32;
|
||||
|
||||
#[cfg(feature = "compare_hash_only")]
|
||||
type HashType = u64;
|
||||
|
||||
/// `KeyValue` is the item stored in the hash table.
|
||||
/// The key is actually a `BytesRef` object stored in an external memory arena.
|
||||
/// The `value_addr` also points to an address in the memory arena.
|
||||
#[derive(Copy, Clone)]
|
||||
struct KeyValue {
|
||||
pub(crate) key_value_addr: Addr,
|
||||
hash: HashType,
|
||||
}
|
||||
|
||||
impl Default for KeyValue {
|
||||
fn default() -> Self {
|
||||
KeyValue {
|
||||
key_value_addr: Addr::null_pointer(),
|
||||
hash: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl KeyValue {
|
||||
#[inline]
|
||||
fn is_empty(&self) -> bool {
|
||||
self.key_value_addr.is_null()
|
||||
}
|
||||
#[inline]
|
||||
fn is_not_empty_ref(&self) -> bool {
|
||||
!self.key_value_addr.is_null()
|
||||
}
|
||||
}
|
||||
use crate::shared_arena_hashmap::SharedArenaHashMap;
|
||||
|
||||
/// Customized `HashMap` with `&[u8]` keys
|
||||
///
|
||||
@@ -56,61 +10,13 @@ impl KeyValue {
|
||||
/// The quirky API has the benefit of avoiding
|
||||
/// the computation of the hash of the key twice,
|
||||
/// or copying the key as long as there is no insert.
|
||||
pub struct ArenaHashMap {
|
||||
table: Vec<KeyValue>,
|
||||
pub memory_arena: MemoryArena,
|
||||
mask: usize,
|
||||
len: usize,
|
||||
}
|
||||
|
||||
struct LinearProbing {
|
||||
pos: usize,
|
||||
mask: usize,
|
||||
}
|
||||
|
||||
impl LinearProbing {
|
||||
#[inline]
|
||||
fn compute(hash: HashType, mask: usize) -> LinearProbing {
|
||||
LinearProbing {
|
||||
pos: hash as usize,
|
||||
mask,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn next_probe(&mut self) -> usize {
|
||||
// Not saving the masked version removes a dependency.
|
||||
self.pos = self.pos.wrapping_add(1);
|
||||
self.pos & self.mask
|
||||
}
|
||||
}
|
||||
|
||||
type IterNonEmpty<'a> = Filter<Cloned<std::slice::Iter<'a, KeyValue>>, fn(&KeyValue) -> bool>;
|
||||
|
||||
pub struct Iter<'a> {
|
||||
hashmap: &'a ArenaHashMap,
|
||||
inner: IterNonEmpty<'a>,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for Iter<'a> {
|
||||
type Item = (&'a [u8], Addr);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.inner.next().map(move |kv| {
|
||||
let (key, offset): (&'a [u8], Addr) = self.hashmap.get_key_value(kv.key_value_addr);
|
||||
(key, offset)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the greatest power of two lower or equal to `n`.
|
||||
/// Except if n == 0, in that case, return 1.
|
||||
///
|
||||
/// # Panics if n == 0
|
||||
fn compute_previous_power_of_two(n: usize) -> usize {
|
||||
assert!(n > 0);
|
||||
let msb = (63u32 - (n as u64).leading_zeros()) as u8;
|
||||
1 << msb
|
||||
/// ArenaHashMap is like SharedArenaHashMap but takes ownership
|
||||
/// of the memory arena. The memory arena stores the serialized
|
||||
/// keys and values.
|
||||
pub struct ArenaHashMap {
|
||||
shared_arena_hashmap: SharedArenaHashMap,
|
||||
pub memory_arena: MemoryArena,
|
||||
}
|
||||
|
||||
impl Default for ArenaHashMap {
|
||||
@@ -121,156 +27,44 @@ impl Default for ArenaHashMap {
|
||||
|
||||
impl ArenaHashMap {
|
||||
pub fn with_capacity(table_size: usize) -> ArenaHashMap {
|
||||
let table_size_power_of_2 = compute_previous_power_of_two(table_size);
|
||||
let memory_arena = MemoryArena::default();
|
||||
let table = vec![KeyValue::default(); table_size_power_of_2];
|
||||
|
||||
ArenaHashMap {
|
||||
table,
|
||||
shared_arena_hashmap: SharedArenaHashMap::with_capacity(table_size),
|
||||
memory_arena,
|
||||
mask: table_size_power_of_2 - 1,
|
||||
len: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[cfg(not(feature = "compare_hash_only"))]
|
||||
fn get_hash(&self, key: &[u8]) -> HashType {
|
||||
murmurhash32::murmurhash2(key)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[cfg(feature = "compare_hash_only")]
|
||||
fn get_hash(&self, key: &[u8]) -> HashType {
|
||||
/// Since we compare only the hash we need a high quality hash.
|
||||
use std::hash::Hasher;
|
||||
let mut hasher = ahash::AHasher::default();
|
||||
hasher.write(key);
|
||||
hasher.finish() as HashType
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn read<Item: Copy + 'static>(&self, addr: Addr) -> Item {
|
||||
self.memory_arena.read(addr)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn probe(&self, hash: HashType) -> LinearProbing {
|
||||
LinearProbing::compute(hash, self.mask)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn mem_usage(&self) -> usize {
|
||||
self.table.len() * mem::size_of::<KeyValue>() + self.memory_arena.mem_usage()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn is_saturated(&self) -> bool {
|
||||
self.table.len() <= self.len * 2
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_key_value(&self, addr: Addr) -> (&[u8], Addr) {
|
||||
let data = self.memory_arena.slice_from(addr);
|
||||
let key_bytes_len_bytes = unsafe { data.get_unchecked(..2) };
|
||||
let key_bytes_len = u16::from_le_bytes(key_bytes_len_bytes.try_into().unwrap());
|
||||
let key_bytes: &[u8] = unsafe { data.get_unchecked(2..2 + key_bytes_len as usize) };
|
||||
(key_bytes, addr.offset(2 + key_bytes_len as u32))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[cfg(not(feature = "compare_hash_only"))]
|
||||
fn get_value_addr_if_key_match(&self, target_key: &[u8], addr: Addr) -> Option<Addr> {
|
||||
use crate::fastcmp::fast_short_slice_compare;
|
||||
|
||||
let (stored_key, value_addr) = self.get_key_value(addr);
|
||||
if fast_short_slice_compare(stored_key, target_key) {
|
||||
Some(value_addr)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
#[inline]
|
||||
#[cfg(feature = "compare_hash_only")]
|
||||
fn get_value_addr_if_key_match(&self, _target_key: &[u8], addr: Addr) -> Option<Addr> {
|
||||
// For the compare_hash_only feature, it would make sense to store the keys at a different
|
||||
// memory location. Here they will just pollute the cache.
|
||||
let data = self.memory_arena.slice_from(addr);
|
||||
let key_bytes_len_bytes = &data[..2];
|
||||
let key_bytes_len = u16::from_le_bytes(key_bytes_len_bytes.try_into().unwrap());
|
||||
let value_addr = addr.offset(2 + key_bytes_len as u32);
|
||||
|
||||
Some(value_addr)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn set_bucket(&mut self, hash: HashType, key_value_addr: Addr, bucket: usize) {
|
||||
self.len += 1;
|
||||
|
||||
self.table[bucket] = KeyValue {
|
||||
key_value_addr,
|
||||
hash,
|
||||
};
|
||||
self.shared_arena_hashmap.mem_usage() + self.memory_arena.mem_usage()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.len() == 0
|
||||
self.shared_arena_hashmap.is_empty()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn len(&self) -> usize {
|
||||
self.len
|
||||
self.shared_arena_hashmap.len()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn iter(&self) -> Iter<'_> {
|
||||
Iter {
|
||||
inner: self
|
||||
.table
|
||||
.iter()
|
||||
.cloned()
|
||||
.filter(KeyValue::is_not_empty_ref),
|
||||
hashmap: self,
|
||||
}
|
||||
}
|
||||
|
||||
fn resize(&mut self) {
|
||||
let new_len = (self.table.len() * 2).max(1 << 13);
|
||||
let mask = new_len - 1;
|
||||
self.mask = mask;
|
||||
let new_table = vec![KeyValue::default(); new_len];
|
||||
let old_table = mem::replace(&mut self.table, new_table);
|
||||
for key_value in old_table.into_iter().filter(KeyValue::is_not_empty_ref) {
|
||||
let mut probe = LinearProbing::compute(key_value.hash, mask);
|
||||
loop {
|
||||
let bucket = probe.next_probe();
|
||||
if self.table[bucket].is_empty() {
|
||||
self.table[bucket] = key_value;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn iter(&self) -> impl Iterator<Item = (&[u8], Addr)> {
|
||||
self.shared_arena_hashmap.iter(&self.memory_arena)
|
||||
}
|
||||
|
||||
/// Get a value associated to a key.
|
||||
#[inline]
|
||||
pub fn get<V>(&self, key: &[u8]) -> Option<V>
|
||||
where V: Copy + 'static {
|
||||
let hash = self.get_hash(key);
|
||||
let mut probe = self.probe(hash);
|
||||
loop {
|
||||
let bucket = probe.next_probe();
|
||||
let kv: KeyValue = self.table[bucket];
|
||||
if kv.is_empty() {
|
||||
return None;
|
||||
} else if kv.hash == hash {
|
||||
if let Some(val_addr) = self.get_value_addr_if_key_match(key, kv.key_value_addr) {
|
||||
let v = self.memory_arena.read(val_addr);
|
||||
return Some(v);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.shared_arena_hashmap.get(key, &self.memory_arena)
|
||||
}
|
||||
|
||||
/// `update` create a new entry for a given key if it does not exist
|
||||
@@ -284,45 +78,10 @@ impl ArenaHashMap {
|
||||
/// If the key already as an associated value, then it will be passed
|
||||
/// `Some(previous_value)`.
|
||||
#[inline]
|
||||
pub fn mutate_or_create<V>(&mut self, key: &[u8], mut updater: impl FnMut(Option<V>) -> V)
|
||||
pub fn mutate_or_create<V>(&mut self, key: &[u8], updater: impl FnMut(Option<V>) -> V)
|
||||
where V: Copy + 'static {
|
||||
if self.is_saturated() {
|
||||
self.resize();
|
||||
}
|
||||
let hash = self.get_hash(key);
|
||||
let mut probe = self.probe(hash);
|
||||
let mut bucket = probe.next_probe();
|
||||
let mut kv: KeyValue = self.table[bucket];
|
||||
loop {
|
||||
if kv.is_empty() {
|
||||
// The key does not exist yet.
|
||||
let val = updater(None);
|
||||
let num_bytes = std::mem::size_of::<u16>() + key.len() + std::mem::size_of::<V>();
|
||||
let key_addr = self.memory_arena.allocate_space(num_bytes);
|
||||
{
|
||||
let data = self.memory_arena.slice_mut(key_addr, num_bytes);
|
||||
let key_len_bytes: [u8; 2] = (key.len() as u16).to_le_bytes();
|
||||
data[..2].copy_from_slice(&key_len_bytes);
|
||||
let stop = 2 + key.len();
|
||||
fast_short_slice_copy(key, &mut data[2..stop]);
|
||||
store(&mut data[stop..], val);
|
||||
}
|
||||
|
||||
self.set_bucket(hash, key_addr, bucket);
|
||||
return;
|
||||
}
|
||||
if kv.hash == hash {
|
||||
if let Some(val_addr) = self.get_value_addr_if_key_match(key, kv.key_value_addr) {
|
||||
let v = self.memory_arena.read(val_addr);
|
||||
let new_v = updater(Some(v));
|
||||
self.memory_arena.write_at(val_addr, new_v);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// This allows fetching the next bucket before the loop jmp
|
||||
bucket = probe.next_probe();
|
||||
kv = self.table[bucket];
|
||||
}
|
||||
self.shared_arena_hashmap
|
||||
.mutate_or_create(key, &mut self.memory_arena, updater);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -331,7 +90,7 @@ mod tests {
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use super::{compute_previous_power_of_two, ArenaHashMap};
|
||||
use super::ArenaHashMap;
|
||||
|
||||
#[test]
|
||||
fn test_hash_map() {
|
||||
@@ -362,14 +121,6 @@ mod tests {
|
||||
assert_eq!(hash_map.get::<u32>(b"abc"), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_compute_previous_power_of_two() {
|
||||
assert_eq!(compute_previous_power_of_two(8), 8);
|
||||
assert_eq!(compute_previous_power_of_two(9), 8);
|
||||
assert_eq!(compute_previous_power_of_two(7), 4);
|
||||
assert_eq!(compute_previous_power_of_two(u64::MAX as usize), 1 << 63);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_many_terms() {
|
||||
let mut terms: Vec<String> = (0..20_000).map(|val| val.to_string()).collect();
|
||||
|
||||
@@ -9,10 +9,12 @@ mod expull;
|
||||
mod fastcmp;
|
||||
mod fastcpy;
|
||||
mod memory_arena;
|
||||
mod shared_arena_hashmap;
|
||||
|
||||
pub use self::arena_hashmap::{compute_table_memory_size, ArenaHashMap};
|
||||
pub use self::arena_hashmap::ArenaHashMap;
|
||||
pub use self::expull::ExpUnrolledLinkedList;
|
||||
pub use self::memory_arena::{Addr, MemoryArena};
|
||||
pub use self::shared_arena_hashmap::{compute_table_memory_size, SharedArenaHashMap};
|
||||
|
||||
/// When adding an element in a `ArenaHashMap`, we get a unique id associated to the given key.
|
||||
pub type UnorderedId = u32;
|
||||
|
||||
420
stacker/src/shared_arena_hashmap.rs
Normal file
420
stacker/src/shared_arena_hashmap.rs
Normal file
@@ -0,0 +1,420 @@
|
||||
use std::iter::{Cloned, Filter};
|
||||
use std::mem;
|
||||
|
||||
use super::{Addr, MemoryArena};
|
||||
use crate::fastcpy::fast_short_slice_copy;
|
||||
use crate::memory_arena::store;
|
||||
|
||||
/// Returns the actual memory size in bytes
|
||||
/// required to create a table with a given capacity.
|
||||
/// required to create a table of size
|
||||
pub fn compute_table_memory_size(capacity: usize) -> usize {
|
||||
capacity * mem::size_of::<KeyValue>()
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "compare_hash_only"))]
|
||||
type HashType = u32;
|
||||
|
||||
#[cfg(feature = "compare_hash_only")]
|
||||
type HashType = u64;
|
||||
|
||||
/// `KeyValue` is the item stored in the hash table.
|
||||
/// The key is actually a `BytesRef` object stored in an external memory arena.
|
||||
/// The `value_addr` also points to an address in the memory arena.
|
||||
#[derive(Copy, Clone)]
|
||||
struct KeyValue {
|
||||
key_value_addr: Addr,
|
||||
hash: HashType,
|
||||
}
|
||||
|
||||
impl Default for KeyValue {
|
||||
fn default() -> Self {
|
||||
KeyValue {
|
||||
key_value_addr: Addr::null_pointer(),
|
||||
hash: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl KeyValue {
|
||||
#[inline]
|
||||
fn is_empty(&self) -> bool {
|
||||
self.key_value_addr.is_null()
|
||||
}
|
||||
#[inline]
|
||||
fn is_not_empty_ref(&self) -> bool {
|
||||
!self.key_value_addr.is_null()
|
||||
}
|
||||
}
|
||||
|
||||
/// Customized `HashMap` with `&[u8]` keys
|
||||
///
|
||||
/// Its main particularity is that rather than storing its
|
||||
/// keys in the heap, keys are stored in a memory arena
|
||||
/// inline with the values.
|
||||
///
|
||||
/// The quirky API has the benefit of avoiding
|
||||
/// the computation of the hash of the key twice,
|
||||
/// or copying the key as long as there is no insert.
|
||||
///
|
||||
/// SharedArenaHashMap is like ArenaHashMap but gets the memory arena
|
||||
/// passed as an argument to the methods.
|
||||
/// So one MemoryArena can be shared with multiple SharedArenaHashMap.
|
||||
pub struct SharedArenaHashMap {
|
||||
table: Vec<KeyValue>,
|
||||
mask: usize,
|
||||
len: usize,
|
||||
}
|
||||
|
||||
struct LinearProbing {
|
||||
pos: usize,
|
||||
mask: usize,
|
||||
}
|
||||
|
||||
impl LinearProbing {
|
||||
#[inline]
|
||||
fn compute(hash: HashType, mask: usize) -> LinearProbing {
|
||||
LinearProbing {
|
||||
pos: hash as usize,
|
||||
mask,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn next_probe(&mut self) -> usize {
|
||||
// Not saving the masked version removes a dependency.
|
||||
self.pos = self.pos.wrapping_add(1);
|
||||
self.pos & self.mask
|
||||
}
|
||||
}
|
||||
|
||||
type IterNonEmpty<'a> = Filter<Cloned<std::slice::Iter<'a, KeyValue>>, fn(&KeyValue) -> bool>;
|
||||
|
||||
pub struct Iter<'a> {
|
||||
hashmap: &'a SharedArenaHashMap,
|
||||
memory_arena: &'a MemoryArena,
|
||||
inner: IterNonEmpty<'a>,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for Iter<'a> {
|
||||
type Item = (&'a [u8], Addr);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.inner.next().map(move |kv| {
|
||||
let (key, offset): (&'a [u8], Addr) = self
|
||||
.hashmap
|
||||
.get_key_value(kv.key_value_addr, self.memory_arena);
|
||||
(key, offset)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the greatest power of two lower or equal to `n`.
|
||||
/// Except if n == 0, in that case, return 1.
|
||||
///
|
||||
/// # Panics if n == 0
|
||||
fn compute_previous_power_of_two(n: usize) -> usize {
|
||||
assert!(n > 0);
|
||||
let msb = (63u32 - (n as u64).leading_zeros()) as u8;
|
||||
1 << msb
|
||||
}
|
||||
|
||||
impl Default for SharedArenaHashMap {
|
||||
fn default() -> Self {
|
||||
SharedArenaHashMap::with_capacity(4)
|
||||
}
|
||||
}
|
||||
|
||||
impl SharedArenaHashMap {
|
||||
pub fn with_capacity(table_size: usize) -> SharedArenaHashMap {
|
||||
let table_size_power_of_2 = compute_previous_power_of_two(table_size);
|
||||
let table = vec![KeyValue::default(); table_size_power_of_2];
|
||||
|
||||
SharedArenaHashMap {
|
||||
table,
|
||||
mask: table_size_power_of_2 - 1,
|
||||
len: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[cfg(not(feature = "compare_hash_only"))]
|
||||
fn get_hash(&self, key: &[u8]) -> HashType {
|
||||
murmurhash32::murmurhash2(key)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[cfg(feature = "compare_hash_only")]
|
||||
fn get_hash(&self, key: &[u8]) -> HashType {
|
||||
/// Since we compare only the hash we need a high quality hash.
|
||||
use std::hash::Hasher;
|
||||
let mut hasher = ahash::AHasher::default();
|
||||
hasher.write(key);
|
||||
hasher.finish() as HashType
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn probe(&self, hash: HashType) -> LinearProbing {
|
||||
LinearProbing::compute(hash, self.mask)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn mem_usage(&self) -> usize {
|
||||
self.table.len() * mem::size_of::<KeyValue>()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn is_saturated(&self) -> bool {
|
||||
self.table.len() <= self.len * 2
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_key_value<'a>(&'a self, addr: Addr, memory_arena: &'a MemoryArena) -> (&[u8], Addr) {
|
||||
let data = memory_arena.slice_from(addr);
|
||||
let key_bytes_len_bytes = unsafe { data.get_unchecked(..2) };
|
||||
let key_bytes_len = u16::from_le_bytes(key_bytes_len_bytes.try_into().unwrap());
|
||||
let key_bytes: &[u8] = unsafe { data.get_unchecked(2..2 + key_bytes_len as usize) };
|
||||
(key_bytes, addr.offset(2 + key_bytes_len as u32))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[cfg(not(feature = "compare_hash_only"))]
|
||||
fn get_value_addr_if_key_match(
|
||||
&self,
|
||||
target_key: &[u8],
|
||||
addr: Addr,
|
||||
memory_arena: &MemoryArena,
|
||||
) -> Option<Addr> {
|
||||
use crate::fastcmp::fast_short_slice_compare;
|
||||
|
||||
let (stored_key, value_addr) = self.get_key_value(addr, memory_arena);
|
||||
if fast_short_slice_compare(stored_key, target_key) {
|
||||
Some(value_addr)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
#[inline]
|
||||
#[cfg(feature = "compare_hash_only")]
|
||||
fn get_value_addr_if_key_match(
|
||||
&self,
|
||||
_target_key: &[u8],
|
||||
addr: Addr,
|
||||
memory_arena: &MemoryArena,
|
||||
) -> Option<Addr> {
|
||||
// For the compare_hash_only feature, it would make sense to store the keys at a different
|
||||
// memory location. Here they will just pollute the cache.
|
||||
let data = memory_arena.slice_from(addr);
|
||||
let key_bytes_len_bytes = &data[..2];
|
||||
let key_bytes_len = u16::from_le_bytes(key_bytes_len_bytes.try_into().unwrap());
|
||||
let value_addr = addr.offset(2 + key_bytes_len as u32);
|
||||
|
||||
Some(value_addr)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn set_bucket(&mut self, hash: HashType, key_value_addr: Addr, bucket: usize) {
|
||||
self.len += 1;
|
||||
|
||||
self.table[bucket] = KeyValue {
|
||||
key_value_addr,
|
||||
hash,
|
||||
};
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn len(&self) -> usize {
|
||||
self.len
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn iter<'a>(&'a self, memory_arena: &'a MemoryArena) -> Iter<'_> {
|
||||
Iter {
|
||||
inner: self
|
||||
.table
|
||||
.iter()
|
||||
.cloned()
|
||||
.filter(KeyValue::is_not_empty_ref),
|
||||
hashmap: self,
|
||||
memory_arena,
|
||||
}
|
||||
}
|
||||
|
||||
fn resize(&mut self) {
|
||||
let new_len = (self.table.len() * 2).max(1 << 3);
|
||||
let mask = new_len - 1;
|
||||
self.mask = mask;
|
||||
let new_table = vec![KeyValue::default(); new_len];
|
||||
let old_table = mem::replace(&mut self.table, new_table);
|
||||
for key_value in old_table.into_iter().filter(KeyValue::is_not_empty_ref) {
|
||||
let mut probe = LinearProbing::compute(key_value.hash, mask);
|
||||
loop {
|
||||
let bucket = probe.next_probe();
|
||||
if self.table[bucket].is_empty() {
|
||||
self.table[bucket] = key_value;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a value associated to a key.
|
||||
#[inline]
|
||||
pub fn get<V>(&self, key: &[u8], memory_arena: &MemoryArena) -> Option<V>
|
||||
where V: Copy + 'static {
|
||||
let hash = self.get_hash(key);
|
||||
let mut probe = self.probe(hash);
|
||||
loop {
|
||||
let bucket = probe.next_probe();
|
||||
let kv: KeyValue = self.table[bucket];
|
||||
if kv.is_empty() {
|
||||
return None;
|
||||
} else if kv.hash == hash {
|
||||
if let Some(val_addr) =
|
||||
self.get_value_addr_if_key_match(key, kv.key_value_addr, memory_arena)
|
||||
{
|
||||
let v = memory_arena.read(val_addr);
|
||||
return Some(v);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `update` create a new entry for a given key if it does not exist
|
||||
/// or updates the existing entry.
|
||||
///
|
||||
/// The actual logic for this update is define in the `updater`
|
||||
/// argument.
|
||||
///
|
||||
/// If the key is not present, `updater` will receive `None` and
|
||||
/// will be in charge of returning a default value.
|
||||
/// If the key already as an associated value, then it will be passed
|
||||
/// `Some(previous_value)`.
|
||||
#[inline]
|
||||
pub fn mutate_or_create<V>(
|
||||
&mut self,
|
||||
key: &[u8],
|
||||
memory_arena: &mut MemoryArena,
|
||||
mut updater: impl FnMut(Option<V>) -> V,
|
||||
) -> V
|
||||
where
|
||||
V: Copy + 'static,
|
||||
{
|
||||
if self.is_saturated() {
|
||||
self.resize();
|
||||
}
|
||||
let hash = self.get_hash(key);
|
||||
let mut probe = self.probe(hash);
|
||||
let mut bucket = probe.next_probe();
|
||||
let mut kv: KeyValue = self.table[bucket];
|
||||
loop {
|
||||
if kv.is_empty() {
|
||||
// The key does not exist yet.
|
||||
let val = updater(None);
|
||||
let num_bytes = std::mem::size_of::<u16>() + key.len() + std::mem::size_of::<V>();
|
||||
let key_addr = memory_arena.allocate_space(num_bytes);
|
||||
{
|
||||
let data = memory_arena.slice_mut(key_addr, num_bytes);
|
||||
let key_len_bytes: [u8; 2] = (key.len() as u16).to_le_bytes();
|
||||
data[..2].copy_from_slice(&key_len_bytes);
|
||||
let stop = 2 + key.len();
|
||||
fast_short_slice_copy(key, &mut data[2..stop]);
|
||||
store(&mut data[stop..], val);
|
||||
}
|
||||
|
||||
self.set_bucket(hash, key_addr, bucket);
|
||||
return val;
|
||||
}
|
||||
if kv.hash == hash {
|
||||
if let Some(val_addr) =
|
||||
self.get_value_addr_if_key_match(key, kv.key_value_addr, memory_arena)
|
||||
{
|
||||
let v = memory_arena.read(val_addr);
|
||||
let new_v = updater(Some(v));
|
||||
memory_arena.write_at(val_addr, new_v);
|
||||
return new_v;
|
||||
}
|
||||
}
|
||||
// This allows fetching the next bucket before the loop jmp
|
||||
bucket = probe.next_probe();
|
||||
kv = self.table[bucket];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use super::{compute_previous_power_of_two, SharedArenaHashMap};
|
||||
use crate::MemoryArena;
|
||||
|
||||
#[test]
|
||||
fn test_hash_map() {
|
||||
let mut memory_arena = MemoryArena::default();
|
||||
let mut hash_map: SharedArenaHashMap = SharedArenaHashMap::default();
|
||||
hash_map.mutate_or_create(b"abc", &mut memory_arena, |opt_val: Option<u32>| {
|
||||
assert_eq!(opt_val, None);
|
||||
3u32
|
||||
});
|
||||
hash_map.mutate_or_create(b"abcd", &mut memory_arena, |opt_val: Option<u32>| {
|
||||
assert_eq!(opt_val, None);
|
||||
4u32
|
||||
});
|
||||
hash_map.mutate_or_create(b"abc", &mut memory_arena, |opt_val: Option<u32>| {
|
||||
assert_eq!(opt_val, Some(3u32));
|
||||
5u32
|
||||
});
|
||||
let mut vanilla_hash_map = HashMap::new();
|
||||
let iter_values = hash_map.iter(&memory_arena);
|
||||
for (key, addr) in iter_values {
|
||||
let val: u32 = memory_arena.read(addr);
|
||||
vanilla_hash_map.insert(key.to_owned(), val);
|
||||
}
|
||||
assert_eq!(vanilla_hash_map.len(), 2);
|
||||
}
|
||||
#[test]
|
||||
fn test_empty_hashmap() {
|
||||
let memory_arena = MemoryArena::default();
|
||||
let hash_map: SharedArenaHashMap = SharedArenaHashMap::default();
|
||||
assert_eq!(hash_map.get::<u32>(b"abc", &memory_arena), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_compute_previous_power_of_two() {
|
||||
assert_eq!(compute_previous_power_of_two(8), 8);
|
||||
assert_eq!(compute_previous_power_of_two(9), 8);
|
||||
assert_eq!(compute_previous_power_of_two(7), 4);
|
||||
assert_eq!(compute_previous_power_of_two(u64::MAX as usize), 1 << 63);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_many_terms() {
|
||||
let mut memory_arena = MemoryArena::default();
|
||||
let mut terms: Vec<String> = (0..20_000).map(|val| val.to_string()).collect();
|
||||
let mut hash_map: SharedArenaHashMap = SharedArenaHashMap::default();
|
||||
for term in terms.iter() {
|
||||
hash_map.mutate_or_create(
|
||||
term.as_bytes(),
|
||||
&mut memory_arena,
|
||||
|_opt_val: Option<u32>| 5u32,
|
||||
);
|
||||
}
|
||||
let mut terms_back: Vec<String> = hash_map
|
||||
.iter(&memory_arena)
|
||||
.map(|(bytes, _)| String::from_utf8(bytes.to_vec()).unwrap())
|
||||
.collect();
|
||||
terms_back.sort();
|
||||
terms.sort();
|
||||
|
||||
for pos in 0..terms.len() {
|
||||
assert_eq!(terms[pos], terms_back[pos]);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user