From 019db10e8ef871c1cf5d6e73ee2e5d5ac3f0f321 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Thu, 16 Feb 2023 20:15:16 +0800 Subject: [PATCH] refactor aggregations (#1875) * add specialized version for full cardinality Pre Columnar test aggregation::tests::bench::bench_aggregation_average_u64 ... bench: 6,681,850 ns/iter (+/- 1,217,385) test aggregation::tests::bench::bench_aggregation_average_u64_and_f64 ... bench: 10,576,327 ns/iter (+/- 494,380) Current test aggregation::tests::bench::bench_aggregation_average_u64 ... bench: 11,562,084 ns/iter (+/- 3,678,682) test aggregation::tests::bench::bench_aggregation_average_u64_and_f64 ... bench: 18,925,790 ns/iter (+/- 17,616,771) Post Change test aggregation::tests::bench::bench_aggregation_average_u64 ... bench: 9,123,811 ns/iter (+/- 399,720) test aggregation::tests::bench::bench_aggregation_average_u64_and_f64 ... bench: 13,111,825 ns/iter (+/- 273,547) * refactor aggregation collection * add buffering collector --- src/aggregation/bucket/histogram/histogram.rs | 21 +- src/aggregation/bucket/range.rs | 39 ++- src/aggregation/bucket/term_agg.rs | 18 +- src/aggregation/buf_collector.rs | 81 +++++++ src/aggregation/collector.rs | 5 +- src/aggregation/metric/stats.rs | 81 ++++++- src/aggregation/mod.rs | 5 +- src/aggregation/segment_agg_result.rs | 222 +++++++++++------- 8 files changed, 329 insertions(+), 143 deletions(-) create mode 100644 src/aggregation/buf_collector.rs diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index 5be17928d..83002177a 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -338,7 +338,6 @@ impl SegmentHistogramCollector { &mut self, docs: &[DocId], bucket_with_accessor: &BucketAggregationWithAccessor, - force_flush: bool, ) -> crate::Result<()> { let bounds = self.bounds; let interval = self.interval; @@ -362,14 +361,6 @@ impl SegmentHistogramCollector { )?; } } - if force_flush { - if let Some(sub_aggregations) = self.sub_aggregations.as_mut() { - for sub_aggregation in sub_aggregations { - sub_aggregation - .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush)?; - } - } - } Ok(()) } @@ -408,6 +399,18 @@ impl SegmentHistogramCollector { Ok(()) } + pub(crate) fn flush( + &mut self, + bucket_with_accessor: &BucketAggregationWithAccessor, + ) -> crate::Result<()> { + if let Some(sub_aggregations) = self.sub_aggregations.as_mut() { + for sub_aggregation in sub_aggregations { + sub_aggregation.flush(&bucket_with_accessor.sub_aggregation)?; + } + } + Ok(()) + } + fn f64_from_fastfield_u64(&self, val: u64) -> f64 { f64_from_fastfield_u64(val, &self.field_type) } diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index 51eb06330..633ad767b 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -263,40 +263,21 @@ impl SegmentRangeCollector { &mut self, docs: &[DocId], bucket_with_accessor: &BucketAggregationWithAccessor, - force_flush: bool, ) -> crate::Result<()> { let accessor = &bucket_with_accessor.accessor; for doc in docs { for val in accessor.values(*doc) { let bucket_pos = self.get_bucket_pos(val); - self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation)?; - } - } - if force_flush { - for bucket in &mut self.buckets { + let bucket = &mut self.buckets[bucket_pos]; + + bucket.bucket.doc_count += 1; if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation { - sub_aggregation - .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush)?; + sub_aggregation.collect(*doc, &bucket_with_accessor.sub_aggregation)?; } } } - Ok(()) - } - #[inline] - fn increment_bucket( - &mut self, - bucket_pos: usize, - doc: DocId, - bucket_with_accessor: &AggregationsWithAccessor, - ) -> crate::Result<()> { - let bucket = &mut self.buckets[bucket_pos]; - - bucket.bucket.doc_count += 1; - if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation { - sub_aggregation.collect(doc, bucket_with_accessor)?; - } Ok(()) } @@ -309,6 +290,18 @@ impl SegmentRangeCollector { debug_assert!(self.buckets[pos].range.contains(&val)); pos } + + pub(crate) fn flush( + &mut self, + bucket_with_accessor: &BucketAggregationWithAccessor, + ) -> crate::Result<()> { + for bucket in &mut self.buckets { + if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation { + sub_aggregation.flush(&bucket_with_accessor.sub_aggregation)?; + } + } + Ok(()) + } } /// Converts the user provided f64 range value to fast field value space. diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 5b87646a1..f00d075b3 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -244,7 +244,7 @@ impl TermBuckets { fn force_flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { for entry in &mut self.entries.values_mut() { if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() { - sub_aggregations.flush_staged_docs(agg_with_accessor, false)?; + sub_aggregations.flush(agg_with_accessor)?; } } Ok(()) @@ -289,7 +289,7 @@ impl SegmentTermCollector { let has_sub_aggregations = !sub_aggregations.is_empty(); let blueprint = if has_sub_aggregations { - let sub_aggregation = build_segment_agg_collector(sub_aggregations)?; + let sub_aggregation = build_segment_agg_collector(sub_aggregations, false)?; Some(sub_aggregation) } else { None @@ -393,7 +393,6 @@ impl SegmentTermCollector { &mut self, docs: &[DocId], bucket_with_accessor: &BucketAggregationWithAccessor, - force_flush: bool, ) -> crate::Result<()> { let accessor = &bucket_with_accessor.accessor; @@ -411,10 +410,15 @@ impl SegmentTermCollector { } } - if force_flush { - self.term_buckets - .force_flush(&bucket_with_accessor.sub_aggregation)?; - } + Ok(()) + } + + pub(crate) fn flush( + &mut self, + bucket_with_accessor: &BucketAggregationWithAccessor, + ) -> crate::Result<()> { + self.term_buckets + .force_flush(&bucket_with_accessor.sub_aggregation)?; Ok(()) } } diff --git a/src/aggregation/buf_collector.rs b/src/aggregation/buf_collector.rs new file mode 100644 index 000000000..149707f82 --- /dev/null +++ b/src/aggregation/buf_collector.rs @@ -0,0 +1,81 @@ +use super::agg_req_with_accessor::AggregationsWithAccessor; +use super::intermediate_agg_result::IntermediateAggregationResults; +use super::segment_agg_result::SegmentAggregationCollector; +use crate::DocId; + +pub(crate) const DOC_BLOCK_SIZE: usize = 64; +pub(crate) type DocBlock = [DocId; DOC_BLOCK_SIZE]; + +/// BufAggregationCollector buffers documents before calling collect_block(). +#[derive(Clone)] +pub(crate) struct BufAggregationCollector { + pub(crate) collector: T, + staged_docs: DocBlock, + num_staged_docs: usize, +} + +impl std::fmt::Debug for BufAggregationCollector { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SegmentAggregationResultsCollector") + .field("staged_docs", &&self.staged_docs[..self.num_staged_docs]) + .field("num_staged_docs", &self.num_staged_docs) + .finish() + } +} + +impl BufAggregationCollector { + pub fn new(collector: T) -> Self { + Self { + collector, + num_staged_docs: 0, + staged_docs: [0; DOC_BLOCK_SIZE], + } + } +} + +impl SegmentAggregationCollector + for BufAggregationCollector +{ + fn into_intermediate_aggregations_result( + self: Box, + agg_with_accessor: &AggregationsWithAccessor, + ) -> crate::Result { + Box::new(self.collector).into_intermediate_aggregations_result(agg_with_accessor) + } + + fn collect( + &mut self, + doc: crate::DocId, + agg_with_accessor: &AggregationsWithAccessor, + ) -> crate::Result<()> { + self.staged_docs[self.num_staged_docs] = doc; + self.num_staged_docs += 1; + if self.num_staged_docs == self.staged_docs.len() { + self.collector + .collect_block(&self.staged_docs[..self.num_staged_docs], agg_with_accessor)?; + self.num_staged_docs = 0; + } + Ok(()) + } + + fn collect_block( + &mut self, + docs: &[crate::DocId], + agg_with_accessor: &AggregationsWithAccessor, + ) -> crate::Result<()> { + for doc in docs { + self.collect(*doc, agg_with_accessor)?; + } + Ok(()) + } + + fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { + self.collector + .collect_block(&self.staged_docs[..self.num_staged_docs], agg_with_accessor)?; + self.num_staged_docs = 0; + + self.collector.flush(agg_with_accessor)?; + + Ok(()) + } +} diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs index e07cb7f39..b8887f2c8 100644 --- a/src/aggregation/collector.rs +++ b/src/aggregation/collector.rs @@ -151,7 +151,7 @@ impl AggregationSegmentCollector { ) -> crate::Result { let aggs_with_accessor = get_aggs_with_accessor_and_validate(agg, reader, Rc::default(), max_bucket_count)?; - let result = build_segment_agg_collector(&aggs_with_accessor)?; + let result = build_segment_agg_collector(&aggs_with_accessor, true)?; Ok(AggregationSegmentCollector { aggs_with_accessor, result, @@ -177,8 +177,7 @@ impl SegmentCollector for AggregationSegmentCollector { if let Some(err) = self.error { return Err(err); } - self.result - .flush_staged_docs(&self.aggs_with_accessor, true)?; + self.result.flush(&self.aggs_with_accessor)?; self.result .into_intermediate_aggregations_result(&self.aggs_with_accessor) } diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index d03e509e9..f8ecb887e 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -1,4 +1,4 @@ -use columnar::Column; +use columnar::{Cardinality, Column}; use serde::{Deserialize, Serialize}; use super::*; @@ -156,23 +156,36 @@ pub(crate) struct SegmentStatsCollector { field_type: Type, pub(crate) collecting_for: SegmentStatsType, pub(crate) stats: IntermediateStats, + pub(crate) accessor_idx: usize, } impl SegmentStatsCollector { - pub fn from_req(field_type: Type, collecting_for: SegmentStatsType) -> Self { + pub fn from_req( + field_type: Type, + collecting_for: SegmentStatsType, + accessor_idx: usize, + ) -> Self { Self { field_type, collecting_for, stats: IntermediateStats::default(), + accessor_idx, } } - pub(crate) fn collect_block(&mut self, docs: &[DocId], field: &Column) { - // TODO special case for Required, Optional column type - for doc in docs { - for val in field.values(*doc) { + pub(crate) fn collect_block_with_field(&mut self, docs: &[DocId], field: &Column) { + if field.get_cardinality() == Cardinality::Full { + for doc in docs { + let val = field.values.get_val(*doc); let val1 = f64_from_fastfield_u64(val, &self.field_type); self.stats.collect(val1); } + } else { + for doc in docs { + for val in field.values(*doc) { + let val1 = f64_from_fastfield_u64(val, &self.field_type); + self.stats.collect(val1); + } + } } } } @@ -219,20 +232,29 @@ impl SegmentAggregationCollector for SegmentStatsCollector { doc: crate::DocId, agg_with_accessor: &AggregationsWithAccessor, ) -> crate::Result<()> { - let accessor = &agg_with_accessor.metrics.values[0].accessor; - for val in accessor.values(doc) { + let field = &agg_with_accessor.metrics.values[self.accessor_idx].accessor; + + if field.get_cardinality() == Cardinality::Full { + let val = field.values.get_val(doc); let val1 = f64_from_fastfield_u64(val, &self.field_type); self.stats.collect(val1); + } else { + for val in field.values(doc) { + let val1 = f64_from_fastfield_u64(val, &self.field_type); + self.stats.collect(val1); + } } Ok(()) } - fn flush_staged_docs( + fn collect_block( &mut self, - _agg_with_accessor: &AggregationsWithAccessor, - _force_flush: bool, + docs: &[crate::DocId], + agg_with_accessor: &AggregationsWithAccessor, ) -> crate::Result<()> { + let field = &agg_with_accessor.metrics.values[self.accessor_idx].accessor; + self.collect_block_with_field(docs, field); Ok(()) } } @@ -293,6 +315,43 @@ mod tests { Ok(()) } + #[test] + fn test_aggregation_stats_simple() -> crate::Result<()> { + // test index without segments + let values = vec![10.0]; + + let index = get_test_index_from_values(false, &values)?; + + let agg_req_1: Aggregations = vec![( + "stats".to_string(), + Aggregation::Metric(MetricAggregation::Stats(StatsAggregation::from_field_name( + "score".to_string(), + ))), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); + + let reader = index.reader()?; + let searcher = reader.searcher(); + let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap(); + + let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?; + assert_eq!( + res["stats"], + json!({ + "avg": 10.0, + "count": 1, + "max": 10.0, + "min": 10.0, + "sum": 10.0 + }) + ); + + Ok(()) + } + #[test] fn test_aggregation_stats() -> crate::Result<()> { let index = get_test_index_2_segments(false)?; diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index 547bb3ade..191b4fdf5 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -160,6 +160,7 @@ pub mod agg_req; mod agg_req_with_accessor; pub mod agg_result; pub mod bucket; +mod buf_collector; mod collector; mod date; pub mod intermediate_agg_result; @@ -332,8 +333,8 @@ mod tests { }; use crate::aggregation::agg_result::AggregationResults; use crate::aggregation::bucket::TermsAggregation; + use crate::aggregation::buf_collector::DOC_BLOCK_SIZE; use crate::aggregation::intermediate_agg_result::IntermediateAggregationResults; - use crate::aggregation::segment_agg_result::DOC_BLOCK_SIZE; use crate::aggregation::DistributedAggregationCollector; use crate::indexer::NoMergePolicy; use crate::query::{AllQuery, TermQuery}; @@ -1590,7 +1591,7 @@ mod tests { } #[bench] - fn bench_aggregation_sub_tree(b: &mut Bencher) { + fn bench_aggregation_avg_and_range_with_avg(b: &mut Bencher) { let index = get_test_index_bench(false).unwrap(); let reader = index.reader().unwrap(); let text_field = reader.searcher().schema().get_field("text").unwrap(); diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index 7f8ece73f..c01e673b0 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -12,6 +12,7 @@ use super::agg_req_with_accessor::{ AggregationsWithAccessor, BucketAggregationWithAccessor, MetricAggregationWithAccessor, }; use super::bucket::{SegmentHistogramCollector, SegmentRangeCollector, SegmentTermCollector}; +use super::buf_collector::BufAggregationCollector; use super::collector::MAX_BUCKET_COUNT; use super::intermediate_agg_result::{IntermediateAggregationResults, IntermediateBucketResult}; use super::metric::{ @@ -22,9 +23,6 @@ use super::VecWithNames; use crate::aggregation::agg_req::BucketAggregationType; use crate::{DocId, TantivyError}; -pub(crate) const DOC_BLOCK_SIZE: usize = 64; -pub(crate) type DocBlock = [DocId; DOC_BLOCK_SIZE]; - pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug { fn into_intermediate_aggregations_result( self: Box, @@ -37,11 +35,17 @@ pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug { agg_with_accessor: &AggregationsWithAccessor, ) -> crate::Result<()>; - fn flush_staged_docs( + fn collect_block( &mut self, + docs: &[crate::DocId], agg_with_accessor: &AggregationsWithAccessor, - force_flush: bool, ) -> crate::Result<()>; + + /// Finalize method. Some Aggregator collect blocks of docs before calling `collect_block`. + /// This method ensures those staged docs will be collected. + fn flush(&mut self, _agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { + Ok(()) + } } pub(crate) trait CollectorClone { @@ -64,36 +68,56 @@ impl Clone for Box { pub(crate) fn build_segment_agg_collector( req: &AggregationsWithAccessor, + add_buffer_layer: bool, ) -> crate::Result> { // Single metric special case if req.buckets.is_empty() && req.metrics.len() == 1 { let req = &req.metrics.values[0]; + let accessor_idx = 0; let stats_collector = match &req.metric { MetricAggregation::Average(AverageAggregation { .. }) => { - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Average) - } - MetricAggregation::Count(CountAggregation { .. }) => { - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Count) + SegmentStatsCollector::from_req( + req.field_type, + SegmentStatsType::Average, + accessor_idx, + ) } + MetricAggregation::Count(CountAggregation { .. }) => SegmentStatsCollector::from_req( + req.field_type, + SegmentStatsType::Count, + accessor_idx, + ), MetricAggregation::Max(MaxAggregation { .. }) => { - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max) + SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max, accessor_idx) } MetricAggregation::Min(MinAggregation { .. }) => { - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min) - } - MetricAggregation::Stats(StatsAggregation { .. }) => { - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Stats) + SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min, accessor_idx) } + MetricAggregation::Stats(StatsAggregation { .. }) => SegmentStatsCollector::from_req( + req.field_type, + SegmentStatsType::Stats, + accessor_idx, + ), MetricAggregation::Sum(SumAggregation { .. }) => { - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum) + SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum, accessor_idx) } }; - return Ok(Box::new(stats_collector)); + if add_buffer_layer { + let stats_collector = BufAggregationCollector::new(stats_collector); + return Ok(Box::new(stats_collector)); + } else { + return Ok(Box::new(stats_collector)); + } } let agg = GenericSegmentAggregationResultsCollector::from_req_and_validate(req)?; - Ok(Box::new(agg)) + if add_buffer_layer { + let agg = BufAggregationCollector::new(agg); + Ok(Box::new(agg)) + } else { + Ok(Box::new(agg)) + } } #[derive(Clone)] @@ -103,8 +127,6 @@ pub(crate) fn build_segment_agg_collector( pub(crate) struct GenericSegmentAggregationResultsCollector { pub(crate) metrics: Option>, pub(crate) buckets: Option>, - staged_docs: DocBlock, - num_staged_docs: usize, } impl Default for GenericSegmentAggregationResultsCollector { @@ -112,8 +134,6 @@ impl Default for GenericSegmentAggregationResultsCollector { Self { metrics: Default::default(), buckets: Default::default(), - staged_docs: [0; DOC_BLOCK_SIZE], - num_staged_docs: Default::default(), } } } @@ -123,8 +143,6 @@ impl Debug for GenericSegmentAggregationResultsCollector { f.debug_struct("SegmentAggregationResultsCollector") .field("metrics", &self.metrics) .field("buckets", &self.buckets) - .field("staged_docs", &&self.staged_docs[..self.num_staged_docs]) - .field("num_staged_docs", &self.num_staged_docs) .finish() } } @@ -154,44 +172,43 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector { doc: crate::DocId, agg_with_accessor: &AggregationsWithAccessor, ) -> crate::Result<()> { - self.staged_docs[self.num_staged_docs] = doc; - self.num_staged_docs += 1; - if self.num_staged_docs == self.staged_docs.len() { - self.flush_staged_docs(agg_with_accessor, false)?; - } + self.collect_block(&[doc], agg_with_accessor)?; + Ok(()) } - fn flush_staged_docs( + fn collect_block( &mut self, + docs: &[crate::DocId], agg_with_accessor: &AggregationsWithAccessor, - force_flush: bool, ) -> crate::Result<()> { - if self.num_staged_docs == 0 { - return Ok(()); - } - if let Some(metrics) = &mut self.metrics { + if let Some(metrics) = self.metrics.as_mut() { for (collector, agg_with_accessor) in metrics.values_mut().zip(agg_with_accessor.metrics.values()) { - collector - .collect_block(&self.staged_docs[..self.num_staged_docs], agg_with_accessor); + collector.collect_block(&docs, agg_with_accessor); } } + if let Some(buckets) = self.buckets.as_mut() { + for (collector, agg_with_accessor) in + buckets.values_mut().zip(agg_with_accessor.buckets.values()) + { + collector.collect_block(&docs, agg_with_accessor)?; + } + } + + Ok(()) + } + + fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { if let Some(buckets) = &mut self.buckets { for (collector, agg_with_accessor) in buckets.values_mut().zip(agg_with_accessor.buckets.values()) { - collector.collect_block( - &self.staged_docs[..self.num_staged_docs], - agg_with_accessor, - force_flush, - )?; + collector.flush(agg_with_accessor)?; } } - - self.num_staged_docs = 0; Ok(()) } } @@ -230,10 +247,11 @@ impl GenericSegmentAggregationResultsCollector { let metrics = req .metrics .iter() - .map(|(key, req)| { + .enumerate() + .map(|(accesor_idx, (key, req))| { Ok(( key.to_string(), - SegmentMetricResultCollector::from_req_and_validate(req)?, + SegmentMetricResultCollector::from_req_and_validate(req, accesor_idx)?, )) }) .collect::>>()?; @@ -247,12 +265,7 @@ impl GenericSegmentAggregationResultsCollector { } else { Some(VecWithNames::from_entries(buckets)) }; - Ok(GenericSegmentAggregationResultsCollector { - metrics, - buckets, - staged_docs: [0; DOC_BLOCK_SIZE], - num_staged_docs: 0, - }) + Ok(GenericSegmentAggregationResultsCollector { metrics, buckets }) } } @@ -262,44 +275,59 @@ pub(crate) enum SegmentMetricResultCollector { } impl SegmentMetricResultCollector { - pub fn from_req_and_validate(req: &MetricAggregationWithAccessor) -> crate::Result { + pub fn from_req_and_validate( + req: &MetricAggregationWithAccessor, + accessor_idx: usize, + ) -> crate::Result { match &req.metric { - MetricAggregation::Average(AverageAggregation { .. }) => { - Ok(SegmentMetricResultCollector::Stats( - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Average), - )) - } - MetricAggregation::Count(CountAggregation { .. }) => { - Ok(SegmentMetricResultCollector::Stats( - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Count), - )) - } - MetricAggregation::Max(MaxAggregation { .. }) => { - Ok(SegmentMetricResultCollector::Stats( - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max), - )) - } - MetricAggregation::Min(MinAggregation { .. }) => { - Ok(SegmentMetricResultCollector::Stats( - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min), - )) - } - MetricAggregation::Stats(StatsAggregation { .. }) => { - Ok(SegmentMetricResultCollector::Stats( - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Stats), - )) - } - MetricAggregation::Sum(SumAggregation { .. }) => { - Ok(SegmentMetricResultCollector::Stats( - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum), - )) - } + MetricAggregation::Average(AverageAggregation { .. }) => Ok( + SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req( + req.field_type, + SegmentStatsType::Average, + accessor_idx, + )), + ), + MetricAggregation::Count(CountAggregation { .. }) => Ok( + SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req( + req.field_type, + SegmentStatsType::Count, + accessor_idx, + )), + ), + MetricAggregation::Max(MaxAggregation { .. }) => Ok( + SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req( + req.field_type, + SegmentStatsType::Max, + accessor_idx, + )), + ), + MetricAggregation::Min(MinAggregation { .. }) => Ok( + SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req( + req.field_type, + SegmentStatsType::Min, + accessor_idx, + )), + ), + MetricAggregation::Stats(StatsAggregation { .. }) => Ok( + SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req( + req.field_type, + SegmentStatsType::Stats, + accessor_idx, + )), + ), + MetricAggregation::Sum(SumAggregation { .. }) => Ok( + SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req( + req.field_type, + SegmentStatsType::Sum, + accessor_idx, + )), + ), } } pub(crate) fn collect_block(&mut self, doc: &[DocId], metric: &MetricAggregationWithAccessor) { match self { SegmentMetricResultCollector::Stats(stats_collector) => { - stats_collector.collect_block(doc, &metric.accessor); + stats_collector.collect_block_with_field(doc, &metric.accessor); } } } @@ -361,19 +389,37 @@ impl SegmentBucketResultCollector { #[inline] pub(crate) fn collect_block( &mut self, - doc: &[DocId], + docs: &[DocId], bucket_with_accessor: &BucketAggregationWithAccessor, - force_flush: bool, ) -> crate::Result<()> { match self { SegmentBucketResultCollector::Range(range) => { - range.collect_block(doc, bucket_with_accessor, force_flush)?; + range.collect_block(docs, bucket_with_accessor)?; } SegmentBucketResultCollector::Histogram(histogram) => { - histogram.collect_block(doc, bucket_with_accessor, force_flush)?; + histogram.collect_block(docs, bucket_with_accessor)?; } SegmentBucketResultCollector::Terms(terms) => { - terms.collect_block(doc, bucket_with_accessor, force_flush)?; + terms.collect_block(docs, bucket_with_accessor)?; + } + } + Ok(()) + } + + #[inline] + pub(crate) fn flush( + &mut self, + bucket_with_accessor: &BucketAggregationWithAccessor, + ) -> crate::Result<()> { + match self { + SegmentBucketResultCollector::Range(range) => { + range.flush(bucket_with_accessor)?; + } + SegmentBucketResultCollector::Histogram(histogram) => { + histogram.flush(bucket_with_accessor)?; + } + SegmentBucketResultCollector::Terms(terms) => { + terms.flush(bucket_with_accessor)?; } } Ok(())