perf: various optimizations around arenas (#60)

- Use a bitset to track used buckets in the `SharedArenaHashmap`, allowing for more efficient iteration
- Create a global pool for both `MemoryArena` and `IndexingContext`
- Reduce the MemoryArea page size by half (it's now 512KB instead of 1MB)
- Centralize thread pool instances in `SegmentUpdater` so we can elide making them if all nthread sizes are zero
This commit is contained in:
Eric Ridge
2025-08-31 15:58:24 -04:00
committed by Stu Hood
parent b6cd39872b
commit 30c237e895
11 changed files with 322 additions and 112 deletions

58
Cargo.lock generated
View File

@@ -587,6 +587,12 @@ version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "fixedbitset"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]]
name = "fnv"
version = "1.0.7"
@@ -890,6 +896,16 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe7db12097d22ec582439daf8618b8fdd1a7bef6270e9af3b1ebcd30893cf413"
[[package]]
name = "lock_api"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.27"
@@ -1088,6 +1104,29 @@ dependencies = [
"stable_deref_trait",
]
[[package]]
name = "parking_lot"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets",
]
[[package]]
name = "paste"
version = "1.0.15"
@@ -1327,6 +1366,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "redox_syscall"
version = "0.5.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5407465600fb0548f1442edf71dd20683c6ed326200ace4b1ef0763521bb3b77"
dependencies = [
"bitflags 2.9.0",
]
[[package]]
name = "regex"
version = "1.11.1"
@@ -1439,6 +1487,12 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "serde"
version = "1.0.219"
@@ -1588,6 +1642,7 @@ dependencies = [
"more-asserts",
"once_cell",
"oneshot",
"parking_lot",
"paste",
"postcard",
"pretty_assertions",
@@ -1715,7 +1770,10 @@ version = "0.6.0"
dependencies = [
"ahash",
"binggan",
"fixedbitset",
"murmurhash32",
"once_cell",
"parking_lot",
"proptest",
"rand",
"rand_distr",

View File

@@ -73,6 +73,7 @@ hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
futures-util = { version = "0.3.28", optional = true }
futures-channel = { version = "0.3.28", optional = true }
fnv = "1.0.7"
parking_lot = "0.12.4"
typetag = "0.2.21"
[target.'cfg(windows)'.dependencies]
@@ -92,7 +93,7 @@ more-asserts = "0.3.1"
rand_distr = "0.4.3"
time = { version = "0.3.10", features = ["serde-well-known", "macros"] }
postcard = { version = "1.0.4", features = [
"use-std",
"use-std",
], default-features = false }
[target.'cfg(not(windows))'.dev-dependencies]

View File

@@ -588,7 +588,7 @@ impl Index {
num_threads: usize,
overall_memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter<D>> {
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads.max(1);
let options = IndexWriterOptions::builder()
.num_worker_threads(num_threads)
.memory_budget_per_thread(memory_arena_in_bytes_per_thread)

View File

@@ -5,8 +5,9 @@ use std::io::Write;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use parking_lot::RwLock;
use rayon::{ThreadPool, ThreadPoolBuilder};
use super::segment_manager::SegmentManager;
@@ -317,6 +318,12 @@ pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
Ok(merged_index)
}
struct Pools {
pool: ThreadPool,
merge_thread_pool: ThreadPool,
merge_errors: Arc<RwLock<Vec<TantivyError>>>,
}
pub(crate) struct InnerSegmentUpdater {
// we keep a copy of the current active IndexMeta to
// avoid loading the file every time we need it in the
@@ -325,10 +332,7 @@ pub(crate) struct InnerSegmentUpdater {
// This should be up to date as all update happen through
// the unique active `SegmentUpdater`.
active_index_meta: RwLock<Arc<IndexMeta>>,
pool: ThreadPool,
merge_thread_pool: ThreadPool,
merge_errors: Arc<RwLock<Vec<TantivyError>>>,
pools: Option<Pools>,
index: Index,
segment_manager: SegmentManager,
merge_policy: RwLock<Arc<dyn MergePolicy>>,
@@ -348,40 +352,56 @@ impl SegmentUpdater {
) -> crate::Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
let mut builder = ThreadPoolBuilder::new()
.thread_name(|_| "segment_updater".to_string())
.num_threads(1);
if let Some(panic_handler) = panic_handler.as_ref() {
let panic_handler = panic_handler.clone();
builder = builder.panic_handler(move |any| {
panic_handler(any);
});
}
let pool = builder.build().map_err(|_| {
crate::TantivyError::SystemError("Failed to spawn segment updater thread".to_string())
})?;
let mut builder = ThreadPoolBuilder::new()
.thread_name(|i| format!("merge_thread_{i}"))
.num_threads(num_merge_threads);
if let Some(panic_handler) = panic_handler {
let panic_handler = panic_handler.clone();
builder = builder.panic_handler(move |any| {
panic_handler(any);
});
}
let merge_thread_pool = builder.build().map_err(|_| {
crate::TantivyError::SystemError("Failed to spawn segment merging thread".to_string())
})?;
let index_meta = index.load_metas()?;
Ok(SegmentUpdater {
inner: Arc::new(InnerSegmentUpdater {
active_index_meta: RwLock::new(Arc::new(index_meta)),
pool,
merge_thread_pool,
merge_errors: Default::default(),
pools: (num_merge_threads > 0).then(|| {
let mut builder = ThreadPoolBuilder::new()
.thread_name(|_| "segment_updater".to_string())
.num_threads(1);
if let Some(panic_handler) = panic_handler.as_ref() {
let panic_handler = panic_handler.clone();
builder = builder.panic_handler(move |any| {
panic_handler(any);
});
}
let pool = builder
.build()
.map_err(|_| {
crate::TantivyError::SystemError(
"Failed to spawn segment updater thread".to_string(),
)
})
.unwrap();
let mut builder = ThreadPoolBuilder::new()
.thread_name(|i| format!("merge_thread_{i}"))
.num_threads(num_merge_threads);
if let Some(panic_handler) = panic_handler {
let panic_handler = panic_handler.clone();
builder = builder.panic_handler(move |any| {
panic_handler(any);
});
}
let merge_thread_pool = builder
.build()
.map_err(|_| {
crate::TantivyError::SystemError(
"Failed to spawn segment merging thread".to_string(),
)
})
.unwrap();
Pools {
pool,
merge_thread_pool,
merge_errors: Default::default(),
}
}),
index,
segment_manager,
merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
@@ -394,12 +414,12 @@ impl SegmentUpdater {
}
pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
self.merge_policy.read().unwrap().clone()
self.merge_policy.read().clone()
}
pub fn set_merge_policy(&self, merge_policy: Box<dyn MergePolicy>) {
let arc_merge_policy = Arc::from(merge_policy);
*self.merge_policy.write().unwrap() = arc_merge_policy;
*self.merge_policy.write() = arc_merge_policy;
}
fn schedule_task<T: 'static + Send, F: FnOnce() -> crate::Result<T> + 'static + Send>(
@@ -412,10 +432,14 @@ impl SegmentUpdater {
let (scheduled_result, sender) = FutureResult::create(
"A segment_updater future did not succeed. This should never happen.",
);
self.pool.spawn(|| {
let task_result = task();
let _ = sender.send(task_result);
});
self.pools
.as_ref()
.expect("thread pools should have been configured")
.pool
.spawn(|| {
let task_result = task();
let _ = sender.send(task_result);
});
scheduled_result
}
@@ -538,11 +562,11 @@ impl SegmentUpdater {
}
fn store_meta(&self, index_meta: &IndexMeta) {
*self.active_index_meta.write().unwrap() = Arc::new(index_meta.clone());
*self.active_index_meta.write() = Arc::new(index_meta.clone());
}
fn load_meta(&self) -> Arc<IndexMeta> {
self.active_index_meta.read().unwrap().clone()
self.active_index_meta.read().clone()
}
pub(crate) fn make_merge_operation(
@@ -605,38 +629,48 @@ impl SegmentUpdater {
FutureResult::create("Merge operation failed.");
let cancel = self.cancel.box_clone();
let merge_errors = self.merge_errors.clone();
self.merge_thread_pool.spawn(move || {
// The fact that `merge_operation` is moved here is important.
// Its lifetime is used to track how many merging thread are currently running,
// as well as which segment is currently in merge and therefore should not be
// candidate for another merge.
match merge(
&segment_updater.index,
segment_entries,
merge_operation.target_opstamp(),
cancel,
false,
) {
Ok(after_merge_segment_entry) => {
let res = segment_updater.end_merge(merge_operation, after_merge_segment_entry);
let _send_result = merging_future_send.send(res);
}
Err(merge_error) => {
warn!(
"Merge of {:?} was cancelled: {:?}",
merge_operation.segment_ids().to_vec(),
merge_error
);
if cfg!(test) {
panic!("{merge_error:?}");
let merge_errors = self
.pools
.as_ref()
.expect("thread pools should have been configured")
.merge_errors
.clone();
self.pools
.as_ref()
.expect("thread pools should have been configured")
.merge_thread_pool
.spawn(move || {
// The fact that `merge_operation` is moved here is important.
// Its lifetime is used to track how many merging thread are currently running,
// as well as which segment is currently in merge and therefore should not be
// candidate for another merge.
match merge(
&segment_updater.index,
segment_entries,
merge_operation.target_opstamp(),
cancel,
false,
) {
Ok(after_merge_segment_entry) => {
let res =
segment_updater.end_merge(merge_operation, after_merge_segment_entry);
let _send_result = merging_future_send.send(res);
}
Err(merge_error) => {
warn!(
"Merge of {:?} was cancelled: {:?}",
merge_operation.segment_ids().to_vec(),
merge_error
);
if cfg!(test) {
panic!("{merge_error:?}");
}
merge_errors.write().unwrap().push(merge_error.clone());
let _send_result = merging_future_send.send(Err(merge_error));
merge_errors.write().push(merge_error.clone());
let _send_result = merging_future_send.send(Err(merge_error));
}
}
}
});
});
scheduled_result
}
@@ -679,7 +713,11 @@ impl SegmentUpdater {
}
pub(crate) fn get_merge_errors(&self) -> Vec<TantivyError> {
self.merge_errors.read().unwrap().clone()
if let Some(pools) = self.pools.as_ref() {
pools.merge_errors.read().clone()
} else {
Vec::new()
}
}
fn consider_merge_options(&self) {

View File

@@ -1,7 +1,13 @@
use std::cell::RefCell;
use stacker::{ArenaHashMap, MemoryArena};
use crate::indexer::path_to_unordered_id::PathToUnorderedId;
thread_local! {
static CONTEXT_POOL: RefCell<Vec<IndexingContext>> = RefCell::new(Vec::new());
}
/// IndexingContext contains all of the transient memory arenas
/// required for building the inverted index.
pub(crate) struct IndexingContext {
@@ -13,9 +19,27 @@ pub(crate) struct IndexingContext {
pub path_to_unordered_id: PathToUnorderedId,
}
impl Default for IndexingContext {
fn default() -> Self {
Self::create(1)
}
}
impl IndexingContext {
/// Create a new IndexingContext given the size of the term hash map.
/// Gets an IndexingContext from the pool or creates a new one
pub(crate) fn new(table_size: usize) -> IndexingContext {
CONTEXT_POOL
.with(|pool| pool.borrow_mut().pop())
.unwrap_or_else(|| Self::create(table_size))
}
/// Returns the memory usage for the inverted index memory arenas, in bytes.
pub(crate) fn mem_usage(&self) -> usize {
self.term_index.mem_usage() + self.arena.mem_usage()
}
/// Create a new IndexingContext given the size of the term hash map.
fn create(table_size: usize) -> IndexingContext {
let term_index = ArenaHashMap::with_capacity(table_size);
IndexingContext {
arena: MemoryArena::default(),
@@ -24,8 +48,12 @@ impl IndexingContext {
}
}
/// Returns the memory usage for the inverted index memory arenas, in bytes.
pub(crate) fn mem_usage(&self) -> usize {
self.term_index.mem_usage() + self.arena.mem_usage()
pub fn checkin(mut ctx: IndexingContext) {
CONTEXT_POOL.with(|pool| {
ctx.term_index.reset();
ctx.arena.reset();
ctx.path_to_unordered_id = PathToUnorderedId::default();
pool.borrow_mut().push(ctx);
});
}
}

View File

@@ -91,6 +91,8 @@ pub(crate) fn serialize_postings(
field_serializer.close()?;
}
IndexingContext::checkin(ctx);
Ok(())
}

View File

@@ -12,7 +12,9 @@ murmurhash32 = "0.3"
common = { version = "0.10", path = "../common/", package = "tantivy-common" }
ahash = { version = "0.8.11", default-features = false, optional = true }
rand_distr = "0.4.3"
fixedbitset = "0.5.7"
once_cell = "1.21.3"
parking_lot = "0.12.4"
[[bench]]
harness = false

View File

@@ -35,6 +35,11 @@ impl ArenaHashMap {
}
}
pub fn reset(&mut self) {
self.shared_arena_hashmap.reset();
self.memory_arena.reset();
}
#[inline]
pub fn read<Item: Copy + 'static>(&self, addr: Addr) -> Item {
self.memory_arena.read(addr)

View File

@@ -22,10 +22,18 @@
//!
//! Instead, you store and access your data via `.write(...)` and `.read(...)`, which under the hood
//! stores your object using `ptr::write_unaligned` and `ptr::read_unaligned`.
use std::sync::Arc;
use std::{mem, ptr};
const NUM_BITS_PAGE_ADDR: usize = 20;
const PAGE_SIZE: usize = 1 << NUM_BITS_PAGE_ADDR; // pages are 1 MB large
use once_cell::sync::Lazy;
use parking_lot::Mutex;
// Tanty's default is 20 bits, or 1MB. Tantivy uses the memory arena during indexing and generally
// assumes long-running indexing threads. We (pg_search) do indexing on the main thread and our
// indexing patterns are typically 1 document per segment. :( Using half tantivy's default memory
// saves quite a bit of indexing overhead
const NUM_BITS_PAGE_ADDR: usize = 19;
const PAGE_SIZE: usize = 1 << NUM_BITS_PAGE_ADDR; // pages are 512k large
/// Represents a pointer into the `MemoryArena`
/// .
@@ -91,18 +99,45 @@ pub fn load<Item: Copy + 'static>(data: &[u8]) -> Item {
/// The `MemoryArena`
pub struct MemoryArena {
pages: Vec<Page>,
capacity: usize,
}
static ARENA_POOL: Lazy<Arc<Mutex<Vec<MemoryArena>>>> = Lazy::new(|| Default::default());
impl Default for MemoryArena {
fn default() -> MemoryArena {
let first_page = Page::new(0);
MemoryArena {
pages: vec![first_page],
}
ARENA_POOL.lock().pop().unwrap_or_else(|| {
let first_page = Page::new(0);
MemoryArena {
pages: vec![first_page],
capacity: 1,
}
})
}
}
impl Drop for MemoryArena {
fn drop(&mut self) {
let my_pages = std::mem::replace(&mut self.pages, Vec::new());
let my_capacity = self.capacity;
let mut recycled = MemoryArena {
pages: my_pages,
capacity: my_capacity,
};
recycled.reset();
ARENA_POOL.lock().push(recycled);
}
}
impl MemoryArena {
pub fn reset(&mut self) {
unsafe {
self.pages.set_len(1);
}
self.pages[0].len = 0;
}
/// Returns an estimate in number of bytes
/// of resident memory consumed by the `MemoryArena`.
///
@@ -171,11 +206,27 @@ impl MemoryArena {
/// Add a page and allocate len on it.
/// Return the address
fn add_page(&mut self, len: usize) -> Addr {
let new_page_id = self.pages.len();
let mut page = Page::new(new_page_id);
page.len = len;
self.pages.push(page);
Addr::new(new_page_id, 0)
let npages = self.pages.len();
if npages + 1 < self.capacity {
// we have a hidden, pre-allocated page
unsafe {
// make it live
self.pages.set_len(npages + 1);
let page = self.pages.get_unchecked_mut(npages);
page.len = len;
Addr::new(page.page_id, 0)
}
} else {
// must allocate a new page and add it to the arena
let new_page_id = self.pages.len();
let mut page = Page::new(new_page_id);
page.len = len;
self.pages.push(page);
self.capacity += 1;
Addr::new(new_page_id, 0)
}
}
/// Allocates `len` bytes and returns the allocated address.

View File

@@ -1,6 +1,7 @@
use std::iter::{Cloned, Filter};
use std::mem;
use fixedbitset::{FixedBitSet, Ones};
use super::{Addr, MemoryArena};
use crate::fastcpy::fast_short_slice_copy;
use crate::memory_arena::store;
@@ -41,7 +42,9 @@ impl KeyValue {
fn is_empty(&self) -> bool {
self.key_value_addr.is_null()
}
#[inline]
#[allow(dead_code)]
fn is_not_empty_ref(&self) -> bool {
!self.key_value_addr.is_null()
}
@@ -62,6 +65,7 @@ impl KeyValue {
/// So one MemoryArena can be shared with multiple SharedArenaHashMap.
pub struct SharedArenaHashMap {
table: Vec<KeyValue>,
used: FixedBitSet,
mask: usize,
len: usize,
}
@@ -88,24 +92,26 @@ impl LinearProbing {
}
}
type IterNonEmpty<'a> = Filter<Cloned<std::slice::Iter<'a, KeyValue>>, fn(&KeyValue) -> bool>;
pub struct Iter<'a> {
used: Ones<'a>,
hashmap: &'a SharedArenaHashMap,
memory_arena: &'a MemoryArena,
inner: IterNonEmpty<'a>,
}
impl<'a> Iterator for Iter<'a> {
type Item = (&'a [u8], Addr);
fn next(&mut self) -> Option<Self::Item> {
self.inner.next().map(move |kv| {
let (key, offset): (&'a [u8], Addr) = self
.hashmap
.get_key_value(kv.key_value_addr, self.memory_arena);
(key, offset)
})
let next = self.used.next()?;
let kv = unsafe { self.hashmap.table.get_unchecked(next) };
Some(
self.hashmap
.get_key_value(kv.key_value_addr, self.memory_arena),
)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.used.size_hint()
}
}
@@ -132,11 +138,19 @@ impl SharedArenaHashMap {
SharedArenaHashMap {
table,
used: FixedBitSet::with_capacity(table_size_power_of_2),
mask: table_size_power_of_2 - 1,
len: 0,
}
}
pub fn reset(&mut self) {
// NB: thanks to `self.used` we don't need to reset the contents of the table
// self.table.fill(KeyValue::default());
self.used.clear();
self.len = 0;
}
#[inline]
#[cfg(not(feature = "compare_hash_only"))]
fn get_hash(&self, key: &[u8]) -> HashType {
@@ -220,6 +234,9 @@ impl SharedArenaHashMap {
key_value_addr,
hash,
};
unsafe {
self.used.set_unchecked(bucket, true);
}
}
#[inline]
@@ -235,11 +252,7 @@ impl SharedArenaHashMap {
#[inline]
pub fn iter<'a>(&'a self, memory_arena: &'a MemoryArena) -> Iter<'a> {
Iter {
inner: self
.table
.iter()
.cloned()
.filter(KeyValue::is_not_empty_ref),
used: self.used.ones(),
hashmap: self,
memory_arena,
}
@@ -249,14 +262,21 @@ impl SharedArenaHashMap {
let new_len = (self.table.len() * 2).max(1 << 3);
let mask = new_len - 1;
self.mask = mask;
let new_table = vec![KeyValue::default(); new_len];
let old_table = mem::replace(&mut self.table, new_table);
for key_value in old_table.into_iter().filter(KeyValue::is_not_empty_ref) {
// assign a new table and used bitset, taking the old ones so we can use them for resizing
let old_table = mem::replace(&mut self.table, vec![KeyValue::default(); new_len]);
let old_used = mem::replace(&mut self.used, FixedBitSet::with_capacity(new_len));
for idx in old_used.ones() {
let key_value = unsafe { old_table.get_unchecked(idx) };
let mut probe = LinearProbing::compute(key_value.hash, mask);
loop {
let bucket = probe.next_probe();
if self.table[bucket].is_empty() {
self.table[bucket] = key_value;
self.table[bucket] = *key_value;
unsafe {
self.used.set_unchecked(bucket, true);
}
break;
}
}
@@ -272,7 +292,7 @@ impl SharedArenaHashMap {
loop {
let bucket = probe.next_probe();
let kv: KeyValue = self.table[bucket];
if kv.is_empty() {
if !self.used[bucket] {
return None;
} else if kv.hash == hash
&& let Some(val_addr) =
@@ -315,8 +335,9 @@ impl SharedArenaHashMap {
let mut probe = self.probe(hash);
let mut bucket = probe.next_probe();
let mut kv: KeyValue = self.table[bucket];
let mut used = self.used[bucket];
loop {
if kv.is_empty() {
if !used {
// The key does not exist yet.
let val = updater(None);
let num_bytes = std::mem::size_of::<u16>() + key.len() + std::mem::size_of::<V>();
@@ -345,6 +366,7 @@ impl SharedArenaHashMap {
// This allows fetching the next bucket before the loop jmp
bucket = probe.next_probe();
kv = self.table[bucket];
used = self.used[bucket];
}
}
}

View File

@@ -96,7 +96,10 @@ fn test_fail_on_flush_segment_but_one_worker_remains() -> tantivy::Result<()> {
let index = Index::create_in_ram(schema_builder.build());
let index_writer: IndexWriter = index.writer_with_num_threads(2, 30_000_000)?;
fail::cfg("FieldSerializer::close_term", "1*return(simulatederror)").unwrap();
for i in 0..100_000 {
// this number is dependent on MemoryArena::NUM_BITS_PAGE_ADDR. When set to 19 (512k), ~200k
// iterations are necessary to trigger the expected error
for i in 0..200_000 {
if index_writer
.add_document(doc!(text_field => format!("hellohappytaxpayerlongtokenblabla{}", i)))
.is_err()