From 030554d544b0ab5d53a96d3b4a33effccbea35da Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Thu, 4 Dec 2025 14:47:55 +0800
Subject: [PATCH] use radix map, fix prepare_max_bucket

use paged term map in term agg
use special no sub agg term map impl
---
 src/aggregation/bucket/term_agg.rs | 266 +++++++++++++++++++++++++++--
 src/aggregation/cached_sub_aggs.rs |  25 ++-
 2 files changed, 263 insertions(+), 28 deletions(-)

diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs
index e0180e268..32633a8f9 100644
--- a/src/aggregation/bucket/term_agg.rs
+++ b/src/aggregation/bucket/term_agg.rs
@@ -375,7 +375,7 @@ pub(crate) fn build_segment_term_collector(
     // TODO: A better metric instead of is_top_level would be the number of buckets expected.
     // E.g. if the term agg is not top level, but the parent is a bucket agg with fewer than 10
     // buckets, we can still use a Vec.
-    let can_use_vec = terms_req_data.is_top_level;
+    let is_top_level = terms_req_data.is_top_level;

     // TODO: Benchmark to validate the threshold
     const MAX_NUM_TERMS_FOR_VEC: usize = 100;
@@ -394,14 +394,39 @@
     let mut bucket_id_provider = BucketIdProvider::default();

     // - use a Vec instead of a hashmap for our aggregation.
-    if can_use_vec && max_term < MAX_NUM_TERMS_FOR_VEC {
-        let term_buckets = VecTermBuckets::new(max_term + 1, &mut bucket_id_provider);
+
+    if is_top_level && max_term < MAX_NUM_TERMS_FOR_VEC && !has_sub_aggregations {
+        let term_buckets = VecTermBucketsNoAgg::new(max_term as u64 + 1, &mut bucket_id_provider);
+        let collector: SegmentTermCollector<_, true> = SegmentTermCollector {
+            buckets: vec![term_buckets],
+            accessor_idx,
+            sub_agg: None,
+            bucket_id_provider,
+            max_term_id: max_term as u64,
+        };
+        Ok(Box::new(collector))
+    } else if is_top_level && max_term < MAX_NUM_TERMS_FOR_VEC {
+        let term_buckets = VecTermBuckets::new(max_term as u64 + 1, &mut bucket_id_provider);
         let sub_agg = sub_agg_collector.map(CachedSubAggs::<true>::new);
         let collector: SegmentTermCollector<_, true> = SegmentTermCollector {
             buckets: vec![term_buckets],
             accessor_idx,
             sub_agg,
             bucket_id_provider,
+            max_term_id: max_term as u64,
         };
         Ok(Box::new(collector))
+    } else if max_term < 8_000_000 && is_top_level {
+        let term_buckets: PagedTermMap =
+            PagedTermMap::new(max_term as u64 + 1, &mut bucket_id_provider);
+        // Build sub-aggregation blueprint (flat pairs)
+        let sub_agg = sub_agg_collector.map(CachedSubAggs::<false>::new);
+        let collector: SegmentTermCollector<_, false> = SegmentTermCollector {
+            buckets: vec![term_buckets],
+            accessor_idx,
+            sub_agg,
+            bucket_id_provider,
+            max_term_id: max_term as u64,
+        };
+        Ok(Box::new(collector))
     } else {
@@ -413,12 +438,13 @@
             accessor_idx,
             sub_agg,
             bucket_id_provider,
+            max_term_id: max_term as u64,
         };
         Ok(Box::new(collector))
     }
 }

-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Copy, Default)]
 struct Bucket {
     pub count: u32,
     pub bucket_id: BucketId,
@@ -435,7 +461,10 @@ impl Bucket {
 }

 /// Abstraction over the storage used for term buckets (counts only).
-trait TermAggregationMap: Clone + Debug + Default + 'static {
+trait TermAggregationMap: Clone + Debug + 'static {
+    /// Create a new instance with a strict upper bound on term ids.
+    fn new(max_term_id: u64, bucket_id_provider: &mut BucketIdProvider) -> Self;
+
     /// Estimate the memory consumption of this struct in bytes.
     fn get_memory_consumption(&self) -> usize;
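For reference, the dispatch above reduces to a cardinality-based strategy choice. The following self-contained sketch (the `Strategy` enum and `choose_strategy` are illustrative names, not tantivy API) shows the decision table that the branches implement:

// Standalone sketch of the storage dispatch; thresholds mirror the patch,
// but the names here are illustrative only.
#[derive(Debug, PartialEq)]
enum Strategy {
    /// Dense Vec<u32> of counts; top-level, few terms, no sub-aggregations.
    VecNoSubAgg,
    /// Dense Vec<Bucket>; top-level, few terms, with sub-aggregations.
    VecWithSubAgg,
    /// Page directory with on-demand 1024-entry pages; top-level, many terms.
    Paged,
    /// Fallback for nested aggregations or very large term id spaces.
    HashMap,
}

const MAX_NUM_TERMS_FOR_VEC: u64 = 100;
const MAX_NUM_TERMS_FOR_PAGED: u64 = 8_000_000;

fn choose_strategy(max_term: u64, is_top_level: bool, has_sub_aggs: bool) -> Strategy {
    if is_top_level && max_term < MAX_NUM_TERMS_FOR_VEC && !has_sub_aggs {
        Strategy::VecNoSubAgg
    } else if is_top_level && max_term < MAX_NUM_TERMS_FOR_VEC {
        Strategy::VecWithSubAgg
    } else if is_top_level && max_term < MAX_NUM_TERMS_FOR_PAGED {
        Strategy::Paged
    } else {
        Strategy::HashMap
    }
}

fn main() {
    assert_eq!(choose_strategy(50, true, false), Strategy::VecNoSubAgg);
    assert_eq!(choose_strategy(50, true, true), Strategy::VecWithSubAgg);
    assert_eq!(choose_strategy(1_000_000, true, true), Strategy::Paged);
    // Nested term aggregations always fall back to the hash map.
    assert_eq!(choose_strategy(50, false, false), Strategy::HashMap);
}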
@@ -461,6 +490,149 @@ impl Default for HashMapTermBuckets {
     }
 }

+const PAGE_SHIFT: usize = 10;
+const PAGE_SIZE: usize = 1 << PAGE_SHIFT; // 1024
+const PAGE_MASK: usize = PAGE_SIZE - 1;
+const BITMASK_LEN: usize = PAGE_SIZE / 64;
+
+#[derive(Clone, Debug)]
+struct Page {
+    presence: [u64; BITMASK_LEN],
+    data: [Bucket; PAGE_SIZE],
+}
+
+impl Page {
+    fn new() -> Self {
+        Self {
+            presence: [0; BITMASK_LEN],
+            data: [Bucket::default(); PAGE_SIZE],
+        }
+    }
+
+    #[inline]
+    fn is_set(&self, offset: usize) -> bool {
+        let word_idx = offset / 64;
+        let bit_idx = offset % 64;
+        (self.presence[word_idx] & (1 << bit_idx)) != 0
+    }
+
+    #[inline]
+    fn set_present(&mut self, offset: usize) {
+        let word_idx = offset / 64;
+        let bit_idx = offset % 64;
+        self.presence[word_idx] |= 1 << bit_idx;
+    }
+
+    // Flattened iteration logic: walk the presence bitmask word by word and
+    // emit (term_id, bucket) pairs for every set bit.
+    fn collect_items(&self, base_term_id: u64, result: &mut Vec<(u64, Bucket)>) {
+        for (word_idx, &word) in self.presence.iter().enumerate() {
+            if word == 0 {
+                continue;
+            }
+
+            let mut temp_word = word;
+            let base_offset = word_idx * 64;
+
+            while temp_word != 0 {
+                let bit = temp_word.trailing_zeros() as usize;
+                let offset = base_offset + bit;
+                result.push((base_term_id + offset as u64, self.data[offset]));
+                temp_word &= !(1 << bit);
+            }
+        }
+    }
+}
+
+/// A paged term map implementation for moderately sized term id sets.
+/// Uses a fixed size vector of pages, each page containing a fixed size array of buckets.
+///
+/// Each page covers a range of term ids. Pages are allocated on demand.
+/// This implementation is more memory efficient than a full Vec for sparse term id sets,
+/// since pages are only allocated where term ids actually occur.
+///
+/// It has a fixed cost of `num_pages * 8 bytes` for the page directory.
+/// For 1 million terms, this is 8 * 1024 = 8KB.
+///
+/// Note that for nested aggregations we create one TermAggregationMap per parent bucket.
+/// For example, with 100 parent buckets and 1 million terms, this is 800KB of overhead for
+/// the page directories alone. Therefore, this implementation is only enabled for top-level
+/// aggregations.
+/// TODO: pass the expected number of buckets from the parent instead of the strict
+/// is_top_level flag.
+#[derive(Clone, Debug, Default)]
+struct PagedTermMap {
+    // Fixed size vector based on max_term_id
+    pages: Vec<Option<Box<Page>>>,
+    mem_usage: usize,
+}
+
+impl TermAggregationMap for PagedTermMap {
+    #[inline]
+    fn get_memory_consumption(&self) -> usize {
+        self.mem_usage + std::mem::size_of::<Self>()
+    }
+
+    #[inline]
+    fn term_entry(&mut self, term_id: u64, bucket_id_provider: &mut BucketIdProvider) -> BucketId {
+        let term_id = term_id as usize;
+        let page_idx = term_id >> PAGE_SHIFT;
+        let offset = term_id & PAGE_MASK;
+
+        // Indexing panics if term_id exceeds the bound passed to `new`.
+        let page = match &mut self.pages[page_idx] {
+            Some(p) => p,
+            None => {
+                let new_page = Box::new(Page::new());
+                self.mem_usage += std::mem::size_of::<Page>();
+                self.pages[page_idx] = Some(new_page);
+                self.pages[page_idx].as_mut().unwrap()
+            }
+        };

+        if page.is_set(offset) {
+            let bucket = &mut page.data[offset];
+            bucket.count += 1;
+            bucket.bucket_id
+        } else {
+            let new_id = bucket_id_provider.next_bucket_id();
+            page.data[offset] = Bucket {
+                count: 1,
+                bucket_id: new_id,
+            };
+            page.set_present(offset);
+            new_id
+        }
+    }
+
+    fn into_vec(self) -> Vec<(u64, Bucket)> {
+        // Heuristic: estimate the number of active entries from the number of
+        // allocated pages, assuming pages are about half full on average.
+        let estimated_count = (self.mem_usage / std::mem::size_of::<Page>()) * (PAGE_SIZE / 2);
+        let mut result = Vec::with_capacity(estimated_count);
+
+        for (i, page_opt) in self.pages.into_iter().enumerate() {
+            if let Some(page) = page_opt {
+                let base_term_id = (i << PAGE_SHIFT) as u64;
+                page.collect_items(base_term_id, &mut result);
+            }
+        }
+        result
+    }
+
+    /// Initialize with a strict (exclusive) upper bound on term ids.
+    /// Inserting a term_id at or above the bound may panic.
+    fn new(max_term_id: u64, _bucket_id_provider: &mut BucketIdProvider) -> Self {
+        let max_page_idx = (max_term_id as usize) >> PAGE_SHIFT;
+        let num_pages = max_page_idx + 1;
+
+        // Pre-allocate the directory (pointers only, not the heavy pages)
+        // Memory cost: num_pages * 8 bytes
+        let pages = vec![None; num_pages];
+
+        let mem_usage = pages.capacity() * std::mem::size_of::<Option<Box<Page>>>();
+
+        Self { pages, mem_usage }
+    }
+}
+
 impl TermAggregationMap for HashMapTermBuckets {
     #[inline]
     fn get_memory_consumption(&self) -> usize {
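The `Page::collect_items` loop above relies on a standard bitmask-walking trick: skip empty 64-bit words entirely, then repeatedly extract the lowest set bit with `trailing_zeros`. A self-contained sketch of just that technique (illustrative code, not part of the patch):

// Demo of the presence-bitmask walk: skip empty u64 words, then peel off
// set bits one by one via `trailing_zeros`.
fn set_bits(presence: &[u64]) -> Vec<usize> {
    let mut result = Vec::new();
    for (word_idx, &word) in presence.iter().enumerate() {
        if word == 0 {
            continue; // whole word empty: 64 slots skipped with one comparison
        }
        let mut temp_word = word;
        let base_offset = word_idx * 64;
        while temp_word != 0 {
            let bit = temp_word.trailing_zeros() as usize;
            result.push(base_offset + bit);
            temp_word &= temp_word - 1; // clear the lowest set bit
        }
    }
    result
}

fn main() {
    let mut presence = [0u64; 16]; // 1024 slots, like one `Page`
    for offset in [3usize, 64, 70, 1023] {
        presence[offset / 64] |= 1u64 << (offset % 64);
    }
    assert_eq!(set_bits(&presence), vec![3, 64, 70, 1023]);
}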
@@ -480,24 +652,75 @@ impl TermAggregationMap for HashMapTermBuckets {
     fn into_vec(self) -> Vec<(u64, Bucket)> {
         self.bucket_map.into_iter().collect()
     }
+
+    #[inline]
+    fn new(_max_term_id: u64, _bucket_id_provider: &mut BucketIdProvider) -> Self {
+        Self::default()
+    }
 }

 /// An optimized term map implementation for a compact set of term ordinals.
-#[derive(Clone, Debug, Default)]
-struct VecTermBuckets {
-    buckets: Vec<Bucket>,
+/// This variant stores only counts and is used when there are no sub-aggregations.
+#[derive(Clone, Debug)]
+struct VecTermBucketsNoAgg {
+    buckets: Vec<u32>,
 }

-impl VecTermBuckets {
-    fn new(num_terms: usize, bucket_id_provider: &mut BucketIdProvider) -> Self {
-        VecTermBuckets {
-            buckets: std::iter::repeat_with(|| Bucket::new(bucket_id_provider.next_bucket_id()))
-                .take(num_terms)
-                .collect(),
-        }
-    }
-}
+impl TermAggregationMap for VecTermBucketsNoAgg {
+    /// Estimate the memory consumption of this struct in bytes.
+    fn get_memory_consumption(&self) -> usize {
+        // We do not include `std::mem::size_of::<Self>()`:
+        // it is already measured by the parent aggregation.
+        self.buckets.capacity() * std::mem::size_of::<u32>()
+    }
+
+    /// Add an occurrence of the given term id.
+    #[inline(always)]
+    fn term_entry(&mut self, term_id: u64, _bucket_id_provider: &mut BucketIdProvider) -> BucketId {
+        let term_id_usize = term_id as usize;
+        debug_assert!(
+            term_id_usize < self.buckets.len(),
+            "term_id {} out of bounds for VecTermBucketsNoAgg (len={})",
+            term_id,
+            self.buckets.len()
+        );
+        let count = unsafe { self.buckets.get_unchecked_mut(term_id_usize) };
+        *count += 1;
+        0 // unused
+    }
+
+    fn into_vec(self) -> Vec<(u64, Bucket)> {
+        self.buckets
+            .into_iter()
+            .enumerate()
+            .filter(|(_term_id, count)| *count > 0)
+            .map(|(term_id, count)| {
+                (
+                    term_id as u64,
+                    Bucket {
+                        count,
+                        bucket_id: 0, // unused, there are no sub-aggregations
+                    },
+                )
+            })
+            .collect()
+    }
+
+    fn new(num_terms: u64, _bucket_id_provider: &mut BucketIdProvider) -> Self {
+        Self {
+            buckets: vec![0; num_terms as usize],
+        }
+    }
+}
+
+/// An optimized term map implementation for a compact set of term ordinals.
+#[derive(Clone, Debug)]
+struct VecTermBuckets {
+    buckets: Vec<Bucket>,
+}
+
 impl TermAggregationMap for VecTermBuckets {
     /// Estimate the memory consumption of this struct in bytes.
     fn get_memory_consumption(&self) -> usize {
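To make the memory trade-off behind `PagedTermMap` concrete, here is a rough, self-contained model (the 8-byte `Bucket` size and page layout constants are assumptions for illustration, not measured values) comparing a dense `Vec<Bucket>` with the paged directory for a sparse term set:

// Back-of-envelope memory model, assuming an 8-byte Bucket (two u32s).
const PAGE_SIZE: usize = 1024;
const BUCKET_BYTES: usize = 8;
const PAGE_BYTES: usize = PAGE_SIZE / 8 + PAGE_SIZE * BUCKET_BYTES; // presence bits + data
const PTR_BYTES: usize = 8;

fn dense_bytes(max_term_id: usize) -> usize {
    max_term_id * BUCKET_BYTES
}

fn paged_bytes(max_term_id: usize, touched_pages: usize) -> usize {
    let num_pages = max_term_id / PAGE_SIZE + 1;
    num_pages * PTR_BYTES + touched_pages * PAGE_BYTES
}

fn main() {
    let max_term_id = 1_000_000;
    // Dense Vec<Bucket>: ~8 MB regardless of how many terms actually occur.
    println!("dense: {} bytes", dense_bytes(max_term_id));
    // Paged map with only 10 of ~977 pages touched:
    // ~8 KB directory + ~83 KB of pages, ~91 KB total.
    println!("paged: {} bytes", paged_bytes(max_term_id, 10));
}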
@@ -531,6 +754,14 @@ impl TermAggregationMap for VecTermBuckets {
             .map(|(term_id, bucket)| (term_id as u64, bucket))
             .collect()
     }
+
+    fn new(num_terms: u64, bucket_id_provider: &mut BucketIdProvider) -> Self {
+        VecTermBuckets {
+            buckets: std::iter::repeat_with(|| Bucket::new(bucket_id_provider.next_bucket_id()))
+                .take(num_terms as usize)
+                .collect(),
+        }
+    }
 }

 /// The collector puts values from the fast field into the correct buckets and does a conversion to
@@ -542,6 +773,7 @@ struct SegmentTermCollector
     sub_agg: Option<CachedSubAggs<LOWCARD>>,
     accessor_idx: usize,
     bucket_id_provider: BucketIdProvider,
+    max_term_id: u64,
 }

 pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) {
@@ -560,7 +792,10 @@ impl SegmentAggregationCollect
     ) -> crate::Result<()> {
         // TODO: avoid prepare_max_bucket here and handle empty buckets.
         self.prepare_max_bucket(bucket, agg_data)?;
-        let bucket = std::mem::take(&mut self.buckets[bucket as usize]);
+        let bucket = std::mem::replace(
+            &mut self.buckets[bucket as usize],
+            TermMap::new(0, &mut self.bucket_id_provider),
+        );

         let term_req = agg_data.get_term_req_data(self.accessor_idx);
         let name = term_req.name.clone();
@@ -655,7 +890,8 @@ impl SegmentAggregationCollect
         _agg_data: &AggregationsSegmentCtx,
     ) -> crate::Result<()> {
         while self.buckets.len() <= max_bucket as usize {
-            let term_buckets: TermMap = TermMap::default();
+            // `new` takes an exclusive upper bound, hence `max_term_id + 1`.
+            let term_buckets: TermMap =
+                TermMap::new(self.max_term_id + 1, &mut self.bucket_id_provider);
             self.buckets.push(term_buckets);
         }
         Ok(())
diff --git a/src/aggregation/cached_sub_aggs.rs b/src/aggregation/cached_sub_aggs.rs
index 7e6f57803..e430bfd4c 100644
--- a/src/aggregation/cached_sub_aggs.rs
+++ b/src/aggregation/cached_sub_aggs.rs
@@ -81,19 +81,6 @@ impl CachedSubAggs
         self.num_docs += 1;
     }

-    #[inline]
-    pub fn extend_with_bucket_zero(&mut self, docs: &[DocId]) {
-        debug_assert!(
-            LOWCARD,
-            "extend_with_bucket_zero only valid for single bucket"
-        );
-        if self.per_bucket_docs.is_empty() {
-            self.per_bucket_docs.resize_with(1, Vec::new);
-        }
-        self.per_bucket_docs[0].extend_from_slice(docs);
-        self.num_docs += docs.len();
-    }
-
     /// Check if we need to flush based on the number of documents cached.
     /// If so, flushes the cache to the provided aggregation collector.
     pub fn check_flush_local(
@@ -158,6 +145,18 @@ impl CachedSubAggs
     }
 }

+impl CachedSubAggs<true> {
+    /// Implemented only for low cardinality cached sub-aggregations.
+    #[inline]
+    pub fn extend_with_bucket_zero(&mut self, docs: &[DocId]) {
+        if self.per_bucket_docs.is_empty() {
+            self.per_bucket_docs.resize_with(1, Vec::new);
+        }
+        self.per_bucket_docs[0].extend_from_slice(docs);
+        self.num_docs += docs.len();
+    }
+}
+
 #[derive(Debug, Clone)]
 struct PartitionEntry {
     bucket_ids: Vec<BucketId>,
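The relocated `extend_with_bucket_zero` above is the single-bucket fast path of the doc-id cache. A minimal sketch of the underlying buffering idea, using hypothetical names (`DocBuffer`, `flush_threshold`) rather than the real `CachedSubAggs` internals:

// Sketch: buffer matching doc ids per bucket, flush in batches once a
// threshold is crossed, so sub-aggregations see large slices of docs.
type DocId = u32;

struct DocBuffer {
    per_bucket_docs: Vec<Vec<DocId>>,
    num_docs: usize,
    flush_threshold: usize,
}

impl DocBuffer {
    fn new(flush_threshold: usize) -> Self {
        Self { per_bucket_docs: Vec::new(), num_docs: 0, flush_threshold }
    }

    /// Fast path for the single-bucket (low cardinality) case: append a whole
    /// slice of docs to bucket 0 instead of pushing one doc at a time.
    fn extend_with_bucket_zero(&mut self, docs: &[DocId]) {
        if self.per_bucket_docs.is_empty() {
            self.per_bucket_docs.resize_with(1, Vec::new);
        }
        self.per_bucket_docs[0].extend_from_slice(docs);
        self.num_docs += docs.len();
        if self.num_docs >= self.flush_threshold {
            self.flush();
        }
    }

    fn flush(&mut self) {
        // Stand-in for running the cached docs through the sub-aggregation.
        for (bucket, docs) in self.per_bucket_docs.iter_mut().enumerate() {
            println!("bucket {bucket}: flushing {} docs", docs.len());
            docs.clear();
        }
        self.num_docs = 0;
    }
}

fn main() {
    let mut buf = DocBuffer::new(4);
    buf.extend_with_bucket_zero(&[1, 2, 3]);
    buf.extend_with_bucket_zero(&[7, 9]); // crosses the threshold -> flush
    assert_eq!(buf.num_docs, 0);
}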