From 8d3a7abbe90292baa2ebb4cf55208aaeec149bb3 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Mon, 5 Jan 2026 09:23:56 +0100
Subject: [PATCH] split subaggcache into two trait impls

---
 src/aggregation/agg_data.rs                   |  11 +-
 src/aggregation/bucket/filter.rs              |  37 ++-
 src/aggregation/bucket/histogram/histogram.rs |   6 +-
 src/aggregation/bucket/range.rs               |  24 +-
 src/aggregation/bucket/term_agg.rs            |  66 ++--
 src/aggregation/bucket/term_missing_agg.rs    |   4 +-
 src/aggregation/cached_sub_aggs.rs            | 286 +++++++++++-------
 src/aggregation/collector.rs                  |   7 +-
 8 files changed, 265 insertions(+), 176 deletions(-)

diff --git a/src/aggregation/agg_data.rs b/src/aggregation/agg_data.rs
index fd97e365b..de521505e 100644
--- a/src/aggregation/agg_data.rs
+++ b/src/aggregation/agg_data.rs
@@ -10,9 +10,9 @@ use crate::aggregation::accessor_helpers::{
 };
 use crate::aggregation::agg_req::{Aggregation, AggregationVariants, Aggregations};
 use crate::aggregation::bucket::{
-    build_segment_range_collector, FilterAggReqData, HistogramAggReqData, HistogramBounds,
-    IncludeExcludeParam, MissingTermAggReqData, RangeAggReqData, SegmentFilterCollector,
-    SegmentHistogramCollector, TermMissingAgg, TermsAggReqData, TermsAggregation,
+    build_segment_filter_collector, build_segment_range_collector, FilterAggReqData,
+    HistogramAggReqData, HistogramBounds, IncludeExcludeParam, MissingTermAggReqData,
+    RangeAggReqData, SegmentHistogramCollector, TermMissingAgg, TermsAggReqData, TermsAggregation,
     TermsAggregationInternal,
 };
 use crate::aggregation::metric::{
@@ -416,9 +416,7 @@ pub(crate) fn build_segment_agg_collector(
             req, node,
         )?)),
         AggKind::Range => Ok(build_segment_range_collector(req, node)?),
-        AggKind::Filter => Ok(Box::new(SegmentFilterCollector::from_req_and_validate(
-            req, node,
-        )?)),
+        AggKind::Filter => build_segment_filter_collector(req, node),
     }
 }
 
@@ -733,6 +731,7 @@ fn build_nodes(
             segment_reader: reader.clone(),
             evaluator,
             matching_docs_buffer,
+            is_top_level,
         });
         let children = build_children(&req.sub_aggregation, reader, segment_ordinal, data)?;
         Ok(vec![AggRefNode {
diff --git a/src/aggregation/bucket/filter.rs b/src/aggregation/bucket/filter.rs
index d511172f9..73518238a 100644
--- a/src/aggregation/bucket/filter.rs
+++ b/src/aggregation/bucket/filter.rs
@@ -6,7 +6,9 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use crate::aggregation::agg_data::{
     build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
 };
-use crate::aggregation::cached_sub_aggs::CachedSubAggs;
+use crate::aggregation::cached_sub_aggs::{
+    CachedSubAggs, HighCardSubAggCache, LowCardSubAggCache, SubAggCache,
+};
 use crate::aggregation::intermediate_agg_result::{
     IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
 };
@@ -406,6 +408,8 @@ pub struct FilterAggReqData {
     pub evaluator: DocumentQueryEvaluator,
     /// Reusable buffer for matching documents to minimize allocations during collection
     pub matching_docs_buffer: Vec,
+    /// True if this filter aggregation is at the top level of the aggregation tree (not nested).
+    pub is_top_level: bool,
 }
 
 impl FilterAggReqData {
@@ -415,6 +419,7 @@
             + std::mem::size_of::()
             + self.evaluator.bitset.len() / 8 // BitSet memory (bits to bytes)
             + self.matching_docs_buffer.capacity() * std::mem::size_of::()
+            + std::mem::size_of::()
     }
 }
 
@@ -498,17 +503,17 @@ struct DocCount {
 }
 
 /// Segment collector for filter aggregation
-pub struct SegmentFilterCollector {
+pub struct SegmentFilterCollector {
     /// Document counts per parent bucket
     parent_buckets: Vec,
     /// Sub-aggregation collectors
-    sub_aggregations: Option>,
+    sub_aggregations: Option>,
     bucket_id_provider: BucketIdProvider,
     /// Accessor index for this filter aggregation (to access FilterAggReqData)
     accessor_idx: usize,
 }
 
-impl SegmentFilterCollector {
+impl SegmentFilterCollector {
     /// Create a new filter segment collector following the new agg_data pattern
     pub(crate) fn from_req_and_validate(
         req: &mut AggregationsSegmentCtx,
@@ -531,7 +536,27 @@ impl SegmentFilterCollector {
     }
 }
 
-impl Debug for SegmentFilterCollector {
+pub(crate) fn build_segment_filter_collector(
+    req: &mut AggregationsSegmentCtx,
+    node: &AggRefNode,
+) -> crate::Result> {
+    let is_top_level = req.per_request.filter_req_data[node.idx_in_req_data]
+        .as_ref()
+        .expect("filter_req_data slot is empty")
+        .is_top_level;
+
+    if is_top_level {
+        Ok(Box::new(
+            SegmentFilterCollector::::from_req_and_validate(req, node)?,
+        ))
+    } else {
+        Ok(Box::new(
+            SegmentFilterCollector::::from_req_and_validate(req, node)?,
+        ))
+    }
+}
+
+impl Debug for SegmentFilterCollector {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("SegmentFilterCollector")
             .field("buckets", &self.parent_buckets)
@@ -541,7 +566,7 @@ impl Debug for SegmentFilterCollector {
     }
 }
 
-impl SegmentAggregationCollector for SegmentFilterCollector {
+impl SegmentAggregationCollector for SegmentFilterCollector {
     fn add_intermediate_aggregation_result(
         &mut self,
         agg_data: &AggregationsSegmentCtx,
diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs
index 6e8198582..adf7936c6 100644
--- a/src/aggregation/bucket/histogram/histogram.rs
+++ b/src/aggregation/bucket/histogram/histogram.rs
@@ -10,7 +10,7 @@ use crate::aggregation::agg_data::{
 };
 use crate::aggregation::agg_req::Aggregations;
 use crate::aggregation::agg_result::BucketEntry;
-use crate::aggregation::cached_sub_aggs::CachedSubAggs;
+use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardCachedSubAggs};
 use crate::aggregation::intermediate_agg_result::{
     IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
     IntermediateHistogramBucketEntry,
@@ -258,7 +258,7 @@ pub(crate) struct SegmentHistogramBucketEntry {
 impl SegmentHistogramBucketEntry {
     pub(crate) fn into_intermediate_bucket_entry(
         self,
-        sub_aggregation: &mut Option,
+        sub_aggregation: &mut Option,
         agg_data: &AggregationsSegmentCtx,
     ) -> crate::Result {
         let mut sub_aggregation_res = IntermediateAggregationResults::default();
@@ -291,7 +291,7 @@ pub struct SegmentHistogramCollector {
     /// The buckets containing the aggregation data.
     /// One Histogram bucket per parent bucket id.
     parent_buckets: Vec,
-    sub_agg: Option,
+    sub_agg: Option,
     accessor_idx: usize,
     bucket_id_provider: BucketIdProvider,
 }
diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs
index 682c007a1..46e0065ce 100644
--- a/src/aggregation/bucket/range.rs
+++ b/src/aggregation/bucket/range.rs
@@ -9,7 +9,9 @@ use crate::aggregation::agg_data::{
     build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
 };
 use crate::aggregation::agg_limits::AggregationLimitsGuard;
-use crate::aggregation::cached_sub_aggs::CachedSubAggs;
+use crate::aggregation::cached_sub_aggs::{
+    CachedSubAggs, HighCardSubAggCache, LowCardCachedSubAggs, LowCardSubAggCache, SubAggCache,
+};
 use crate::aggregation::intermediate_agg_result::{
     IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
     IntermediateRangeBucketEntry, IntermediateRangeBucketResult,
@@ -153,13 +155,13 @@ pub(crate) struct SegmentRangeAndBucketEntry {
 
 /// The collector puts values from the fast field into the correct buckets and does a conversion to
 /// the correct datatype.
-pub struct SegmentRangeCollector {
+pub struct SegmentRangeCollector {
     /// The buckets containing the aggregation data.
     /// One for each ParentBucketId
     parent_buckets: Vec>,
     column_type: ColumnType,
     pub(crate) accessor_idx: usize,
-    sub_agg: Option>,
+    sub_agg: Option>,
     /// Here things get a bit weird. We need to assign unique bucket ids across all
     /// parent buckets. So we keep track of the next available bucket id here.
     /// This allows a kind of flattening of the bucket ids across all parent buckets.
@@ -176,7 +178,7 @@ pub struct SegmentRangeCollector {
     limits: AggregationLimitsGuard,
 }
 
-impl Debug for SegmentRangeCollector {
+impl Debug for SegmentRangeCollector {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("SegmentRangeCollector")
             .field("parent_buckets_len", &self.parent_buckets.len())
@@ -227,7 +229,7 @@ impl SegmentRangeBucketEntry {
     }
 }
 
-impl SegmentAggregationCollector for SegmentRangeCollector {
+impl SegmentAggregationCollector for SegmentRangeCollector {
     fn add_intermediate_aggregation_result(
         &mut self,
         agg_data: &AggregationsSegmentCtx,
@@ -348,8 +350,8 @@ pub(crate) fn build_segment_range_collector(
     };
 
     if is_low_card {
-        Ok(Box::new(SegmentRangeCollector {
-            sub_agg: sub_agg.map(CachedSubAggs::::new),
+        Ok(Box::new(SegmentRangeCollector:: {
+            sub_agg: sub_agg.map(LowCardCachedSubAggs::new),
             column_type: field_type,
             accessor_idx,
             parent_buckets: Vec::new(),
@@ -357,8 +359,8 @@ pub(crate) fn build_segment_range_collector(
             limits: agg_data.context.limits.clone(),
         }))
    } else {
-        Ok(Box::new(SegmentRangeCollector {
-            sub_agg: sub_agg.map(CachedSubAggs::::new),
+        Ok(Box::new(SegmentRangeCollector:: {
+            sub_agg: sub_agg.map(CachedSubAggs::new),
             column_type: field_type,
             accessor_idx,
             parent_buckets: Vec::new(),
@@ -368,7 +370,7 @@ pub(crate) fn build_segment_range_collector(
     }
 }
 
-impl SegmentRangeCollector {
+impl SegmentRangeCollector {
     pub(crate) fn create_new_buckets(
         &mut self,
         agg_data: &AggregationsSegmentCtx,
@@ -552,7 +554,7 @@ mod tests {
     pub fn get_collector_from_ranges(
         ranges: Vec,
         field_type: ColumnType,
-    ) -> SegmentRangeCollector {
+    ) -> SegmentRangeCollector {
         let req = RangeAggregation {
             field: "dummy".to_string(),
             ranges,
diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs
index 5cf39746c..ed2793bd1 100644
--- a/src/aggregation/bucket/term_agg.rs
+++ b/src/aggregation/bucket/term_agg.rs
@@ -17,7 +17,9 @@ use crate::aggregation::agg_data::{
 };
 use crate::aggregation::agg_limits::MemoryConsumption;
 use crate::aggregation::agg_req::Aggregations;
-use crate::aggregation::cached_sub_aggs::CachedSubAggs;
+use crate::aggregation::cached_sub_aggs::{
+    CachedSubAggs, HighCardSubAggCache, LowCardCachedSubAggs, LowCardSubAggCache, SubAggCache,
+};
 use crate::aggregation::intermediate_agg_result::{
     IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
     IntermediateKey, IntermediateTermBucketEntry, IntermediateTermBucketResult,
@@ -389,7 +391,7 @@ pub(crate) fn build_segment_term_collector(
     // Decide which bucket storage is best suited for this aggregation.
     if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC && !has_sub_aggregations {
         let term_buckets = VecTermBucketsNoAgg::new(max_term_id + 1, &mut bucket_id_provider);
-        let collector: SegmentTermCollector<_, true> = SegmentTermCollector {
+        let collector: SegmentTermCollector<_, HighCardSubAggCache> = SegmentTermCollector {
             parent_buckets: vec![term_buckets],
             sub_agg: None,
             bucket_id_provider,
@@ -399,8 +401,8 @@ pub(crate) fn build_segment_term_collector(
         Ok(Box::new(collector))
     } else if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC {
         let term_buckets = VecTermBuckets::new(max_term_id + 1, &mut bucket_id_provider);
-        let sub_agg = sub_agg_collector.map(CachedSubAggs::::new);
-        let collector: SegmentTermCollector<_, false> = SegmentTermCollector {
+        let sub_agg = sub_agg_collector.map(LowCardCachedSubAggs::new);
+        let collector: SegmentTermCollector<_, LowCardSubAggCache> = SegmentTermCollector {
             parent_buckets: vec![term_buckets],
             sub_agg,
             bucket_id_provider,
@@ -412,26 +414,28 @@ pub(crate) fn build_segment_term_collector(
         let term_buckets: PagedTermMap =
             PagedTermMap::new(max_term_id + 1, &mut bucket_id_provider);
         // Build sub-aggregation blueprint (flat pairs)
-        let sub_agg = sub_agg_collector.map(CachedSubAggs::::new);
-        let collector: SegmentTermCollector = SegmentTermCollector {
-            parent_buckets: vec![term_buckets],
-            sub_agg,
-            bucket_id_provider,
-            max_term_id,
-            terms_req_data,
-        };
+        let sub_agg = sub_agg_collector.map(CachedSubAggs::new);
+        let collector: SegmentTermCollector =
+            SegmentTermCollector {
+                parent_buckets: vec![term_buckets],
+                sub_agg,
+                bucket_id_provider,
+                max_term_id,
+                terms_req_data,
+            };
         Ok(Box::new(collector))
     } else {
         let term_buckets: HashMapTermBuckets = HashMapTermBuckets::default();
         // Build sub-aggregation blueprint (flat pairs)
-        let sub_agg = sub_agg_collector.map(CachedSubAggs::::new);
-        let collector: SegmentTermCollector = SegmentTermCollector {
-            parent_buckets: vec![term_buckets],
-            sub_agg,
-            bucket_id_provider,
-            max_term_id,
-            terms_req_data,
-        };
+        let sub_agg = sub_agg_collector.map(CachedSubAggs::new);
+        let collector: SegmentTermCollector =
+            SegmentTermCollector {
+                parent_buckets: vec![term_buckets],
+                sub_agg,
+                bucket_id_provider,
+                max_term_id,
+                terms_req_data,
+            };
         Ok(Box::new(collector))
     }
 }
@@ -754,10 +758,10 @@ impl TermAggregationMap for VecTermBuckets {
 /// The collector puts values from the fast field into the correct buckets and does a conversion to
 /// the correct datatype.
 #[derive(Debug)]
-struct SegmentTermCollector {
+struct SegmentTermCollector {
     /// The buckets containing the aggregation data.
     parent_buckets: Vec,
-    sub_agg: Option>,
+    sub_agg: Option>,
     bucket_id_provider: BucketIdProvider,
     max_term_id: u64,
     terms_req_data: TermsAggReqData,
 }
@@ -768,8 +772,8 @@ pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) {
     (agg_name, agg_property)
 }
 
-impl SegmentAggregationCollector
-    for SegmentTermCollector
+impl SegmentAggregationCollector
+    for SegmentTermCollector
 {
     fn add_intermediate_aggregation_result(
         &mut self,
         agg_data: &AggregationsSegmentCtx,
@@ -901,8 +905,10 @@ fn extract_missing_value(
     Some((key, bucket))
 }
 
-impl SegmentTermCollector
-where TermMap: TermAggregationMap
+impl SegmentTermCollector
+where
+    TermMap: TermAggregationMap,
+    C: SubAggCache,
 {
     fn get_memory_consumption(&self) -> usize {
         self.parent_buckets
@@ -914,7 +920,7 @@ where TermMap: TermAggregationMap
     #[inline]
     pub(crate) fn into_intermediate_bucket_result(
         term_req: &TermsAggReqData,
-        sub_agg: &mut Option>,
+        sub_agg: &mut Option>,
         term_buckets: TermMap,
         agg_data: &AggregationsSegmentCtx,
     ) -> crate::Result {
@@ -959,7 +965,7 @@ where TermMap: TermAggregationMap
         let into_intermediate_bucket_entry =
            |bucket: Bucket,
-             sub_agg: &mut Option>|
+             sub_agg: &mut Option>|
             -> crate::Result {
                 if let Some(sub_agg) = sub_agg {
                     let mut sub_aggregation_res = IntermediateAggregationResults::default();
@@ -1115,13 +1121,13 @@ where TermMap: TermAggregationMap
     }
 }
 
-impl SegmentTermCollector {
+impl SegmentTermCollector {
     #[inline]
     fn collect_terms_with_docs(
         iter: impl Iterator,
         term_buckets: &mut TermMap,
         bucket_id_provider: &mut BucketIdProvider,
-        sub_agg: &mut CachedSubAggs,
+        sub_agg: &mut CachedSubAggs,
     ) {
         for (doc, term_id) in iter {
             let bucket_id = term_buckets.term_entry(term_id, bucket_id_provider);
diff --git a/src/aggregation/bucket/term_missing_agg.rs b/src/aggregation/bucket/term_missing_agg.rs
index 6545fa2f3..47c3989c6 100644
--- a/src/aggregation/bucket/term_missing_agg.rs
+++ b/src/aggregation/bucket/term_missing_agg.rs
@@ -5,7 +5,7 @@ use crate::aggregation::agg_data::{
     build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
 };
 use crate::aggregation::bucket::term_agg::TermsAggregation;
-use crate::aggregation::cached_sub_aggs::CachedSubAggs;
+use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardCachedSubAggs};
 use crate::aggregation::intermediate_agg_result::{
     IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
     IntermediateKey, IntermediateTermBucketEntry, IntermediateTermBucketResult,
@@ -47,7 +47,7 @@ struct MissingCount {
 #[derive(Default, Debug)]
 pub struct TermMissingAgg {
     accessor_idx: usize,
-    sub_agg: Option,
+    sub_agg: Option,
     /// Idx = parent bucket id, Value = missing count for that bucket
     missing_count_per_bucket: Vec,
     bucket_id_provider: BucketIdProvider,
diff --git a/src/aggregation/cached_sub_aggs.rs b/src/aggregation/cached_sub_aggs.rs
index 94ea93fea..dff74fc83 100644
--- a/src/aggregation/cached_sub_aggs.rs
+++ b/src/aggregation/cached_sub_aggs.rs
@@ -1,10 +1,11 @@
+use std::fmt::Debug;
+
 use super::segment_agg_result::SegmentAggregationCollector;
 use crate::aggregation::agg_data::AggregationsSegmentCtx;
 use crate::aggregation::bucket::MAX_NUM_TERMS_FOR_VEC;
 use crate::aggregation::BucketId;
 use crate::DocId;
 
-#[derive(Debug)]
 /// A cache for sub-aggregations, storing doc ids per bucket id.
 /// Depending on the cardinality of the parent aggregation, we use different
 /// storage strategies.
@@ -22,68 +23,49 @@ use crate::DocId;
 /// TODO: consider using a more advanced data structure for high cardinality
 /// aggregations.
 /// What this datastructure does in general is to group docs by bucket id.
-pub(crate) struct CachedSubAggs<const LOWCARD: bool> {
-    /// Only used when LOWCARD is true.
-    /// Cache doc ids per bucket for sub-aggregations.
-    ///
-    /// The outer Vec is indexed by BucketId.
-    per_bucket_docs: Vec<Vec<DocId>>,
-    /// Only used when LOWCARD is false.
-    ///
-    /// This weird partitioning is used to do some cheap grouping on the bucket ids.
-    /// bucket ids are dense, e.g. when we don't detect the cardinality as low cardinality,
-    /// but there are just 16 bucket ids, each bucket id will go to its own partition.
-    ///
-    /// We want to keep this cheap, because high cardinality aggregations can have a lot of
-    /// buckets, and they may be nothing to group.
-    partitions: [PartitionEntry; NUM_PARTITIONS],
-    pub(crate) sub_agg_collector: Box<dyn SegmentAggregationCollector>,
+#[derive(Debug)]
+pub(crate) struct CachedSubAggs<C: SubAggCache> {
+    cache: C,
+    sub_agg_collector: Box<dyn SegmentAggregationCollector>,
     num_docs: usize,
 }
 
+pub type LowCardCachedSubAggs = CachedSubAggs<LowCardSubAggCache>;
+pub type HighCardCachedSubAggs = CachedSubAggs<HighCardSubAggCache>;
+
 const FLUSH_THRESHOLD: usize = 2048;
 const NUM_PARTITIONS: usize = 16;
 
-impl<const LOWCARD: bool> CachedSubAggs<LOWCARD> {
+/// A trait for caching sub-aggregation doc ids per bucket id.
+/// Different implementations can be used depending on the cardinality
+/// of the parent aggregation.
+pub trait SubAggCache: Debug {
+    fn new() -> Self;
+    fn push(&mut self, bucket_id: BucketId, doc_id: DocId);
+    fn flush_local(
+        &mut self,
+        sub_agg: &mut Box<dyn SegmentAggregationCollector>,
+        agg_data: &mut AggregationsSegmentCtx,
+        force: bool,
+    ) -> crate::Result<()>;
+}
+
+impl<Backend: SubAggCache> CachedSubAggs<Backend> {
+    pub fn new(sub_agg: Box<dyn SegmentAggregationCollector>) -> Self {
+        Self {
+            cache: Backend::new(),
+            sub_agg_collector: sub_agg,
+            num_docs: 0,
+        }
+    }
+
     pub fn get_sub_agg_collector(&mut self) -> &mut Box<dyn SegmentAggregationCollector> {
         &mut self.sub_agg_collector
     }
 
-    pub fn new(sub_agg: Box<dyn SegmentAggregationCollector>) -> Self {
-        Self {
-            per_bucket_docs: Vec::new(),
-            num_docs: 0,
-            sub_agg_collector: sub_agg,
-            partitions: core::array::from_fn(|_| PartitionEntry::default()),
-        }
-    }
-
-    #[inline]
-    pub fn clear(&mut self) {
-        for v in &mut self.per_bucket_docs {
-            v.clear();
-        }
-        for partition in &mut self.partitions {
-            partition.clear();
-        }
-        self.num_docs = 0;
-    }
-
     #[inline]
     pub fn push(&mut self, bucket_id: BucketId, doc_id: DocId) {
-        if LOWCARD {
-            // TODO: We could flush single buckets here
-            let idx = bucket_id as usize;
-            if self.per_bucket_docs.len() <= idx {
-                self.per_bucket_docs.resize_with(idx + 1, Vec::new);
-            }
-            self.per_bucket_docs[idx].push(doc_id);
-        } else {
-            let idx = bucket_id % NUM_PARTITIONS as u32;
-            let slot = &mut self.partitions[idx as usize];
-            slot.bucket_ids.push(bucket_id);
-            slot.docs.push(doc_id);
-        }
+        self.cache.push(bucket_id, doc_id);
         self.num_docs += 1;
     }
 
@@ -94,82 +76,45 @@ impl CachedSubAggs {
         agg_data: &mut AggregationsSegmentCtx,
     ) -> crate::Result<()> {
         if self.num_docs >= FLUSH_THRESHOLD {
-            self.flush_local(agg_data, false)?;
+            self.cache
+                .flush_local(&mut self.sub_agg_collector, agg_data, false)?;
+            self.num_docs = 0;
         }
         Ok(())
     }
 
-    /// Note: this does _not_ flush the sub aggregations
-    fn flush_local(
-        &mut self,
-        agg_data: &mut AggregationsSegmentCtx,
-        force: bool,
-    ) -> crate::Result<()> {
-        if LOWCARD {
-            // Pre-aggregated: call collect per bucket.
-            let max_bucket = (self.per_bucket_docs.len() as BucketId).saturating_sub(1);
-            self.sub_agg_collector
-                .prepare_max_bucket(max_bucket, agg_data)?;
-            // The threshold above which we flush buckets individually.
-            // Note: We need to make sure that we don't lock ourselves into a situation where we hit
-            // the FLUSH_THRESHOLD, but never flush any buckets. (except the final flush)
-            let mut bucket_treshold = FLUSH_THRESHOLD / (self.per_bucket_docs.len().max(1) * 2);
-            const _: () = {
-                // MAX_NUM_TERMS_FOR_VEC == LOWCARD threshold
-                let bucket_treshold = FLUSH_THRESHOLD / (MAX_NUM_TERMS_FOR_VEC as usize * 2);
-                assert!(
-                    bucket_treshold > 0,
-                    "Bucket threshold must be greater than 0"
-                );
-            };
-            if force {
-                bucket_treshold = 0;
-            }
-            for (bucket_id, docs) in self
-                .per_bucket_docs
-                .iter()
-                .enumerate()
-                .filter(|(_, docs)| docs.len() > bucket_treshold)
-            {
-                self.sub_agg_collector
-                    .collect(bucket_id as BucketId, docs, agg_data)?;
-            }
-        } else {
-            let mut max_bucket = 0u32;
-            for partition in &self.partitions {
-                if let Some(&local_max) = partition.bucket_ids.iter().max() {
-                    max_bucket = max_bucket.max(local_max);
-                }
-            }
-
-            self.sub_agg_collector
-                .prepare_max_bucket(max_bucket, agg_data)?;
-
-            for slot in &self.partitions {
-                if !slot.bucket_ids.is_empty() {
-                    // Reduce dynamic dispatch overhead by collecting a full partition in one call.
-                    self.sub_agg_collector.collect_multiple(
-                        &slot.bucket_ids,
-                        &slot.docs,
-                        agg_data,
-                    )?;
-                }
-            }
-        }
-        self.clear();
-        Ok(())
-    }
-
-    /// Note: this does _not_ flush the sub aggregations
+    /// Note: this _does_ flush the sub aggregations.
     pub fn flush(&mut self, agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
         if self.num_docs != 0 {
-            self.flush_local(agg_data, true)?;
+            self.cache
+                .flush_local(&mut self.sub_agg_collector, agg_data, true)?;
+            self.num_docs = 0;
         }
         self.sub_agg_collector.flush(agg_data)?;
         Ok(())
     }
 }
 
+#[derive(Debug)]
+pub(crate) struct HighCardSubAggCache {
+    /// This weird partitioning is used to do some cheap grouping on the bucket ids.
+    /// bucket ids are dense, e.g. when we don't detect the cardinality as low cardinality,
+    /// but there are just 16 bucket ids, each bucket id will go to its own partition.
+    ///
+    /// We want to keep this cheap, because high cardinality aggregations can have a lot of
+    /// buckets, and there may be nothing to group.
+    partitions: [PartitionEntry; NUM_PARTITIONS],
+}
+
+impl HighCardSubAggCache {
+    #[inline]
+    fn clear(&mut self) {
+        for partition in &mut self.partitions {
+            partition.clear();
+        }
+    }
+}
+
 #[derive(Debug, Clone, Default)]
 struct PartitionEntry {
     bucket_ids: Vec<BucketId>,
     docs: Vec<DocId>,
 }
@@ -183,3 +128,114 @@ impl PartitionEntry {
         self.docs.clear();
     }
 }
+
+impl SubAggCache for HighCardSubAggCache {
+    fn new() -> Self {
+        Self {
+            partitions: core::array::from_fn(|_| PartitionEntry::default()),
+        }
+    }
+
+    fn push(&mut self, bucket_id: BucketId, doc_id: DocId) {
+        let idx = bucket_id % NUM_PARTITIONS as u32;
+        let slot = &mut self.partitions[idx as usize];
+        slot.bucket_ids.push(bucket_id);
+        slot.docs.push(doc_id);
+    }
+
+    fn flush_local(
+        &mut self,
+        sub_agg: &mut Box<dyn SegmentAggregationCollector>,
+        agg_data: &mut AggregationsSegmentCtx,
+        _force: bool,
+    ) -> crate::Result<()> {
+        let mut max_bucket = 0u32;
+        for partition in &self.partitions {
+            if let Some(&local_max) = partition.bucket_ids.iter().max() {
+                max_bucket = max_bucket.max(local_max);
+            }
+        }
+
+        sub_agg.prepare_max_bucket(max_bucket, agg_data)?;
+
+        for slot in &self.partitions {
+            if !slot.bucket_ids.is_empty() {
+                // Reduce dynamic dispatch overhead by collecting a full partition in one call.
+                sub_agg.collect_multiple(&slot.bucket_ids, &slot.docs, agg_data)?;
+            }
+        }
+
+        self.clear();
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct LowCardSubAggCache {
+    /// Cache doc ids per bucket for sub-aggregations.
+    ///
+    /// The outer Vec is indexed by BucketId.
+    per_bucket_docs: Vec<Vec<DocId>>,
+}
+
+impl LowCardSubAggCache {
+    #[inline]
+    fn clear(&mut self) {
+        for v in &mut self.per_bucket_docs {
+            v.clear();
+        }
+    }
+}
+
+impl SubAggCache for LowCardSubAggCache {
+    fn new() -> Self {
+        Self {
+            per_bucket_docs: Vec::new(),
+        }
+    }
+
+    fn push(&mut self, bucket_id: BucketId, doc_id: DocId) {
+        let idx = bucket_id as usize;
+        if self.per_bucket_docs.len() <= idx {
+            self.per_bucket_docs.resize_with(idx + 1, Vec::new);
+        }
+        self.per_bucket_docs[idx].push(doc_id);
+    }
+
+    fn flush_local(
+        &mut self,
+        sub_agg: &mut Box<dyn SegmentAggregationCollector>,
+        agg_data: &mut AggregationsSegmentCtx,
+        force: bool,
+    ) -> crate::Result<()> {
+        // Pre-aggregated: call collect per bucket.
+        let max_bucket = (self.per_bucket_docs.len() as BucketId).saturating_sub(1);
+        sub_agg.prepare_max_bucket(max_bucket, agg_data)?;
+        // The threshold above which we flush buckets individually.
+        // Note: We need to make sure that we don't lock ourselves into a situation where we hit
+        // the FLUSH_THRESHOLD, but never flush any buckets. (except the final flush)
+        let mut bucket_treshold = FLUSH_THRESHOLD / (self.per_bucket_docs.len().max(1) * 2);
+        const _: () = {
+            // MAX_NUM_TERMS_FOR_VEC == LOWCARD threshold
+            let bucket_treshold = FLUSH_THRESHOLD / (MAX_NUM_TERMS_FOR_VEC as usize * 2);
+            assert!(
+                bucket_treshold > 0,
+                "Bucket threshold must be greater than 0"
+            );
+        };
+        if force {
+            bucket_treshold = 0;
+        }
+        for (bucket_id, docs) in self
+            .per_bucket_docs
+            .iter()
+            .enumerate()
+            .filter(|(_, docs)| docs.len() > bucket_treshold)
+        {
+            sub_agg.collect(bucket_id as BucketId, docs, agg_data)?;
+        }
+
+        self.clear();
+        Ok(())
+    }
+}
diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs
index 964ea00c9..59e9c677d 100644
--- a/src/aggregation/collector.rs
+++ b/src/aggregation/collector.rs
@@ -1,6 +1,6 @@
 use super::agg_req::Aggregations;
 use super::agg_result::AggregationResults;
-use super::cached_sub_aggs::CachedSubAggs;
+use super::cached_sub_aggs::LowCardCachedSubAggs;
 use super::intermediate_agg_result::IntermediateAggregationResults;
 use super::AggContextParams;
 // group buffering strategy is chosen explicitly by callers; no need to hash-group on the fly.
@@ -136,7 +136,7 @@ fn merge_fruits(
 /// `AggregationSegmentCollector` does the aggregation collection on a segment.
 pub struct AggregationSegmentCollector {
     aggs_with_accessor: AggregationsSegmentCtx,
-    agg_collector: CachedSubAggs,
+    agg_collector: LowCardCachedSubAggs,
     error: Option,
 }
 
@@ -151,7 +151,8 @@ impl AggregationSegmentCollector {
     ) -> crate::Result {
         let mut agg_data =
             build_aggregations_data_from_req(agg, reader, segment_ordinal, context.clone())?;
-        let mut result = CachedSubAggs::new(build_segment_agg_collectors_root(&mut agg_data)?);
+        let mut result =
+            LowCardCachedSubAggs::new(build_segment_agg_collectors_root(&mut agg_data)?);
         result
             .get_sub_agg_collector()
             .prepare_max_bucket(0, &agg_data)?; // prepare for bucket zero
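
To illustrate the pattern this patch introduces -- a cache strategy trait with a pre-grouped
low-cardinality backend and a partitioned high-cardinality backend behind one generic wrapper --
here is a minimal, self-contained Rust sketch. It deliberately does not use tantivy's real types
or signatures; SubAggCollector, LowCard, HighCard and CountingCollector are hypothetical
stand-ins, and the flush threshold is only an example value.

    // Hypothetical stand-ins; tantivy's actual collector API differs.
    type DocId = u32;
    type BucketId = u32;

    /// Stand-in for the boxed sub-aggregation collector that cached docs are flushed into.
    trait SubAggCollector {
        /// All docs belong to the same bucket (pre-grouped path, cheap when buckets are few).
        fn collect(&mut self, bucket: BucketId, docs: &[DocId]);
        /// Parallel slices of bucket ids and doc ids (ungrouped path, cheap when buckets are many).
        fn collect_multiple(&mut self, buckets: &[BucketId], docs: &[DocId]);
    }

    /// Strategy trait: how (bucket, doc) pairs are buffered until they are flushed.
    trait SubAggCache: Default {
        fn push(&mut self, bucket: BucketId, doc: DocId);
        fn flush_local(&mut self, sub_agg: &mut dyn SubAggCollector);
    }

    /// Low cardinality: one Vec per bucket id, so docs are already grouped when flushing.
    #[derive(Default)]
    struct LowCard { per_bucket: Vec<Vec<DocId>> }

    impl SubAggCache for LowCard {
        fn push(&mut self, bucket: BucketId, doc: DocId) {
            let idx = bucket as usize;
            if self.per_bucket.len() <= idx {
                self.per_bucket.resize_with(idx + 1, Vec::new);
            }
            self.per_bucket[idx].push(doc);
        }
        fn flush_local(&mut self, sub_agg: &mut dyn SubAggCollector) {
            for (bucket, docs) in self.per_bucket.iter_mut().enumerate() {
                if !docs.is_empty() {
                    sub_agg.collect(bucket as BucketId, docs.as_slice());
                    docs.clear();
                }
            }
        }
    }

    /// High cardinality: cheap partitioning by bucket id, no per-bucket allocation.
    #[derive(Default)]
    struct HighCard { partitions: [(Vec<BucketId>, Vec<DocId>); 16] }

    impl SubAggCache for HighCard {
        fn push(&mut self, bucket: BucketId, doc: DocId) {
            let slot = &mut self.partitions[bucket as usize % 16];
            slot.0.push(bucket);
            slot.1.push(doc);
        }
        fn flush_local(&mut self, sub_agg: &mut dyn SubAggCollector) {
            for (buckets, docs) in &mut self.partitions {
                if !buckets.is_empty() {
                    // One dynamic-dispatch call per partition instead of one per doc.
                    sub_agg.collect_multiple(buckets.as_slice(), docs.as_slice());
                    buckets.clear();
                    docs.clear();
                }
            }
        }
    }

    /// Generic wrapper: buffers docs in the chosen backend and flushes past a threshold.
    struct CachedSubAggs<C: SubAggCache> { cache: C, num_docs: usize }

    const FLUSH_THRESHOLD: usize = 2048; // example value

    impl<C: SubAggCache> CachedSubAggs<C> {
        fn new() -> Self { Self { cache: C::default(), num_docs: 0 } }
        fn push(&mut self, bucket: BucketId, doc: DocId, sub_agg: &mut dyn SubAggCollector) {
            self.cache.push(bucket, doc);
            self.num_docs += 1;
            if self.num_docs >= FLUSH_THRESHOLD {
                self.flush(sub_agg);
            }
        }
        fn flush(&mut self, sub_agg: &mut dyn SubAggCollector) {
            self.cache.flush_local(sub_agg);
            self.num_docs = 0;
        }
    }

    /// Toy collector that only counts docs per bucket.
    #[derive(Default)]
    struct CountingCollector { counts: std::collections::HashMap<BucketId, usize> }

    impl SubAggCollector for CountingCollector {
        fn collect(&mut self, bucket: BucketId, docs: &[DocId]) {
            *self.counts.entry(bucket).or_default() += docs.len();
        }
        fn collect_multiple(&mut self, buckets: &[BucketId], _docs: &[DocId]) {
            for &bucket in buckets {
                *self.counts.entry(bucket).or_default() += 1;
            }
        }
    }

    fn main() {
        let mut counts = CountingCollector::default();
        let mut low: CachedSubAggs<LowCard> = CachedSubAggs::new(); // few parent buckets
        let mut high: CachedSubAggs<HighCard> = CachedSubAggs::new(); // many parent buckets
        for doc in 0..10_000u32 {
            low.push(doc % 4, doc, &mut counts);
            high.push(doc % 1_000, doc, &mut counts);
        }
        low.flush(&mut counts);
        high.flush(&mut counts);
        println!("bucket 0 saw {} docs", counts.counts[&0]);
    }

The design point is that the per-call sites (filter, range, terms, histogram) pick the backend
once, based on how many parent buckets they expect, and everything downstream stays generic over
the cache type instead of branching on a const boolean inside every method.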