diff --git a/Cargo.lock b/Cargo.lock index f67fd530d..eb498ebe8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -587,6 +587,12 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "fnv" version = "1.0.7" @@ -890,6 +896,16 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe7db12097d22ec582439daf8618b8fdd1a7bef6270e9af3b1ebcd30893cf413" +[[package]] +name = "lock_api" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.27" @@ -1088,6 +1104,29 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "parking_lot" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "paste" version = "1.0.15" @@ -1327,6 +1366,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redox_syscall" +version = "0.5.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5407465600fb0548f1442edf71dd20683c6ed326200ace4b1ef0763521bb3b77" +dependencies = [ + "bitflags 2.9.0", +] + [[package]] 
name = "regex" version = "1.11.1" @@ -1439,6 +1487,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.219" @@ -1588,6 +1642,7 @@ dependencies = [ "more-asserts", "once_cell", "oneshot", + "parking_lot", "paste", "postcard", "pretty_assertions", @@ -1715,7 +1770,10 @@ version = "0.6.0" dependencies = [ "ahash", "binggan", + "fixedbitset", "murmurhash32", + "once_cell", + "parking_lot", "proptest", "rand", "rand_distr", diff --git a/Cargo.toml b/Cargo.toml index 57a971238..01009ba4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ hyperloglogplus = { version = "0.4.1", features = ["const-loop"] } futures-util = { version = "0.3.28", optional = true } futures-channel = { version = "0.3.28", optional = true } fnv = "1.0.7" +parking_lot = "0.12.4" typetag = "0.2.21" [target.'cfg(windows)'.dependencies] @@ -92,7 +93,7 @@ more-asserts = "0.3.1" rand_distr = "0.4.3" time = { version = "0.3.10", features = ["serde-well-known", "macros"] } postcard = { version = "1.0.4", features = [ - "use-std", + "use-std", ], default-features = false } [target.'cfg(not(windows))'.dev-dependencies] diff --git a/src/index/index.rs b/src/index/index.rs index 0b20b019d..fce33f8a2 100644 --- a/src/index/index.rs +++ b/src/index/index.rs @@ -588,7 +588,7 @@ impl Index { num_threads: usize, overall_memory_budget_in_bytes: usize, ) -> crate::Result> { - let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads; + let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads.max(1); let options = IndexWriterOptions::builder() .num_worker_threads(num_threads) .memory_budget_per_thread(memory_arena_in_bytes_per_thread) diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 
18e142108..aec3abc4e 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -5,8 +5,9 @@ use std::io::Write; use std::ops::Deref; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; +use parking_lot::RwLock; use rayon::{ThreadPool, ThreadPoolBuilder}; use super::segment_manager::SegmentManager; @@ -317,6 +318,12 @@ pub fn merge_filtered_segments>>( Ok(merged_index) } +struct Pools { + pool: ThreadPool, + merge_thread_pool: ThreadPool, + merge_errors: Arc>>, +} + pub(crate) struct InnerSegmentUpdater { // we keep a copy of the current active IndexMeta to // avoid loading the file every time we need it in the @@ -325,10 +332,7 @@ pub(crate) struct InnerSegmentUpdater { // This should be up to date as all update happen through // the unique active `SegmentUpdater`. active_index_meta: RwLock>, - pool: ThreadPool, - merge_thread_pool: ThreadPool, - merge_errors: Arc>>, - + pools: Option, index: Index, segment_manager: SegmentManager, merge_policy: RwLock>, @@ -348,40 +352,56 @@ impl SegmentUpdater { ) -> crate::Result { let segments = index.searchable_segment_metas()?; let segment_manager = SegmentManager::from_segments(segments, delete_cursor); - let mut builder = ThreadPoolBuilder::new() - .thread_name(|_| "segment_updater".to_string()) - .num_threads(1); - if let Some(panic_handler) = panic_handler.as_ref() { - let panic_handler = panic_handler.clone(); - builder = builder.panic_handler(move |any| { - panic_handler(any); - }); - } - - let pool = builder.build().map_err(|_| { - crate::TantivyError::SystemError("Failed to spawn segment updater thread".to_string()) - })?; - let mut builder = ThreadPoolBuilder::new() - .thread_name(|i| format!("merge_thread_{i}")) - .num_threads(num_merge_threads); - if let Some(panic_handler) = panic_handler { - let panic_handler = panic_handler.clone(); - builder = builder.panic_handler(move |any| { - panic_handler(any); - }); - } - - 
let merge_thread_pool = builder.build().map_err(|_| { - crate::TantivyError::SystemError("Failed to spawn segment merging thread".to_string()) - })?; let index_meta = index.load_metas()?; Ok(SegmentUpdater { inner: Arc::new(InnerSegmentUpdater { active_index_meta: RwLock::new(Arc::new(index_meta)), - pool, - merge_thread_pool, - merge_errors: Default::default(), + pools: (num_merge_threads > 0).then(|| { + let mut builder = ThreadPoolBuilder::new() + .thread_name(|_| "segment_updater".to_string()) + .num_threads(1); + + if let Some(panic_handler) = panic_handler.as_ref() { + let panic_handler = panic_handler.clone(); + builder = builder.panic_handler(move |any| { + panic_handler(any); + }); + } + + let pool = builder + .build() + .map_err(|_| { + crate::TantivyError::SystemError( + "Failed to spawn segment updater thread".to_string(), + ) + }) + .unwrap(); + + let mut builder = ThreadPoolBuilder::new() + .thread_name(|i| format!("merge_thread_{i}")) + .num_threads(num_merge_threads); + if let Some(panic_handler) = panic_handler { + let panic_handler = panic_handler.clone(); + builder = builder.panic_handler(move |any| { + panic_handler(any); + }); + } + let merge_thread_pool = builder + .build() + .map_err(|_| { + crate::TantivyError::SystemError( + "Failed to spawn segment merging thread".to_string(), + ) + }) + .unwrap(); + + Pools { + pool, + merge_thread_pool, + merge_errors: Default::default(), + } + }), index, segment_manager, merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())), @@ -394,12 +414,12 @@ impl SegmentUpdater { } pub fn get_merge_policy(&self) -> Arc { - self.merge_policy.read().unwrap().clone() + self.merge_policy.read().clone() } pub fn set_merge_policy(&self, merge_policy: Box) { let arc_merge_policy = Arc::from(merge_policy); - *self.merge_policy.write().unwrap() = arc_merge_policy; + *self.merge_policy.write() = arc_merge_policy; } fn schedule_task crate::Result + 'static + Send>( @@ -412,10 +432,14 @@ impl SegmentUpdater { let 
(scheduled_result, sender) = FutureResult::create( "A segment_updater future did not succeed. This should never happen.", ); - self.pool.spawn(|| { - let task_result = task(); - let _ = sender.send(task_result); - }); + self.pools + .as_ref() + .expect("thread pools should have been configured") + .pool + .spawn(|| { + let task_result = task(); + let _ = sender.send(task_result); + }); scheduled_result } @@ -538,11 +562,11 @@ impl SegmentUpdater { } fn store_meta(&self, index_meta: &IndexMeta) { - *self.active_index_meta.write().unwrap() = Arc::new(index_meta.clone()); + *self.active_index_meta.write() = Arc::new(index_meta.clone()); } fn load_meta(&self) -> Arc { - self.active_index_meta.read().unwrap().clone() + self.active_index_meta.read().clone() } pub(crate) fn make_merge_operation( @@ -605,38 +629,48 @@ impl SegmentUpdater { FutureResult::create("Merge operation failed."); let cancel = self.cancel.box_clone(); - let merge_errors = self.merge_errors.clone(); - self.merge_thread_pool.spawn(move || { - // The fact that `merge_operation` is moved here is important. - // Its lifetime is used to track how many merging thread are currently running, - // as well as which segment is currently in merge and therefore should not be - // candidate for another merge. 
- match merge( - &segment_updater.index, - segment_entries, - merge_operation.target_opstamp(), - cancel, - false, - ) { - Ok(after_merge_segment_entry) => { - let res = segment_updater.end_merge(merge_operation, after_merge_segment_entry); - let _send_result = merging_future_send.send(res); - } - Err(merge_error) => { - warn!( - "Merge of {:?} was cancelled: {:?}", - merge_operation.segment_ids().to_vec(), - merge_error - ); - if cfg!(test) { - panic!("{merge_error:?}"); + let merge_errors = self + .pools + .as_ref() + .expect("thread pools should have been configured") + .merge_errors + .clone(); + self.pools + .as_ref() + .expect("thread pools should have been configured") + .merge_thread_pool + .spawn(move || { + // The fact that `merge_operation` is moved here is important. + // Its lifetime is used to track how many merging thread are currently running, + // as well as which segment is currently in merge and therefore should not be + // candidate for another merge. + match merge( + &segment_updater.index, + segment_entries, + merge_operation.target_opstamp(), + cancel, + false, + ) { + Ok(after_merge_segment_entry) => { + let res = + segment_updater.end_merge(merge_operation, after_merge_segment_entry); + let _send_result = merging_future_send.send(res); } + Err(merge_error) => { + warn!( + "Merge of {:?} was cancelled: {:?}", + merge_operation.segment_ids().to_vec(), + merge_error + ); + if cfg!(test) { + panic!("{merge_error:?}"); + } - merge_errors.write().unwrap().push(merge_error.clone()); - let _send_result = merging_future_send.send(Err(merge_error)); + merge_errors.write().push(merge_error.clone()); + let _send_result = merging_future_send.send(Err(merge_error)); + } } - } - }); + }); scheduled_result } @@ -679,7 +713,11 @@ impl SegmentUpdater { } pub(crate) fn get_merge_errors(&self) -> Vec { - self.merge_errors.read().unwrap().clone() + if let Some(pools) = self.pools.as_ref() { + pools.merge_errors.read().clone() + } else { + Vec::new() + } } fn 
consider_merge_options(&self) { diff --git a/src/postings/indexing_context.rs b/src/postings/indexing_context.rs index 2675476f3..e67adb0a7 100644 --- a/src/postings/indexing_context.rs +++ b/src/postings/indexing_context.rs @@ -1,7 +1,13 @@ +use std::cell::RefCell; + use stacker::{ArenaHashMap, MemoryArena}; use crate::indexer::path_to_unordered_id::PathToUnorderedId; +thread_local! { + static CONTEXT_POOL: RefCell> = RefCell::new(Vec::new()); +} + /// IndexingContext contains all of the transient memory arenas /// required for building the inverted index. pub(crate) struct IndexingContext { @@ -13,9 +19,27 @@ pub(crate) struct IndexingContext { pub path_to_unordered_id: PathToUnorderedId, } +impl Default for IndexingContext { + fn default() -> Self { + Self::create(1) + } +} + impl IndexingContext { - /// Create a new IndexingContext given the size of the term hash map. + /// Gets an IndexingContext from the pool or creates a new one pub(crate) fn new(table_size: usize) -> IndexingContext { + CONTEXT_POOL + .with(|pool| pool.borrow_mut().pop()) + .unwrap_or_else(|| Self::create(table_size)) + } + + /// Returns the memory usage for the inverted index memory arenas, in bytes. + pub(crate) fn mem_usage(&self) -> usize { + self.term_index.mem_usage() + self.arena.mem_usage() + } + + /// Create a new IndexingContext given the size of the term hash map. + fn create(table_size: usize) -> IndexingContext { let term_index = ArenaHashMap::with_capacity(table_size); IndexingContext { arena: MemoryArena::default(), @@ -24,8 +48,12 @@ impl IndexingContext { } } - /// Returns the memory usage for the inverted index memory arenas, in bytes. 
- pub(crate) fn mem_usage(&self) -> usize { - self.term_index.mem_usage() + self.arena.mem_usage() + pub fn checkin(mut ctx: IndexingContext) { + CONTEXT_POOL.with(|pool| { + ctx.term_index.reset(); + ctx.arena.reset(); + ctx.path_to_unordered_id = PathToUnorderedId::default(); + pool.borrow_mut().push(ctx); + }); } } diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index c7a94ecef..26ff20af7 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -91,6 +91,8 @@ pub(crate) fn serialize_postings( field_serializer.close()?; } + IndexingContext::checkin(ctx); + Ok(()) } diff --git a/stacker/Cargo.toml b/stacker/Cargo.toml index c78c23051..782fbf434 100644 --- a/stacker/Cargo.toml +++ b/stacker/Cargo.toml @@ -12,7 +12,9 @@ murmurhash32 = "0.3" common = { version = "0.10", path = "../common/", package = "tantivy-common" } ahash = { version = "0.8.11", default-features = false, optional = true } rand_distr = "0.4.3" - +fixedbitset = "0.5.7" +once_cell = "1.21.3" +parking_lot = "0.12.4" [[bench]] harness = false diff --git a/stacker/src/arena_hashmap.rs b/stacker/src/arena_hashmap.rs index 9f3dfbbc5..173ebda11 100644 --- a/stacker/src/arena_hashmap.rs +++ b/stacker/src/arena_hashmap.rs @@ -35,6 +35,11 @@ impl ArenaHashMap { } } + pub fn reset(&mut self) { + self.shared_arena_hashmap.reset(); + self.memory_arena.reset(); + } + #[inline] pub fn read(&self, addr: Addr) -> Item { self.memory_arena.read(addr) diff --git a/stacker/src/memory_arena.rs b/stacker/src/memory_arena.rs index 383b96438..16fb91b37 100644 --- a/stacker/src/memory_arena.rs +++ b/stacker/src/memory_arena.rs @@ -22,10 +22,18 @@ //! //! Instead, you store and access your data via `.write(...)` and `.read(...)`, which under the hood //! stores your object using `ptr::write_unaligned` and `ptr::read_unaligned`. 
+use std::sync::Arc; use std::{mem, ptr}; -const NUM_BITS_PAGE_ADDR: usize = 20; -const PAGE_SIZE: usize = 1 << NUM_BITS_PAGE_ADDR; // pages are 1 MB large +use once_cell::sync::Lazy; +use parking_lot::Mutex; + +// Tantivy's default is 20 bits, or 1MB. Tantivy uses the memory arena during indexing and generally +// assumes long-running indexing threads. We (pg_search) do indexing on the main thread and our +// indexing patterns are typically 1 document per segment. :( Using half tantivy's default memory +// saves quite a bit of indexing overhead +const NUM_BITS_PAGE_ADDR: usize = 19; +const PAGE_SIZE: usize = 1 << NUM_BITS_PAGE_ADDR; // pages are 512k large /// Represents a pointer into the `MemoryArena` /// . @@ -91,18 +99,45 @@ pub fn load(data: &[u8]) -> Item { /// The `MemoryArena` pub struct MemoryArena { pages: Vec, + capacity: usize, } +static ARENA_POOL: Lazy>>> = Lazy::new(|| Default::default()); + impl Default for MemoryArena { fn default() -> MemoryArena { - let first_page = Page::new(0); - MemoryArena { - pages: vec![first_page], - } + ARENA_POOL.lock().pop().unwrap_or_else(|| { + let first_page = Page::new(0); + MemoryArena { + pages: vec![first_page], + capacity: 1, + } + }) + } +} + +impl Drop for MemoryArena { + fn drop(&mut self) { + let my_pages = std::mem::replace(&mut self.pages, Vec::new()); + let my_capacity = self.capacity; + let mut recycled = MemoryArena { + pages: my_pages, + capacity: my_capacity, + }; + + recycled.reset(); + ARENA_POOL.lock().push(recycled); + } } impl MemoryArena { + pub fn reset(&mut self) { + unsafe { + self.pages.set_len(1); + } + self.pages[0].len = 0; + } + /// Returns an estimate in number of bytes /// of resident memory consumed by the `MemoryArena`. /// @@ -171,11 +206,27 @@ impl MemoryArena { /// Add a page and allocate len on it.
/// Return the address fn add_page(&mut self, len: usize) -> Addr { - let new_page_id = self.pages.len(); - let mut page = Page::new(new_page_id); - page.len = len; - self.pages.push(page); - Addr::new(new_page_id, 0) + let npages = self.pages.len(); + + if npages + 1 < self.capacity { + // we have a hidden, pre-allocated page + + unsafe { + // make it live + self.pages.set_len(npages + 1); + let page = self.pages.get_unchecked_mut(npages); + page.len = len; + Addr::new(page.page_id, 0) + } + } else { + // must allocate a new page and add it to the arena + let new_page_id = self.pages.len(); + let mut page = Page::new(new_page_id); + page.len = len; + self.pages.push(page); + self.capacity += 1; + Addr::new(new_page_id, 0) + } } /// Allocates `len` bytes and returns the allocated address. diff --git a/stacker/src/shared_arena_hashmap.rs b/stacker/src/shared_arena_hashmap.rs index 6964272f0..1c4e3aea6 100644 --- a/stacker/src/shared_arena_hashmap.rs +++ b/stacker/src/shared_arena_hashmap.rs @@ -1,6 +1,7 @@ -use std::iter::{Cloned, Filter}; use std::mem; +use fixedbitset::{FixedBitSet, Ones}; + use super::{Addr, MemoryArena}; use crate::fastcpy::fast_short_slice_copy; use crate::memory_arena::store; @@ -41,7 +42,9 @@ impl KeyValue { fn is_empty(&self) -> bool { self.key_value_addr.is_null() } + #[inline] + #[allow(dead_code)] fn is_not_empty_ref(&self) -> bool { !self.key_value_addr.is_null() } @@ -62,6 +65,7 @@ impl KeyValue { /// So one MemoryArena can be shared with multiple SharedArenaHashMap. 
pub struct SharedArenaHashMap { table: Vec, + used: FixedBitSet, mask: usize, len: usize, } @@ -88,24 +92,26 @@ impl LinearProbing { } } -type IterNonEmpty<'a> = Filter>, fn(&KeyValue) -> bool>; - pub struct Iter<'a> { + used: Ones<'a>, hashmap: &'a SharedArenaHashMap, memory_arena: &'a MemoryArena, - inner: IterNonEmpty<'a>, } impl<'a> Iterator for Iter<'a> { type Item = (&'a [u8], Addr); fn next(&mut self) -> Option { - self.inner.next().map(move |kv| { - let (key, offset): (&'a [u8], Addr) = self - .hashmap - .get_key_value(kv.key_value_addr, self.memory_arena); - (key, offset) - }) + let next = self.used.next()?; + let kv = unsafe { self.hashmap.table.get_unchecked(next) }; + Some( + self.hashmap + .get_key_value(kv.key_value_addr, self.memory_arena), + ) + } + + fn size_hint(&self) -> (usize, Option) { + self.used.size_hint() } } @@ -132,11 +138,19 @@ impl SharedArenaHashMap { SharedArenaHashMap { table, + used: FixedBitSet::with_capacity(table_size_power_of_2), mask: table_size_power_of_2 - 1, len: 0, } } + pub fn reset(&mut self) { + // NB: thanks to `self.used` we don't need to reset the contents of the table + // self.table.fill(KeyValue::default()); + self.used.clear(); + self.len = 0; + } + #[inline] #[cfg(not(feature = "compare_hash_only"))] fn get_hash(&self, key: &[u8]) -> HashType { @@ -220,6 +234,9 @@ impl SharedArenaHashMap { key_value_addr, hash, }; + unsafe { + self.used.set_unchecked(bucket, true); + } } #[inline] @@ -235,11 +252,7 @@ impl SharedArenaHashMap { #[inline] pub fn iter<'a>(&'a self, memory_arena: &'a MemoryArena) -> Iter<'a> { Iter { - inner: self - .table - .iter() - .cloned() - .filter(KeyValue::is_not_empty_ref), + used: self.used.ones(), hashmap: self, memory_arena, } @@ -249,14 +262,21 @@ impl SharedArenaHashMap { let new_len = (self.table.len() * 2).max(1 << 3); let mask = new_len - 1; self.mask = mask; - let new_table = vec![KeyValue::default(); new_len]; - let old_table = mem::replace(&mut self.table, new_table); - for 
key_value in old_table.into_iter().filter(KeyValue::is_not_empty_ref) { + + // assign a new table and used bitset, taking the old ones so we can use them for resizing + let old_table = mem::replace(&mut self.table, vec![KeyValue::default(); new_len]); + let old_used = mem::replace(&mut self.used, FixedBitSet::with_capacity(new_len)); + + for idx in old_used.ones() { + let key_value = unsafe { old_table.get_unchecked(idx) }; let mut probe = LinearProbing::compute(key_value.hash, mask); loop { let bucket = probe.next_probe(); if self.table[bucket].is_empty() { - self.table[bucket] = key_value; + self.table[bucket] = *key_value; + unsafe { + self.used.set_unchecked(bucket, true); + } break; } } @@ -272,7 +292,7 @@ impl SharedArenaHashMap { loop { let bucket = probe.next_probe(); let kv: KeyValue = self.table[bucket]; - if kv.is_empty() { + if !self.used[bucket] { return None; } else if kv.hash == hash && let Some(val_addr) = @@ -315,8 +335,9 @@ impl SharedArenaHashMap { let mut probe = self.probe(hash); let mut bucket = probe.next_probe(); let mut kv: KeyValue = self.table[bucket]; + let mut used = self.used[bucket]; loop { - if kv.is_empty() { + if !used { // The key does not exist yet. 
let val = updater(None); let num_bytes = std::mem::size_of::() + key.len() + std::mem::size_of::(); @@ -345,6 +366,7 @@ impl SharedArenaHashMap { // This allows fetching the next bucket before the loop jmp bucket = probe.next_probe(); kv = self.table[bucket]; + used = self.used[bucket]; } } } diff --git a/tests/failpoints/mod.rs b/tests/failpoints/mod.rs index 213c86628..3fe10e578 100644 --- a/tests/failpoints/mod.rs +++ b/tests/failpoints/mod.rs @@ -96,7 +96,10 @@ fn test_fail_on_flush_segment_but_one_worker_remains() -> tantivy::Result<()> { let index = Index::create_in_ram(schema_builder.build()); let index_writer: IndexWriter = index.writer_with_num_threads(2, 30_000_000)?; fail::cfg("FieldSerializer::close_term", "1*return(simulatederror)").unwrap(); - for i in 0..100_000 { + + // this number is dependent on MemoryArena::NUM_BITS_PAGE_ADDR. When set to 19 (512k), ~200k + // iterations are necessary to trigger the expected error + for i in 0..200_000 { if index_writer .add_document(doc!(text_field => format!("hellohappytaxpayerlongtokenblabla{}", i))) .is_err()