From af9b7985fd2c533b0594207abde2d8c8fe1ae484 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 20 Apr 2026 12:18:13 +0200 Subject: [PATCH] Cache term_ord_to_str to avoid having too many, possibly redundant call to sorted_ords_to_term_cb. Closes #2892 --- benches/agg_bench.rs | 1 + src/aggregation/bucket/mod.rs | 1 + src/aggregation/bucket/term_agg.rs | 244 +++++++++++++++--- .../bucket/term_ord_to_str_cache.rs | 202 +++++++++++++++ 4 files changed, 413 insertions(+), 35 deletions(-) create mode 100644 src/aggregation/bucket/term_ord_to_str_cache.rs diff --git a/benches/agg_bench.rs b/benches/agg_bench.rs index 28009fa3f..17f83a8ce 100644 --- a/benches/agg_bench.rs +++ b/benches/agg_bench.rs @@ -206,6 +206,7 @@ fn terms_7(index: &Index) { }); execute_agg(index, agg_req); } + fn terms_all_unique(index: &Index) { let agg_req = json!({ "my_texts": { "terms": { "field": "text_all_unique_terms" } }, diff --git a/src/aggregation/bucket/mod.rs b/src/aggregation/bucket/mod.rs index 44c069463..64164c45e 100644 --- a/src/aggregation/bucket/mod.rs +++ b/src/aggregation/bucket/mod.rs @@ -28,6 +28,7 @@ mod histogram; mod range; mod term_agg; mod term_missing_agg; +mod term_ord_to_str_cache; use std::collections::HashMap; use std::fmt; diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 48a23d4ca..0d6e20ed0 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -7,9 +7,10 @@ use columnar::{ NumericalValue, StrColumn, }; use common::{BitSet, TinySet}; -use rustc_hash::FxHashMap; +use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet}; use serde::{Deserialize, Serialize}; +use super::term_ord_to_str_cache::{StringArena, StringRef, TermOrdToStrCache}; use super::{CustomOrder, Order, OrderTarget}; use crate::aggregation::agg_data::{ build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx, @@ -397,6 +398,7 @@ pub(crate) fn build_segment_term_collector( bucket_id_provider, max_term_id, terms_req_data, + term_ord_cache: None, }; Ok(Box::new(collector)) } else if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC { @@ -408,6 +410,7 @@ pub(crate) fn build_segment_term_collector( bucket_id_provider, max_term_id, terms_req_data, + term_ord_cache: None, }; Ok(Box::new(collector)) } else if max_term_id < 8_000_000 && is_top_level { @@ -422,6 +425,7 @@ pub(crate) fn build_segment_term_collector( bucket_id_provider, max_term_id, terms_req_data, + term_ord_cache: None, }; Ok(Box::new(collector)) } else { @@ -435,6 +439,7 @@ pub(crate) fn build_segment_term_collector( bucket_id_provider, max_term_id, terms_req_data, + term_ord_cache: None, }; Ok(Box::new(collector)) } @@ -470,6 +475,9 @@ trait TermAggregationMap: Clone + Debug + 'static { /// Returns the term aggregation as a vector of (term_id, bucket) pairs, /// in any order. fn into_vec(self) -> Vec<(u64, Bucket)>; + + /// Collects all term ordinals present in this map into the given set. + fn collect_term_ords(&self, set: &mut FxHashSet); } #[derive(Clone, Debug)] @@ -622,6 +630,20 @@ impl TermAggregationMap for PagedTermMap { Self { pages, mem_usage } } + + fn collect_term_ords(&self, set: &mut FxHashSet) { + for (page_idx, page_opt) in self.pages.iter().enumerate() { + if let Some(page) = page_opt { + let base_term_id = (page_idx << PAGE_SHIFT) as u64; + for (bucket_pos, &tiny_set) in page.presence.iter().enumerate() { + let base_offset = bucket_pos * 64; + for bit in tiny_set.into_iter() { + set.insert(base_term_id + (base_offset + bit as usize) as u64); + } + } + } + } + } } impl TermAggregationMap for HashMapTermBuckets { @@ -648,6 +670,10 @@ impl TermAggregationMap for HashMapTermBuckets { fn new(_max_term_id: u64, _bucket_id_provider: &mut BucketIdProvider) -> Self { Self::default() } + + fn collect_term_ords(&self, set: &mut FxHashSet) { + set.extend(self.bucket_map.keys().copied()); + } } /// An optimized term map implementation for a compact set of term ordinals. @@ -704,6 +730,14 @@ impl TermAggregationMap for VecTermBucketsNoAgg { .collect(), } } + + fn collect_term_ords(&self, set: &mut FxHashSet) { + for (term_id, &count) in self.buckets.iter().enumerate() { + if count > 0 { + set.insert(term_id as u64); + } + } + } } /// An optimized term map implementation for a compact set of term ordinals. @@ -753,6 +787,54 @@ impl TermAggregationMap for VecTermBuckets { .collect(), } } + + fn collect_term_ords(&self, set: &mut FxHashSet) { + for (term_id, bucket) in self.buckets.iter().enumerate() { + if bucket.count > 0 { + set.insert(term_id as u64); + } + } + } +} + +fn build_term_ord_cache( + parent_buckets: &[TermMap], + dictionary: &Dictionary, + term_req: &TermsAggReqData, +) -> std::io::Result { + let capacity: usize = parent_buckets.len() * 64; + let mut term_ords_set: FxHashSet = + FxHashSet::with_capacity_and_hasher(capacity, FxBuildHasher); + for bucket in parent_buckets.iter() { + bucket.collect_term_ords(&mut term_ords_set); + } + + if let Some(missing_sentinel) = term_req.missing_value_for_accessor { + term_ords_set.remove(&missing_sentinel); + } + + let mut term_ords: Vec = term_ords_set.into_iter().collect(); + term_ords.sort_unstable(); + + term_ords.pop_if(|highest_term_ord| *highest_term_ord >= dictionary.num_terms() as u64); + + let mut string_arena = StringArena::default(); + let mut string_refs: Vec = Vec::with_capacity(term_ords.len()); + let all_found: bool = dictionary.sorted_ords_to_term_cb(&term_ords, |term_bytes| { + let term_str = std::str::from_utf8(term_bytes).expect("could not convert to str"); + string_refs.push(string_arena.register_str(term_str)); + })?; + assert!(all_found); + + let missing_key: Option = + term_req.req.missing.as_ref().map(|missing_value| match missing_value { + Key::Str(s) => IntermediateKey::Str(s.clone()), + Key::F64(v) => IntermediateKey::F64(*v), + Key::U64(v) => IntermediateKey::U64(*v), + Key::I64(v) => IntermediateKey::I64(*v), + }); + + Ok(TermOrdToStrCache::new(term_ords, string_refs, string_arena, missing_key)) } /// The collector puts values from the fast field into the correct buckets and does a conversion to @@ -765,6 +847,7 @@ struct SegmentTermCollector { bucket_id_provider: BucketIdProvider, max_term_id: u64, terms_req_data: TermsAggReqData, + term_ord_cache: Option, } pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) { @@ -783,6 +866,17 @@ impl SegmentAggregationCollector ) -> crate::Result<()> { // TODO: avoid prepare_max_bucket here and handle empty buckets. self.prepare_max_bucket(bucket, agg_data)?; + + if self.terms_req_data.column_type == ColumnType::Str && self.term_ord_cache.is_none() { + if let Some(str_dict_column) = &self.terms_req_data.str_dict_column { + self.term_ord_cache = Some(build_term_ord_cache( + &self.parent_buckets, + str_dict_column.dictionary(), + &self.terms_req_data, + )?); + } + } + let bucket = std::mem::replace( &mut self.parent_buckets[bucket as usize], TermMap::new(0, &mut self.bucket_id_provider), @@ -797,6 +891,7 @@ impl SegmentAggregationCollector .map(BufferedSubAggs::get_sub_agg_collector), bucket, agg_data, + self.term_ord_cache.as_ref(), )?; results.push(name, IntermediateAggregationResult::Bucket(bucket))?; Ok(()) @@ -957,6 +1052,7 @@ where mut sub_agg_collector: Option<&mut dyn SegmentAggregationCollector>, term_buckets: TermMap, agg_data: &AggregationsSegmentCtx, + term_ord_cache: Option<&TermOrdToStrCache>, ) -> crate::Result { let mut entries: Vec<(u64, Bucket)> = term_buckets.into_vec(); @@ -1005,43 +1101,77 @@ where .map(|el| el.dictionary()) .unwrap_or_else(|| &fallback_dict); - if let Some((intermediate_key, bucket)) = extract_missing_value(&mut entries, term_req) - { - let intermediate_entry = into_intermediate_bucket_entry( - bucket, - reborrow_opt_collector(&mut sub_agg_collector), - agg_data, - )?; - dict.insert(intermediate_key, intermediate_entry); - } + if let Some(cache) = term_ord_cache { + // Use cached term resolution: missing value is handled via the cache. + if let Some(missing_sentinel) = term_req.missing_value_for_accessor { + if let Some(pos) = entries.iter().position(|(tid, _)| *tid == missing_sentinel) + { + let (_tid, bucket) = entries.swap_remove(pos); + if let Some(missing_key) = cache.missing_key() { + let intermediate_entry = into_intermediate_bucket_entry( + bucket, + reborrow_opt_collector(&mut sub_agg_collector), + agg_data, + )?; + dict.insert(missing_key.clone(), intermediate_entry); + } + } + } - // Sort by term ord - entries.sort_unstable_by_key(|bucket| bucket.0); - - let (term_ids, buckets): (Vec, Vec) = entries.into_iter().unzip(); - - let intermediate_entries: Vec = buckets - .into_iter() - .map(|bucket| { - into_intermediate_bucket_entry( + for (term_ord, bucket) in entries { + if let Some(term_str) = cache.get(term_ord) { + let intermediate_entry = into_intermediate_bucket_entry( + bucket, + reborrow_opt_collector(&mut sub_agg_collector), + agg_data, + )?; + dict.insert( + IntermediateKey::Str(term_str.to_string()), + intermediate_entry, + ); + } + } + } else { + if let Some((intermediate_key, bucket)) = + extract_missing_value(&mut entries, term_req) + { + let intermediate_entry = into_intermediate_bucket_entry( bucket, reborrow_opt_collector(&mut sub_agg_collector), agg_data, - ) - }) - .collect::>()?; + )?; + dict.insert(intermediate_key, intermediate_entry); + } - let mut intermediate_entry_it = intermediate_entries.into_iter(); + // Sort by term ord + entries.sort_unstable_by_key(|bucket| bucket.0); - term_dict.sorted_ords_to_term_cb(&term_ids[..], |term| { - let intermediate_entry = intermediate_entry_it.next().unwrap(); - dict.insert( - IntermediateKey::Str( - String::from_utf8(term.to_vec()).expect("could not convert to String"), - ), - intermediate_entry, - ); - })?; + let (term_ids, buckets): (Vec, Vec) = entries.into_iter().unzip(); + + let intermediate_entries: Vec = buckets + .into_iter() + .map(|bucket| { + into_intermediate_bucket_entry( + bucket, + reborrow_opt_collector(&mut sub_agg_collector), + agg_data, + ) + }) + .collect::>()?; + + let mut intermediate_entry_it = intermediate_entries.into_iter(); + + term_dict.sorted_ords_to_term_cb(&term_ids[..], |term| { + let intermediate_entry = intermediate_entry_it.next().unwrap(); + dict.insert( + IntermediateKey::Str( + String::from_utf8(term.to_vec()) + .expect("could not convert to String"), + ), + intermediate_entry, + ); + })?; + } if term_req.req.min_doc_count == 0 { // TODO: Handle rev streaming for descending sorting by keys @@ -1162,13 +1292,13 @@ where impl SegmentTermCollector { #[inline] fn collect_terms_with_docs( - iter: impl Iterator, + doc_term_ord_iter: impl Iterator, term_buckets: &mut TermMap, bucket_id_provider: &mut BucketIdProvider, sub_agg: &mut BufferedSubAggs, ) { - for (doc, term_id) in iter { - let bucket_id = term_buckets.term_entry(term_id, bucket_id_provider); + for (doc, term_ord) in doc_term_ord_iter { + let bucket_id = term_buckets.term_entry(term_ord, bucket_id_provider); sub_agg.push(bucket_id, doc); } } @@ -2932,4 +3062,48 @@ mod tests { Ok(()) } + + #[test] + fn test_terms_double_nesting() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let outer_field = schema_builder.add_text_field("outer_term", STRING | FAST); + let inner_field = schema_builder.add_text_field("inner_term", STRING | FAST); + let index = Index::create_in_ram(schema_builder.build()); + + let outer_values = (0..10_000).map(|i| format!("outer_{i}")).collect::>(); + let inner_values = ["INFO", "ERROR", "WARN", "DEBUG"]; + + { + let mut index_writer: IndexWriter = index.writer_with_num_threads(1, 20_000_000)?; + for doc_id in 0..1_000_000u64 { + let outer_val = &outer_values[doc_id as usize % outer_values.len()]; + let inner_val = inner_values[doc_id as usize % inner_values.len()]; + index_writer.add_document(doc!( + outer_field => outer_val.as_str(), + inner_field => inner_val, + ))?; + } + index_writer.commit()?; + } + + let agg_req: Aggregations = serde_json::from_value(json!({ + "outer": { + "terms": { "field": "outer_term", "size": 200 }, + "aggs": { + "inner": { + "terms": { "field": "inner_term" } + } + } + } + })) + .unwrap(); + + let reader = index.reader()?; + let searcher = reader.searcher(); + let collector = + crate::aggregation::AggregationCollector::from_aggs(agg_req, Default::default()); + let _agg_result = searcher.search(&AllQuery, &collector)?; + + Ok(()) + } } diff --git a/src/aggregation/bucket/term_ord_to_str_cache.rs b/src/aggregation/bucket/term_ord_to_str_cache.rs new file mode 100644 index 000000000..6534f78b3 --- /dev/null +++ b/src/aggregation/bucket/term_ord_to_str_cache.rs @@ -0,0 +1,202 @@ +use rustc_hash::FxHashMap; + +use crate::aggregation::intermediate_agg_result::IntermediateKey; + +#[derive(Clone, Copy, Debug)] +pub(crate) struct StringRef { + start: u32, + len: u32, +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct StringArena { + buffer: String, +} + +impl StringArena { + pub fn register_str(&mut self, value: &str) -> StringRef { + let start = self.buffer.len() as u32; + self.buffer.push_str(value); + StringRef { + start, + len: value.len() as u32, + } + } + + pub fn get_str(&self, string_ref: StringRef) -> &str { + let start = string_ref.start as usize; + let end = start + string_ref.len as usize; + &self.buffer[start..end] + } + + pub fn len(&self) -> usize { + self.buffer.len() + } +} + +pub(crate) struct TermOrdToStrCache { + string_arena: StringArena, + missing_key: Option, + addr: TermOrdToAddr, +} + +enum TermOrdToAddr { + Dense { + offsets: Vec>, + }, + Sparse { + terms: FxHashMap, + }, +} + +impl std::fmt::Debug for TermOrdToStrCache { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match &self.addr { + TermOrdToAddr::Dense { offsets } => f + .debug_struct("TermOrdToStrCache::Dense") + .field("num_slots", &offsets.len()) + .field("arena_bytes", &self.string_arena.len()) + .finish(), + TermOrdToAddr::Sparse { terms } => f + .debug_struct("TermOrdToStrCache::Sparse") + .field("num_terms", &terms.len()) + .field("arena_bytes", &self.string_arena.len()) + .finish(), + } + } +} + +impl TermOrdToStrCache { + /// `term_ords` must be sorted. Each entry in `string_refs` is a reference + /// into `string_arena` for the corresponding term ord. + pub fn new( + term_ords: Vec, + string_refs: Vec, + string_arena: StringArena, + missing_key: Option, + ) -> TermOrdToStrCache { + let num_terms = term_ords.len(); + assert_eq!(num_terms, string_refs.len()); + + if term_ords.is_empty() { + return TermOrdToStrCache { + string_arena, + missing_key, + addr: TermOrdToAddr::Dense { + offsets: Vec::new(), + }, + }; + } + + let highest_term_ord = term_ords.last().copied().unwrap_or(0u64); + let should_use_dense = + highest_term_ord < 1_000_000u64 || highest_term_ord < num_terms as u64 * 3u64; + + let addr = if should_use_dense { + let num_slots = highest_term_ord as usize + 1; + let mut offsets: Vec> = vec![None; num_slots]; + for (term_ord, string_ref) in term_ords.into_iter().zip(string_refs) { + offsets[term_ord as usize] = Some(string_ref); + } + TermOrdToAddr::Dense { offsets } + } else { + let terms: FxHashMap = + term_ords.into_iter().zip(string_refs).collect(); + TermOrdToAddr::Sparse { terms } + }; + + TermOrdToStrCache { + string_arena, + missing_key, + addr, + } + } + + pub fn get(&self, term_ord: u64) -> Option<&str> { + let string_ref = match &self.addr { + TermOrdToAddr::Dense { offsets } => (*offsets.get(term_ord as usize)?)?, + TermOrdToAddr::Sparse { terms } => *terms.get(&term_ord)?, + }; + Some(self.string_arena.get_str(string_ref)) + } + + pub fn missing_key(&self) -> Option<&IntermediateKey> { + self.missing_key.as_ref() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_string_arena() { + let mut arena = StringArena::default(); + let r1 = arena.register_str("hello"); + let r2 = arena.register_str("world"); + let r3 = arena.register_str(""); + let r4 = arena.register_str("!"); + + assert_eq!(arena.get_str(r1), "hello"); + assert_eq!(arena.get_str(r2), "world"); + assert_eq!(arena.get_str(r3), ""); + assert_eq!(arena.get_str(r4), "!"); + } + + fn build_arena(terms: &[&str]) -> (StringArena, Vec) { + let mut arena = StringArena::default(); + let refs: Vec = terms.iter().map(|t| arena.register_str(t)).collect(); + (arena, refs) + } + + #[test] + fn test_dense_cache() { + let term_ords = vec![0, 2, 5]; + let (arena, refs) = build_arena(&["alpha", "beta", "gamma"]); + let cache = TermOrdToStrCache::new(term_ords, refs, arena, None); + + assert_eq!(cache.get(0), Some("alpha")); + assert_eq!(cache.get(1), None); + assert_eq!(cache.get(2), Some("beta")); + assert_eq!(cache.get(3), None); + assert_eq!(cache.get(5), Some("gamma")); + assert_eq!(cache.get(6), None); + assert_eq!(cache.get(100), None); + } + + #[test] + fn test_sparse_cache() { + let term_ords = vec![1_000_000, 2_000_000, 5_000_000]; + let (arena, refs) = build_arena(&["foo", "bar", "baz"]); + let cache = TermOrdToStrCache::new(term_ords, refs, arena, None); + + assert_eq!(cache.get(1_000_000), Some("foo")); + assert_eq!(cache.get(2_000_000), Some("bar")); + assert_eq!(cache.get(5_000_000), Some("baz")); + assert_eq!(cache.get(0), None); + assert_eq!(cache.get(3_000_000), None); + } + + #[test] + fn test_empty_cache() { + let cache = + TermOrdToStrCache::new(Vec::new(), Vec::new(), StringArena::default(), None); + assert_eq!(cache.get(0), None); + assert_eq!(cache.get(100), None); + } + + #[test] + fn test_missing_key() { + let missing = IntermediateKey::Str("N/A".to_string()); + let (arena, refs) = build_arena(&["x"]); + let cache = TermOrdToStrCache::new(vec![0], refs, arena, Some(missing)); + assert_eq!( + cache.missing_key(), + Some(&IntermediateKey::Str("N/A".to_string())) + ); + + let (arena, refs) = build_arena(&["x"]); + let cache_no_missing = TermOrdToStrCache::new(vec![0], refs, arena, None); + assert_eq!(cache_no_missing.missing_key(), None); + } +}