diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs
index 32633a8f9..29f569d12 100644
--- a/src/aggregation/bucket/term_agg.rs
+++ b/src/aggregation/bucket/term_agg.rs
@@ -378,13 +378,13 @@ pub(crate) fn build_segment_term_collector(
     let is_top_level = terms_req_data.is_top_level;
 
     // TODO: Benchmark to validate the threshold
-    const MAX_NUM_TERMS_FOR_VEC: usize = 100;
+    const MAX_NUM_TERMS_FOR_VEC: u64 = 100;
 
     // Let's see if we can use a vec to aggregate our data
     // instead of a hashmap.
     let col_max_value = terms_req_data.accessor.max_value();
-    let max_term: usize =
-        col_max_value.max(terms_req_data.missing_value_for_accessor.unwrap_or(0u64)) as usize;
+    let max_term_id: u64 =
+        col_max_value.max(terms_req_data.missing_value_for_accessor.unwrap_or(0u64));
 
     let sub_agg_collector = if has_sub_aggregations {
         Some(build_segment_agg_collectors(req_data, &node.children)?)
@@ -395,30 +395,30 @@ pub(crate) fn build_segment_term_collector(
     let mut bucket_id_provider = BucketIdProvider::default();
 
     // - use a Vec instead of a hashmap for our aggregation.
-    if is_top_level && max_term < MAX_NUM_TERMS_FOR_VEC && !has_sub_aggregations {
-        let term_buckets = VecTermBucketsNoAgg::new(max_term as u64 + 1, &mut bucket_id_provider);
+    if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC && !has_sub_aggregations {
+        let term_buckets = VecTermBucketsNoAgg::new(max_term_id + 1, &mut bucket_id_provider);
         let collector: SegmentTermCollector<_, true> = SegmentTermCollector {
             buckets: vec![term_buckets],
             accessor_idx,
             sub_agg: None,
             bucket_id_provider,
-            max_term_id: max_term as u64,
+            max_term_id,
         };
         Ok(Box::new(collector))
-    } else if is_top_level && max_term < MAX_NUM_TERMS_FOR_VEC {
-        let term_buckets = VecTermBuckets::new(max_term as u64 + 1, &mut bucket_id_provider);
+    } else if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC {
+        let term_buckets = VecTermBuckets::new(max_term_id + 1, &mut bucket_id_provider);
         let sub_agg = sub_agg_collector.map(CachedSubAggs::<true>::new);
         let collector: SegmentTermCollector<_, true> = SegmentTermCollector {
             buckets: vec![term_buckets],
             accessor_idx,
             sub_agg,
             bucket_id_provider,
-            max_term_id: max_term as u64,
+            max_term_id,
         };
         Ok(Box::new(collector))
-    } else if max_term < 8_000_000 && is_top_level {
+    } else if max_term_id < 8_000_000 && is_top_level {
         let term_buckets: PagedTermMap =
-            PagedTermMap::new(max_term as u64 + 1, &mut bucket_id_provider);
+            PagedTermMap::new(max_term_id + 1, &mut bucket_id_provider);
         // Build sub-aggregation blueprint (flat pairs)
         let sub_agg = sub_agg_collector.map(CachedSubAggs::<false>::new);
         let collector: SegmentTermCollector<PagedTermMap, false> = SegmentTermCollector {
@@ -426,7 +426,7 @@ pub(crate) fn build_segment_term_collector(
             accessor_idx,
             sub_agg,
             bucket_id_provider,
-            max_term_id: max_term as u64,
+            max_term_id,
         };
         Ok(Box::new(collector))
     } else {
@@ -438,7 +438,7 @@ pub(crate) fn build_segment_term_collector(
             accessor_idx,
             sub_agg,
             bucket_id_provider,
-            max_term_id: max_term as u64,
+            max_term_id,
         };
         Ok(Box::new(collector))
     }
@@ -828,7 +828,6 @@ impl SegmentAggregationCollector
                 .fetch_block(docs, &req_data.accessor);
         }
 
-        // Build iterators first to avoid overlapping borrows with self's fields.
        if let Some(sub_agg) = &mut self.sub_agg {
            let term_buckets = &mut self.buckets[parent_bucket_id as usize];
            let it = req_data
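The dense-Vec fast path above is gated on the maximum term ordinal (`max_term_id`), not on the number of documents: a `Vec` indexed by ordinal costs memory proportional to the ordinal space, so it only pays off when that space is small. The switch from `usize` to `u64` also removes an `as usize` cast of the column's max value, which could truncate on 32-bit targets. A minimal standalone sketch of the same trade-off; `TermCounts`, `record`, and `dense_limit` are illustrative names, not types from the patch:

```rust
use std::collections::HashMap;

/// Counts occurrences of term ordinals in `0..=max_term_id`.
enum TermCounts {
    /// O(max_term_id) memory regardless of how many distinct terms occur,
    /// but counting is a plain array write.
    Dense(Vec<u64>),
    /// Pays hashing cost per doc, but memory follows the observed terms.
    Sparse(HashMap<u64, u64>),
}

impl TermCounts {
    fn with_max_term_id(max_term_id: u64, dense_limit: u64) -> Self {
        if max_term_id < dense_limit {
            TermCounts::Dense(vec![0; max_term_id as usize + 1])
        } else {
            TermCounts::Sparse(HashMap::new())
        }
    }

    #[inline]
    fn record(&mut self, term_id: u64) {
        match self {
            // Direct indexing: no hashing, no probing.
            TermCounts::Dense(counts) => counts[term_id as usize] += 1,
            TermCounts::Sparse(counts) => *counts.entry(term_id).or_insert(0) += 1,
        }
    }
}

fn main() {
    let mut counts = TermCounts::with_max_term_id(10, 100);
    counts.record(3);
    counts.record(3);
    assert!(matches!(counts, TermCounts::Dense(ref c) if c[3] == 2));
}
```

Whether 100 is the right cutoff is exactly what the `// TODO: Benchmark` above is asking.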
diff --git a/src/aggregation/cached_sub_aggs.rs b/src/aggregation/cached_sub_aggs.rs
index e430bfd4c..759824e8e 100644
--- a/src/aggregation/cached_sub_aggs.rs
+++ b/src/aggregation/cached_sub_aggs.rs
@@ -36,7 +36,7 @@ pub(crate) struct CachedSubAggs<const LOWCARD: bool> {
     num_docs: usize,
 }
 
-const FLUSH_THRESHOLD: usize = 1024;
+const FLUSH_THRESHOLD: usize = 2048;
 const NUM_PARTITIONS: usize = 16;
 
 impl<const LOWCARD: bool> CachedSubAggs<LOWCARD> {
@@ -67,6 +67,7 @@ impl<const LOWCARD: bool> CachedSubAggs<LOWCARD> {
     #[inline]
     pub fn push(&mut self, bucket_id: BucketId, doc_id: DocId) {
         if LOWCARD {
+            // TODO: We could flush single buckets here
             let idx = bucket_id as usize;
             if self.per_bucket_docs.len() <= idx {
                 self.per_bucket_docs.resize_with(idx + 1, Vec::new);
@@ -88,23 +89,34 @@ impl<const LOWCARD: bool> CachedSubAggs<LOWCARD> {
         agg_data: &mut AggregationsSegmentCtx,
     ) -> crate::Result<()> {
         if self.num_docs >= FLUSH_THRESHOLD {
-            self.flush_local(agg_data)?;
+            self.flush_local(agg_data, false)?;
         }
         Ok(())
     }
 
     /// Note: this does _not_ flush the sub aggregations
-    fn flush_local(&mut self, agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
+    fn flush_local(
+        &mut self,
+        agg_data: &mut AggregationsSegmentCtx,
+        force: bool,
+    ) -> crate::Result<()> {
         if LOWCARD {
             // Pre-aggregated: call collect per bucket.
             let max_bucket = (self.per_bucket_docs.len() as BucketId).saturating_sub(1);
             self.sub_agg_collector
                 .prepare_max_bucket(max_bucket, agg_data)?;
+            // The threshold above which we flush buckets individually.
+            // Note: we need to make sure we can't get stuck in a state where we keep
+            // hitting FLUSH_THRESHOLD but never flush any buckets.
+            let mut bucket_threshold = FLUSH_THRESHOLD / (self.per_bucket_docs.len().max(1) * 2);
+            if force {
+                bucket_threshold = 0;
+            }
             for (bucket_id, docs) in self
                 .per_bucket_docs
                 .iter()
                 .enumerate()
-                .filter(|(_, docs)| !docs.is_empty())
+                .filter(|(_, docs)| docs.len() > bucket_threshold)
             {
                 self.sub_agg_collector
                     .collect(bucket_id as BucketId, docs, agg_data)?;
@@ -138,25 +150,13 @@ impl<const LOWCARD: bool> CachedSubAggs<LOWCARD> {
     /// Note: this _does_ flush the sub aggregations
     pub fn flush(&mut self, agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
         if self.num_docs != 0 {
-            self.flush_local(agg_data)?;
+            self.flush_local(agg_data, true)?;
         }
         self.sub_agg_collector.flush(agg_data)?;
         Ok(())
     }
 }
 
-impl CachedSubAggs<true> {
-    /// Implemented Only for low cardinality cached sub-aggregations.
-    #[inline]
-    pub fn extend_with_bucket_zero(&mut self, docs: &[DocId]) {
-        if self.per_bucket_docs.is_empty() {
-            self.per_bucket_docs.resize_with(1, Vec::new);
-        }
-        self.per_bucket_docs[0].extend_from_slice(docs);
-        self.num_docs += docs.len();
-    }
-}
-
 #[derive(Debug, Clone)]
 struct PartitionEntry {
     bucket_ids: Vec<BucketId>,
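On the new `bucket_threshold` heuristic in `flush_local`: with `n` cached buckets the bar is `FLUSH_THRESHOLD / (n * 2)`, and once the cache holds `FLUSH_THRESHOLD` docs in total, the fullest bucket holds at least `FLUSH_THRESHOLD / n` of them (pigeonhole), which always clears that bar. So the worry in the comment is covered: the cache can never keep hitting the threshold while flushing nothing. A self-contained sketch of just the selection step; the helper name and the bare `Vec<u32>` doc-id representation are illustrative:

```rust
const FLUSH_THRESHOLD: usize = 2048;

/// Returns the ids of the buckets worth flushing now.
///
/// `force` drops the bar to zero so every non-empty bucket is flushed,
/// matching the old `!docs.is_empty()` behavior on the final flush.
fn buckets_to_flush(per_bucket_docs: &[Vec<u32>], force: bool) -> Vec<usize> {
    let threshold = if force {
        0
    } else {
        // Half of a bucket's "fair share" of the cache. At least one bucket
        // always exceeds this once the total reaches FLUSH_THRESHOLD.
        FLUSH_THRESHOLD / (per_bucket_docs.len().max(1) * 2)
    };
    per_bucket_docs
        .iter()
        .enumerate()
        .filter(|(_, docs)| docs.len() > threshold)
        .map(|(bucket_id, _)| bucket_id)
        .collect()
}

fn main() {
    // Bucket 1 is hot, bucket 0 barely used: only bucket 1 clears the bar.
    let cache = vec![vec![7u32; 10], vec![42u32; 2000]];
    assert_eq!(buckets_to_flush(&cache, false), vec![1]);
    // A forced flush drains everything that is non-empty.
    assert_eq!(buckets_to_flush(&cache, true), vec![0, 1]);
}
```

Skipping the cold buckets keeps their docs cached, so later flushes hand the sub-aggregations larger batches per bucket.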
diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs
index e6993eae6..964ea00c9 100644
--- a/src/aggregation/collector.rs
+++ b/src/aggregation/collector.rs
@@ -151,7 +151,10 @@ impl AggregationSegmentCollector {
     ) -> crate::Result<Self> {
         let mut agg_data =
             build_aggregations_data_from_req(agg, reader, segment_ordinal, context.clone())?;
-        let result = CachedSubAggs::new(build_segment_agg_collectors_root(&mut agg_data)?);
+        let mut result = CachedSubAggs::new(build_segment_agg_collectors_root(&mut agg_data)?);
+        result
+            .get_sub_agg_collector()
+            .prepare_max_bucket(0, &mut agg_data)?; // prepare for bucket zero
         Ok(AggregationSegmentCollector {
             aggs_with_accessor: agg_data,
@@ -184,11 +187,12 @@ impl SegmentCollector for AggregationSegmentCollector {
         if self.error.is_some() {
             return;
         }
-        self.agg_collector.extend_with_bucket_zero(docs);
-        match self
-            .agg_collector
-            .check_flush_local(&mut self.aggs_with_accessor)
-        {
+
+        match self.agg_collector.get_sub_agg_collector().collect(
+            0,
+            docs,
+            &mut self.aggs_with_accessor,
+        ) {
             Ok(_) => {}
             Err(e) => {
                 self.error = Some(e);
diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs
index 98c33c96c..4eab921e6 100644
--- a/src/aggregation/metric/stats.rs
+++ b/src/aggregation/metric/stats.rs
@@ -277,26 +277,17 @@ impl SegmentAggregationCollector
         docs: &[crate::DocId],
         _agg_data: &mut AggregationsSegmentCtx,
     ) -> crate::Result<()> {
-        // Copying the stats to avoid aliasing optimization issues
-        let mut stats = self.buckets[parent_bucket_id as usize];
         if let Some(missing) = self.missing_u64.as_ref() {
             self.column_block_accessor
                 .fetch_block_with_missing(docs, &self.accessor, *missing);
         } else {
             self.column_block_accessor.fetch_block(docs, &self.accessor);
         }
-        if self.is_number_or_date_type {
-            for val in self.column_block_accessor.iter_vals() {
-                let val1 = convert_to_f64::<T>(val);
-                stats.collect(val1);
-            }
-        } else {
-            for _val in self.column_block_accessor.iter_vals() {
-                // we ignore the value and simply record that we got something
-                stats.collect(0.0);
-            }
-        }
-        self.buckets[parent_bucket_id as usize] = stats;
+        collect_stats::<T>(
+            &mut self.buckets[parent_bucket_id as usize],
+            &mut self.column_block_accessor,
+            self.is_number_or_date_type,
+        )?;
         Ok(())
     }
@@ -315,6 +306,27 @@ impl SegmentAggregationCollector
     }
 }
 
+#[inline]
+fn collect_stats<T>(
+    stats: &mut IntermediateStats,
+    column_block_accessor: &mut ColumnBlockAccessor<u64>,
+    is_number_or_date_type: bool,
+) -> crate::Result<()> {
+    if is_number_or_date_type {
+        for val in column_block_accessor.iter_vals() {
+            let val1 = convert_to_f64::<T>(val);
+            stats.collect(val1);
+        }
+    } else {
+        for _val in column_block_accessor.iter_vals() {
+            // we ignore the value and simply record that we got something
+            stats.collect(0.0);
+        }
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use serde_json::Value;
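The stats.rs change above replaces the copy-in/copy-out of `IntermediateStats` (the deleted `// Copying the stats to avoid aliasing optimization issues` workaround) with a free function: `collect_stats` receives `&mut self.buckets[i]` and `&mut self.column_block_accessor` as two separate `&mut` parameters, which Rust guarantees do not alias, so the optimizer can keep the stats in registers across the loop without the defensive local copy. A minimal sketch of that borrow-splitting pattern with stand-in types (`Stats` and `Collector` are illustrative, not the patch's types):

```rust
#[derive(Default, Clone, Copy)]
struct Stats {
    count: u64,
    sum: f64,
}

struct Collector {
    buckets: Vec<Stats>,
    block: Vec<u64>, // stands in for the fetched column block
}

// Two distinct reference parameters cannot alias each other, so the hot
// loop needs no defensive copy of `stats`.
fn collect_into(stats: &mut Stats, block: &[u64]) {
    for &val in block {
        stats.count += 1;
        stats.sum += val as f64;
    }
}

impl Collector {
    fn collect(&mut self, bucket: usize) {
        // Field borrows are disjoint, so both can be taken at once.
        collect_into(&mut self.buckets[bucket], &self.block);
    }
}

fn main() {
    let mut c = Collector {
        buckets: vec![Stats::default(); 4],
        block: vec![1, 2, 3],
    };
    c.collect(0);
    assert_eq!(c.buckets[0].count, 3);
}
```

The collector.rs change is in the same spirit: documents now go straight into bucket zero via the regular `collect(0, docs, ...)` path, which is what made the special-cased `extend_with_bucket_zero` deletable.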
diff --git a/src/core/executor.rs b/src/core/executor.rs
index 8cc7e0026..f11644599 100644
--- a/src/core/executor.rs
+++ b/src/core/executor.rs
@@ -48,7 +48,15 @@
         F: Sized + Sync + Fn(A) -> crate::Result<R>,
     {
         match self {
-            Executor::SingleThread => args.map(f).collect::<crate::Result<Vec<R>>>(),
+            Executor::SingleThread => {
+                // Avoid `collect`, since it blows up the stacktrace, which makes
+                // profiling harder.
+                let mut result = Vec::with_capacity(args.size_hint().0);
+                for arg in args {
+                    result.push(f(arg)?);
+                }
+                Ok(result)
+            }
             Executor::ThreadPool(pool) => {
                 let args: Vec<A> = args.collect();
                 let num_fruits = args.len();
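The single-threaded branch stays behaviorally identical to the old `args.map(f).collect::<crate::Result<Vec<R>>>()`, including short-circuiting on the first `Err` and pre-sizing the output from the iterator's `size_hint`; only the inlined call depth visible in profiles changes. A standalone sketch of the equivalence (`map_loop` and the `String` error type are illustrative):

```rust
fn map_loop<A, R>(
    args: impl Iterator<Item = A>,
    f: impl Fn(A) -> Result<R, String>,
) -> Result<Vec<R>, String> {
    // Same shape as the patched executor: reserve from the size hint, then
    // let `?` short-circuit on the first error, exactly like collecting
    // into `Result<Vec<_>, _>` would.
    let mut result = Vec::with_capacity(args.size_hint().0);
    for arg in args {
        result.push(f(arg)?);
    }
    Ok(result)
}

fn main() {
    let doubled = map_loop(1..=3, |x| Ok(x * 2));
    assert_eq!(doubled, Ok(vec![2, 4, 6]));

    // The first failure aborts the loop, as with `collect`.
    let failed = map_loop(1..=3, |x| if x == 2 { Err("boom".into()) } else { Ok(x) });
    assert_eq!(failed, Err("boom".to_string()));
}
```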