diff --git a/columnar/src/column/mod.rs b/columnar/src/column/mod.rs index 5ff03b1f6..dcca5d3ef 100644 --- a/columnar/src/column/mod.rs +++ b/columnar/src/column/mod.rs @@ -38,6 +38,7 @@ impl Column { } impl Column { + #[inline] pub fn get_cardinality(&self) -> Cardinality { self.idx.get_cardinality() } diff --git a/columnar/src/column_index/mod.rs b/columnar/src/column_index/mod.rs index 2ea260442..ea12806c5 100644 --- a/columnar/src/column_index/mod.rs +++ b/columnar/src/column_index/mod.rs @@ -34,6 +34,7 @@ impl From for ColumnIndex { } impl ColumnIndex { + #[inline] pub fn get_cardinality(&self) -> Cardinality { match self { ColumnIndex::Full => Cardinality::Full, diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index 83002177a..b052818d9 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -14,9 +14,9 @@ use crate::aggregation::intermediate_agg_result::{ IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry, }; use crate::aggregation::segment_agg_result::{ - GenericSegmentAggregationResultsCollector, SegmentAggregationCollector, + build_segment_agg_collector, SegmentAggregationCollector, }; -use crate::aggregation::{f64_from_fastfield_u64, format_date}; +use crate::aggregation::{f64_from_fastfield_u64, format_date, VecWithNames}; use crate::schema::{Schema, Type}; use crate::{DocId, TantivyError}; @@ -185,7 +185,7 @@ pub(crate) struct SegmentHistogramBucketEntry { impl SegmentHistogramBucketEntry { pub(crate) fn into_intermediate_bucket_entry( self, - sub_aggregation: GenericSegmentAggregationResultsCollector, + sub_aggregation: Box, agg_with_accessor: &AggregationsWithAccessor, ) -> crate::Result { Ok(IntermediateHistogramBucketEntry { @@ -203,13 +203,86 @@ impl SegmentHistogramBucketEntry { pub struct SegmentHistogramCollector { /// The buckets containing the aggregation data. buckets: Vec, - sub_aggregations: Option>, + sub_aggregations: Option>>, field_type: Type, interval: f64, offset: f64, min_doc_count: u64, first_bucket_num: i64, bounds: HistogramBounds, + accessor_idx: usize, +} + +impl SegmentAggregationCollector for SegmentHistogramCollector { + fn into_intermediate_aggregations_result( + self: Box, + agg_with_accessor: &AggregationsWithAccessor, + ) -> crate::Result { + let name = agg_with_accessor.buckets.keys[self.accessor_idx].to_string(); + let agg_with_accessor = &agg_with_accessor.buckets.values[self.accessor_idx]; + + let bucket = self.into_intermediate_bucket_result(agg_with_accessor)?; + let buckets = Some(VecWithNames::from_entries(vec![(name, bucket)])); + + Ok(IntermediateAggregationResults { + metrics: None, + buckets, + }) + } + + fn collect( + &mut self, + doc: crate::DocId, + agg_with_accessor: &AggregationsWithAccessor, + ) -> crate::Result<()> { + self.collect_block(&[doc], agg_with_accessor) + } + + fn collect_block( + &mut self, + docs: &[crate::DocId], + agg_with_accessor: &AggregationsWithAccessor, + ) -> crate::Result<()> { + let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor; + let sub_aggregation_accessor = + &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; + + let bounds = self.bounds; + let interval = self.interval; + let offset = self.offset; + let first_bucket_num = self.first_bucket_num; + let get_bucket_num = + |val| (get_bucket_num_f64(val, interval, offset) as i64 - first_bucket_num) as usize; + + for doc in docs { + for val in accessor.values(*doc) { + let val = self.f64_from_fastfield_u64(val); + + let bucket_pos = get_bucket_num(val); + self.increment_bucket_if_in_bounds( + val, + &bounds, + bucket_pos, + *doc, + sub_aggregation_accessor, + )?; + } + } + Ok(()) + } + + fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { + let sub_aggregation_accessor = + &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; + + if let Some(sub_aggregations) = self.sub_aggregations.as_mut() { + for sub_aggregation in sub_aggregations { + sub_aggregation.flush(sub_aggregation_accessor)?; + } + } + + Ok(()) + } } impl SegmentHistogramCollector { @@ -285,6 +358,7 @@ impl SegmentHistogramCollector { sub_aggregation: &AggregationsWithAccessor, field_type: Type, accessor: &Column, + accessor_idx: usize, ) -> crate::Result { req.validate()?; let min = f64_from_fastfield_u64(accessor.min_value(), &field_type); @@ -300,8 +374,7 @@ impl SegmentHistogramCollector { let sub_aggregations = if sub_aggregation.is_empty() { None } else { - let sub_aggregation = - GenericSegmentAggregationResultsCollector::from_req_and_validate(sub_aggregation)?; + let sub_aggregation = build_segment_agg_collector(sub_aggregation, false)?; Some(buckets.iter().map(|_| sub_aggregation.clone()).collect()) }; @@ -330,40 +403,10 @@ impl SegmentHistogramCollector { bounds, sub_aggregations, min_doc_count: req.min_doc_count(), + accessor_idx, }) } - #[inline] - pub(crate) fn collect_block( - &mut self, - docs: &[DocId], - bucket_with_accessor: &BucketAggregationWithAccessor, - ) -> crate::Result<()> { - let bounds = self.bounds; - let interval = self.interval; - let offset = self.offset; - let first_bucket_num = self.first_bucket_num; - let get_bucket_num = - |val| (get_bucket_num_f64(val, interval, offset) as i64 - first_bucket_num) as usize; - - let accessor = &bucket_with_accessor.accessor; - for doc in docs { - for val in accessor.values(*doc) { - let val = self.f64_from_fastfield_u64(val); - - let bucket_pos = get_bucket_num(val); - self.increment_bucket_if_in_bounds( - val, - &bounds, - bucket_pos, - *doc, - &bucket_with_accessor.sub_aggregation, - )?; - } - } - Ok(()) - } - #[inline] fn increment_bucket_if_in_bounds( &mut self, @@ -399,18 +442,6 @@ impl SegmentHistogramCollector { Ok(()) } - pub(crate) fn flush( - &mut self, - bucket_with_accessor: &BucketAggregationWithAccessor, - ) -> crate::Result<()> { - if let Some(sub_aggregations) = self.sub_aggregations.as_mut() { - for sub_aggregation in sub_aggregations { - sub_aggregation.flush(&bucket_with_accessor.sub_aggregation)?; - } - } - Ok(()) - } - fn f64_from_fastfield_u64(&self, val: u64) -> f64 { f64_from_fastfield_u64(val, &self.field_type) } diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index 633ad767b..b61e6062f 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -5,20 +5,19 @@ use columnar::MonotonicallyMappableToU64; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; -use crate::aggregation::agg_req_with_accessor::{ - AggregationsWithAccessor, BucketAggregationWithAccessor, -}; +use crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor; use crate::aggregation::intermediate_agg_result::{ - IntermediateBucketResult, IntermediateRangeBucketEntry, IntermediateRangeBucketResult, + IntermediateAggregationResults, IntermediateBucketResult, IntermediateRangeBucketEntry, + IntermediateRangeBucketResult, }; use crate::aggregation::segment_agg_result::{ - BucketCount, GenericSegmentAggregationResultsCollector, SegmentAggregationCollector, + build_segment_agg_collector, BucketCount, SegmentAggregationCollector, }; use crate::aggregation::{ - f64_from_fastfield_u64, f64_to_fastfield_u64, format_date, Key, SerializedKey, + f64_from_fastfield_u64, f64_to_fastfield_u64, format_date, Key, SerializedKey, VecWithNames, }; use crate::schema::Type; -use crate::{DocId, TantivyError}; +use crate::TantivyError; /// Provide user-defined buckets to aggregate on. /// Two special buckets will automatically be created to cover the whole range of values. @@ -129,13 +128,14 @@ pub struct SegmentRangeCollector { /// The buckets containing the aggregation data. buckets: Vec, field_type: Type, + pub(crate) accessor_idx: usize, } #[derive(Clone)] pub(crate) struct SegmentRangeBucketEntry { pub key: Key, pub doc_count: u64, - pub sub_aggregation: Option, + pub sub_aggregation: Option>, /// The from range of the bucket. Equals `f64::MIN` when `None`. pub from: Option, /// The to range of the bucket. Equals `f64::MAX` when `None`. Open interval, `to` is not @@ -174,12 +174,14 @@ impl SegmentRangeBucketEntry { } } -impl SegmentRangeCollector { - pub fn into_intermediate_bucket_result( - self, - agg_with_accessor: &BucketAggregationWithAccessor, - ) -> crate::Result { +impl SegmentAggregationCollector for SegmentRangeCollector { + fn into_intermediate_aggregations_result( + self: Box, + agg_with_accessor: &AggregationsWithAccessor, + ) -> crate::Result { let field_type = self.field_type; + let name = agg_with_accessor.buckets.keys[self.accessor_idx].to_string(); + let sub_agg = &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; let buckets: FxHashMap = self .buckets @@ -189,21 +191,74 @@ impl SegmentRangeCollector { range_to_string(&range_bucket.range, &field_type)?, range_bucket .bucket - .into_intermediate_bucket_entry(&agg_with_accessor.sub_aggregation)?, + .into_intermediate_bucket_entry(sub_agg)?, )) }) .collect::>()?; - Ok(IntermediateBucketResult::Range( - IntermediateRangeBucketResult { buckets }, - )) + let bucket = IntermediateBucketResult::Range(IntermediateRangeBucketResult { buckets }); + + let buckets = Some(VecWithNames::from_entries(vec![(name, bucket)])); + + Ok(IntermediateAggregationResults { + metrics: None, + buckets, + }) } + fn collect( + &mut self, + doc: crate::DocId, + agg_with_accessor: &AggregationsWithAccessor, + ) -> crate::Result<()> { + self.collect_block(&[doc], agg_with_accessor) + } + + fn collect_block( + &mut self, + docs: &[crate::DocId], + agg_with_accessor: &AggregationsWithAccessor, + ) -> crate::Result<()> { + let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor; + let sub_aggregation_accessor = + &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; + for doc in docs { + for val in accessor.values(*doc) { + let bucket_pos = self.get_bucket_pos(val); + + let bucket = &mut self.buckets[bucket_pos]; + + bucket.bucket.doc_count += 1; + if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation { + sub_aggregation.collect(*doc, sub_aggregation_accessor)?; + } + } + } + + Ok(()) + } + + fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { + let sub_aggregation_accessor = + &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; + + for bucket in self.buckets.iter_mut() { + if let Some(sub_agg) = bucket.bucket.sub_aggregation.as_mut() { + sub_agg.flush(sub_aggregation_accessor)?; + } + } + + Ok(()) + } +} + +impl SegmentRangeCollector { pub(crate) fn from_req_and_validate( req: &RangeAggregation, sub_aggregation: &AggregationsWithAccessor, bucket_count: &BucketCount, field_type: Type, + accessor_idx: usize, ) -> crate::Result { // The range input on the request is f64. // We need to convert to u64 ranges, because we read the values as u64. @@ -229,11 +284,7 @@ impl SegmentRangeCollector { let sub_aggregation = if sub_aggregation.is_empty() { None } else { - Some( - GenericSegmentAggregationResultsCollector::from_req_and_validate( - sub_aggregation, - )?, - ) + Some(build_segment_agg_collector(sub_aggregation, false)?) }; Ok(SegmentRangeAndBucketEntry { @@ -255,32 +306,10 @@ impl SegmentRangeCollector { Ok(SegmentRangeCollector { buckets, field_type, + accessor_idx, }) } - #[inline] - pub(crate) fn collect_block( - &mut self, - docs: &[DocId], - bucket_with_accessor: &BucketAggregationWithAccessor, - ) -> crate::Result<()> { - let accessor = &bucket_with_accessor.accessor; - for doc in docs { - for val in accessor.values(*doc) { - let bucket_pos = self.get_bucket_pos(val); - - let bucket = &mut self.buckets[bucket_pos]; - - bucket.bucket.doc_count += 1; - if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation { - sub_aggregation.collect(*doc, &bucket_with_accessor.sub_aggregation)?; - } - } - } - - Ok(()) - } - #[inline] fn get_bucket_pos(&self, val: u64) -> usize { let pos = self @@ -290,18 +319,6 @@ impl SegmentRangeCollector { debug_assert!(self.buckets[pos].range.contains(&val)); pos } - - pub(crate) fn flush( - &mut self, - bucket_with_accessor: &BucketAggregationWithAccessor, - ) -> crate::Result<()> { - for bucket in &mut self.buckets { - if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation { - sub_aggregation.flush(&bucket_with_accessor.sub_aggregation)?; - } - } - Ok(()) - } } /// Converts the user provided f64 range value to fast field value space. @@ -419,8 +436,9 @@ mod tests { use super::*; use crate::aggregation::agg_req::{ - Aggregation, Aggregations, BucketAggregation, BucketAggregationType, + Aggregation, Aggregations, BucketAggregation, BucketAggregationType, MetricAggregation, }; + use crate::aggregation::metric::AverageAggregation; use crate::aggregation::tests::{ exec_request, exec_request_with_query, get_test_index_2_segments, get_test_index_with_num_docs, @@ -441,6 +459,7 @@ mod tests { &Default::default(), &Default::default(), field_type, + 0, ) .expect("unexpected error") } @@ -477,6 +496,47 @@ mod tests { Ok(()) } + #[test] + fn range_fraction_test_with_sub_agg() -> crate::Result<()> { + let index = get_test_index_with_num_docs(false, 100)?; + + let sub_agg_req: Aggregations = vec![( + "score_f64".to_string(), + Aggregation::Metric(MetricAggregation::Average( + AverageAggregation::from_field_name("score_f64".to_string()), + )), + )] + .into_iter() + .collect(); + + let agg_req: Aggregations = vec![( + "range".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Range(RangeAggregation { + field: "fraction_f64".to_string(), + ranges: vec![(0f64..0.1f64).into(), (0.1f64..0.2f64).into()], + ..Default::default() + }), + sub_aggregation: sub_agg_req, + }), + )] + .into_iter() + .collect(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + assert_eq!(res["range"]["buckets"][0]["key"], "*-0"); + assert_eq!(res["range"]["buckets"][0]["doc_count"], 0); + assert_eq!(res["range"]["buckets"][1]["key"], "0-0.1"); + assert_eq!(res["range"]["buckets"][1]["doc_count"], 10); + assert_eq!(res["range"]["buckets"][2]["key"], "0.1-0.2"); + assert_eq!(res["range"]["buckets"][2]["doc_count"], 10); + assert_eq!(res["range"]["buckets"][3]["key"], "0.2-*"); + assert_eq!(res["range"]["buckets"][3]["doc_count"], 80); + + Ok(()) + } + #[test] fn range_keyed_buckets_test() -> crate::Result<()> { let index = get_test_index_with_num_docs(false, 100)?; diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index f00d075b3..e8130fdd3 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -1,5 +1,6 @@ use std::fmt::Debug; +use columnar::Cardinality; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; @@ -8,13 +9,15 @@ use crate::aggregation::agg_req_with_accessor::{ AggregationsWithAccessor, BucketAggregationWithAccessor, }; use crate::aggregation::intermediate_agg_result::{ - IntermediateBucketResult, IntermediateTermBucketEntry, IntermediateTermBucketResult, + IntermediateAggregationResults, IntermediateBucketResult, IntermediateTermBucketEntry, + IntermediateTermBucketResult, }; use crate::aggregation::segment_agg_result::{ build_segment_agg_collector, SegmentAggregationCollector, }; +use crate::aggregation::VecWithNames; use crate::error::DataCorruption; -use crate::{DocId, TantivyError}; +use crate::TantivyError; /// Creates a bucket for every unique term and counts the number of occurences. /// Note that doc_count in the response buckets equals term count here. @@ -259,6 +262,7 @@ pub struct SegmentTermCollector { term_buckets: TermBuckets, req: TermsAggregationInternal, blueprint: Option>, + accessor_idx: usize, } pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) { @@ -266,10 +270,85 @@ pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) { (agg_name, agg_property) } +impl SegmentAggregationCollector for SegmentTermCollector { + fn into_intermediate_aggregations_result( + self: Box, + agg_with_accessor: &AggregationsWithAccessor, + ) -> crate::Result { + let name = agg_with_accessor.buckets.keys[self.accessor_idx].to_string(); + let agg_with_accessor = &agg_with_accessor.buckets.values[self.accessor_idx]; + + let bucket = self.into_intermediate_bucket_result(agg_with_accessor)?; + let buckets = Some(VecWithNames::from_entries(vec![(name, bucket)])); + + Ok(IntermediateAggregationResults { + metrics: None, + buckets, + }) + } + + fn collect( + &mut self, + doc: crate::DocId, + agg_with_accessor: &AggregationsWithAccessor, + ) -> crate::Result<()> { + self.collect_block(&[doc], agg_with_accessor) + } + + fn collect_block( + &mut self, + docs: &[crate::DocId], + agg_with_accessor: &AggregationsWithAccessor, + ) -> crate::Result<()> { + let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor; + let sub_aggregation_accessor = + &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; + + if accessor.get_cardinality() == Cardinality::Full { + for doc in docs { + let term_id = accessor.values.get_val(*doc); + let entry = self + .term_buckets + .entries + .entry(term_id as u32) + .or_insert_with(|| TermBucketEntry::from_blueprint(&self.blueprint)); + entry.doc_count += 1; + if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() { + sub_aggregations.collect(*doc, sub_aggregation_accessor)?; + } + } + } else { + for doc in docs { + for term_id in accessor.values(*doc) { + let entry = self + .term_buckets + .entries + .entry(term_id as u32) + .or_insert_with(|| TermBucketEntry::from_blueprint(&self.blueprint)); + entry.doc_count += 1; + if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() { + sub_aggregations.collect(*doc, sub_aggregation_accessor)?; + } + } + } + } + Ok(()) + } + + fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { + let sub_aggregation_accessor = + &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; + + self.term_buckets.force_flush(sub_aggregation_accessor)?; + Ok(()) + } +} + impl SegmentTermCollector { pub(crate) fn from_req_and_validate( req: &TermsAggregation, sub_aggregations: &AggregationsWithAccessor, + accessor_idx: usize, ) -> crate::Result { let term_buckets = TermBuckets::default(); @@ -299,6 +378,7 @@ impl SegmentTermCollector { req: TermsAggregationInternal::from_req(req), term_buckets, blueprint, + accessor_idx, }) } @@ -387,40 +467,6 @@ impl SegmentTermCollector { }, )) } - - #[inline] - pub(crate) fn collect_block( - &mut self, - docs: &[DocId], - bucket_with_accessor: &BucketAggregationWithAccessor, - ) -> crate::Result<()> { - let accessor = &bucket_with_accessor.accessor; - - for doc in docs { - for term_id in accessor.values(*doc) { - let entry = self - .term_buckets - .entries - .entry(term_id as u32) - .or_insert_with(|| TermBucketEntry::from_blueprint(&self.blueprint)); - entry.doc_count += 1; - if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() { - sub_aggregations.collect(*doc, &bucket_with_accessor.sub_aggregation)?; - } - } - } - - Ok(()) - } - - pub(crate) fn flush( - &mut self, - bucket_with_accessor: &BucketAggregationWithAccessor, - ) -> crate::Result<()> { - self.term_buckets - .force_flush(&bucket_with_accessor.sub_aggregation)?; - Ok(()) - } } pub(crate) trait GetDocCount { @@ -631,12 +677,15 @@ mod tests { let res = exec_request(agg_req, &index)?; assert_eq!(res["my_texts"]["buckets"][0]["key"], "termb"); assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 2); + assert_eq!(res["my_texts"]["buckets"][0]["avg_score"]["value"], 6.0); assert_eq!(res["my_texts"]["buckets"][1]["key"], "termc"); assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 3); + assert_eq!(res["my_texts"]["buckets"][1]["avg_score"]["value"], 1.0); assert_eq!(res["my_texts"]["buckets"][2]["key"], "terma"); assert_eq!(res["my_texts"]["buckets"][2]["doc_count"], 5); + assert_eq!(res["my_texts"]["buckets"][2]["avg_score"]["value"], 5.0); assert_eq!(res["my_texts"]["sum_other_doc_count"], 0); diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index 8c453986a..b972bde2d 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -21,7 +21,6 @@ use super::metric::{ IntermediateAverage, IntermediateCount, IntermediateMax, IntermediateMin, IntermediateStats, IntermediateSum, }; -use super::segment_agg_result::SegmentMetricResultCollector; use super::{format_date, Key, SerializedKey, VecWithNames}; use crate::aggregation::agg_result::{AggregationResults, BucketEntries, BucketEntry}; use crate::aggregation::bucket::TermsAggregationInternal; @@ -220,32 +219,6 @@ pub enum IntermediateMetricResult { Sum(IntermediateSum), } -impl From for IntermediateMetricResult { - fn from(tree: SegmentMetricResultCollector) -> Self { - use super::metric::SegmentStatsType; - match tree { - SegmentMetricResultCollector::Stats(collector) => match collector.collecting_for { - SegmentStatsType::Average => IntermediateMetricResult::Average( - IntermediateAverage::from_collector(collector), - ), - SegmentStatsType::Count => { - IntermediateMetricResult::Count(IntermediateCount::from_collector(collector)) - } - SegmentStatsType::Max => { - IntermediateMetricResult::Max(IntermediateMax::from_collector(collector)) - } - SegmentStatsType::Min => { - IntermediateMetricResult::Min(IntermediateMin::from_collector(collector)) - } - SegmentStatsType::Stats => IntermediateMetricResult::Stats(collector.stats), - SegmentStatsType::Sum => { - IntermediateMetricResult::Sum(IntermediateSum::from_collector(collector)) - } - }, - } - } -} - impl IntermediateMetricResult { pub(crate) fn empty_from_req(req: &MetricAggregation) -> Self { match req { diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index f8ecb887e..af26c7a18 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -172,6 +172,7 @@ impl SegmentStatsCollector { accessor_idx, } } + #[inline] pub(crate) fn collect_block_with_field(&mut self, docs: &[DocId], field: &Column) { if field.get_cardinality() == Cardinality::Full { for doc in docs { @@ -195,7 +196,7 @@ impl SegmentAggregationCollector for SegmentStatsCollector { self: Box, agg_with_accessor: &AggregationsWithAccessor, ) -> crate::Result { - let name = agg_with_accessor.metrics.keys[0].to_string(); + let name = agg_with_accessor.metrics.keys[self.accessor_idx].to_string(); let intermediate_metric_result = match self.collecting_for { SegmentStatsType::Average => { @@ -234,20 +235,15 @@ impl SegmentAggregationCollector for SegmentStatsCollector { ) -> crate::Result<()> { let field = &agg_with_accessor.metrics.values[self.accessor_idx].accessor; - if field.get_cardinality() == Cardinality::Full { - let val = field.values.get_val(doc); + for val in field.values(doc) { let val1 = f64_from_fastfield_u64(val, &self.field_type); self.stats.collect(val1); - } else { - for val in field.values(doc) { - let val1 = f64_from_fastfield_u64(val, &self.field_type); - self.stats.collect(val1); - } } Ok(()) } + #[inline] fn collect_block( &mut self, docs: &[crate::DocId], diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index 191b4fdf5..b92c99947 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -209,12 +209,9 @@ impl From> for VecWithNames { } impl VecWithNames { - fn from_other>(entries: VecWithNames) -> Self { - let values = entries.values.into_iter().map(Into::into).collect(); - Self { - keys: entries.keys, - values, - } + fn extend(&mut self, entries: VecWithNames) { + self.keys.extend(entries.keys); + self.values.extend(entries.values); } fn from_entries(mut entries: Vec<(String, T)>) -> Self { @@ -1495,6 +1492,49 @@ mod tests { }); } + #[bench] + fn bench_aggregation_range_with_avg(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + + b.iter(|| { + let sub_agg_req: Aggregations = vec![( + "average_f64".to_string(), + Aggregation::Metric(MetricAggregation::Average( + AverageAggregation::from_field_name("score_f64".to_string()), + )), + )] + .into_iter() + .collect(); + + let agg_req_1: Aggregations = vec![( + "rangef64".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Range(RangeAggregation { + field: "score_f64".to_string(), + ranges: vec![ + (3f64..7000f64).into(), + (7000f64..20000f64).into(), + (20000f64..30000f64).into(), + (30000f64..40000f64).into(), + (40000f64..50000f64).into(), + (50000f64..60000f64).into(), + ], + ..Default::default() + }), + sub_aggregation: sub_agg_req, + }), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); + + let searcher = reader.searcher(); + searcher.search(&AllQuery, &collector).unwrap() + }); + } + // hard bounds has a different algorithm, because it actually limits collection range #[bench] fn bench_aggregation_histogram_only_hard_bounds(b: &mut Bencher) { diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index 3115d1322..3e6bf6b3c 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -14,14 +14,14 @@ use super::agg_req_with_accessor::{ use super::bucket::{SegmentHistogramCollector, SegmentRangeCollector, SegmentTermCollector}; use super::buf_collector::BufAggregationCollector; use super::collector::MAX_BUCKET_COUNT; -use super::intermediate_agg_result::{IntermediateAggregationResults, IntermediateBucketResult}; +use super::intermediate_agg_result::IntermediateAggregationResults; use super::metric::{ AverageAggregation, CountAggregation, MaxAggregation, MinAggregation, SegmentStatsCollector, SegmentStatsType, StatsAggregation, SumAggregation, }; use super::VecWithNames; use crate::aggregation::agg_req::BucketAggregationType; -use crate::{DocId, TantivyError}; +use crate::TantivyError; pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug { fn into_intermediate_aggregations_result( @@ -74,41 +74,14 @@ pub(crate) fn build_segment_agg_collector( if req.buckets.is_empty() && req.metrics.len() == 1 { let req = &req.metrics.values[0]; let accessor_idx = 0; - let stats_collector = match &req.metric { - MetricAggregation::Average(AverageAggregation { .. }) => { - SegmentStatsCollector::from_req( - req.field_type, - SegmentStatsType::Average, - accessor_idx, - ) - } - MetricAggregation::Count(CountAggregation { .. }) => SegmentStatsCollector::from_req( - req.field_type, - SegmentStatsType::Count, - accessor_idx, - ), - MetricAggregation::Max(MaxAggregation { .. }) => { - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max, accessor_idx) - } - MetricAggregation::Min(MinAggregation { .. }) => { - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min, accessor_idx) - } - MetricAggregation::Stats(StatsAggregation { .. }) => SegmentStatsCollector::from_req( - req.field_type, - SegmentStatsType::Stats, - accessor_idx, - ), - MetricAggregation::Sum(SumAggregation { .. }) => { - SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum, accessor_idx) - } - }; + return build_metric_segment_agg_collector(req, accessor_idx, add_buffer_layer); + } - if add_buffer_layer { - let stats_collector = BufAggregationCollector::new(stats_collector); - return Ok(Box::new(stats_collector)); - } else { - return Ok(Box::new(stats_collector)); - } + // Single bucket special case + if req.metrics.is_empty() && req.buckets.len() == 1 { + let req = &req.buckets.values[0]; + let accessor_idx = 0; + return build_bucket_segment_agg_collector(req, accessor_idx, add_buffer_layer); } let agg = GenericSegmentAggregationResultsCollector::from_req_and_validate(req)?; @@ -120,14 +93,96 @@ pub(crate) fn build_segment_agg_collector( } } -#[derive(Clone)] +pub(crate) fn build_metric_segment_agg_collector( + req: &MetricAggregationWithAccessor, + accessor_idx: usize, + add_buffer_layer: bool, +) -> crate::Result> { + let stats_collector = match &req.metric { + MetricAggregation::Average(AverageAggregation { .. }) => { + SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Average, accessor_idx) + } + MetricAggregation::Count(CountAggregation { .. }) => { + SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Count, accessor_idx) + } + MetricAggregation::Max(MaxAggregation { .. }) => { + SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max, accessor_idx) + } + MetricAggregation::Min(MinAggregation { .. }) => { + SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min, accessor_idx) + } + MetricAggregation::Stats(StatsAggregation { .. }) => { + SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Stats, accessor_idx) + } + MetricAggregation::Sum(SumAggregation { .. }) => { + SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum, accessor_idx) + } + }; + + if add_buffer_layer { + let stats_collector = BufAggregationCollector::new(stats_collector); + Ok(Box::new(stats_collector)) + } else { + Ok(Box::new(stats_collector)) + } +} + +fn box_with_opt_buffer( + add_buffer_layer: bool, + collector: T, +) -> Box { + if add_buffer_layer { + let collector = BufAggregationCollector::new(collector); + Box::new(collector) + } else { + Box::new(collector) + } +} + +pub(crate) fn build_bucket_segment_agg_collector( + req: &BucketAggregationWithAccessor, + accessor_idx: usize, + add_buffer_layer: bool, +) -> crate::Result> { + match &req.bucket_agg { + BucketAggregationType::Terms(terms_req) => Ok(box_with_opt_buffer( + add_buffer_layer, + SegmentTermCollector::from_req_and_validate( + terms_req, + &req.sub_aggregation, + accessor_idx, + )?, + )), + BucketAggregationType::Range(range_req) => Ok(box_with_opt_buffer( + add_buffer_layer, + SegmentRangeCollector::from_req_and_validate( + range_req, + &req.sub_aggregation, + &req.bucket_count, + req.field_type, + accessor_idx, + )?, + )), + BucketAggregationType::Histogram(histogram) => Ok(box_with_opt_buffer( + add_buffer_layer, + SegmentHistogramCollector::from_req_and_validate( + histogram, + &req.sub_aggregation, + req.field_type, + &req.accessor, + accessor_idx, + )?, + )), + } +} + +#[derive(Clone, Default)] /// The GenericSegmentAggregationResultsCollector is the generic version of the collector, which /// can handle arbitrary complexity of sub-aggregations. Ideally we never have to pick this one /// and can provide specialized versions instead, that remove some of its overhead. -#[derive(Default)] pub(crate) struct GenericSegmentAggregationResultsCollector { - pub(crate) metrics: Option>, - pub(crate) buckets: Option>, + pub(crate) metrics: Option>>, + pub(crate) buckets: Option>>, } impl Debug for GenericSegmentAggregationResultsCollector { @@ -145,16 +200,29 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector { agg_with_accessor: &AggregationsWithAccessor, ) -> crate::Result { let buckets = if let Some(buckets) = self.buckets { - let entries = buckets - .into_iter() - .zip(agg_with_accessor.buckets.values()) - .map(|((key, bucket), acc)| Ok((key, bucket.into_intermediate_bucket_result(acc)?))) - .collect::>>()?; - Some(VecWithNames::from_entries(entries)) + let mut intermeditate_buckets = VecWithNames::default(); + for bucket in buckets { + // TODO too many allocations? + let res = bucket.into_intermediate_aggregations_result(agg_with_accessor)?; + // unwrap is fine since we only have buckets here + intermeditate_buckets.extend(res.buckets.unwrap()); + } + Some(intermeditate_buckets) + } else { + None + }; + let metrics = if let Some(metrics) = self.metrics { + let mut intermeditate_metrics = VecWithNames::default(); + for metric in metrics { + // TODO too many allocations? + let res = metric.into_intermediate_aggregations_result(agg_with_accessor)?; + // unwrap is fine since we only have metrics here + intermeditate_metrics.extend(res.metrics.unwrap()); + } + Some(intermeditate_metrics) } else { None }; - let metrics = self.metrics.map(VecWithNames::from_other); Ok(IntermediateAggregationResults { metrics, buckets }) } @@ -175,17 +243,13 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector { agg_with_accessor: &AggregationsWithAccessor, ) -> crate::Result<()> { if let Some(metrics) = self.metrics.as_mut() { - for (collector, agg_with_accessor) in - metrics.values_mut().zip(agg_with_accessor.metrics.values()) - { - collector.collect_block(docs, agg_with_accessor); + for collector in metrics { + collector.collect_block(docs, agg_with_accessor)?; } } if let Some(buckets) = self.buckets.as_mut() { - for (collector, agg_with_accessor) in - buckets.values_mut().zip(agg_with_accessor.buckets.values()) - { + for collector in buckets { collector.collect_block(docs, agg_with_accessor)?; } } @@ -194,10 +258,13 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector { } fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { + if let Some(metrics) = &mut self.metrics { + for collector in metrics { + collector.flush(agg_with_accessor)?; + } + } if let Some(buckets) = &mut self.buckets { - for (collector, agg_with_accessor) in - buckets.values_mut().zip(agg_with_accessor.buckets.values()) - { + for collector in buckets { collector.flush(agg_with_accessor)?; } } @@ -206,218 +273,46 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector { } impl GenericSegmentAggregationResultsCollector { - pub fn into_intermediate_aggregations_result( - self, - agg_with_accessor: &AggregationsWithAccessor, - ) -> crate::Result { - let buckets = if let Some(buckets) = self.buckets { - let entries = buckets - .into_iter() - .zip(agg_with_accessor.buckets.values()) - .map(|((key, bucket), acc)| Ok((key, bucket.into_intermediate_bucket_result(acc)?))) - .collect::>>()?; - Some(VecWithNames::from_entries(entries)) - } else { - None - }; - let metrics = self.metrics.map(VecWithNames::from_other); - - Ok(IntermediateAggregationResults { metrics, buckets }) - } - pub(crate) fn from_req_and_validate(req: &AggregationsWithAccessor) -> crate::Result { let buckets = req .buckets .iter() - .map(|(key, req)| { - Ok(( - key.to_string(), - SegmentBucketResultCollector::from_req_and_validate(req)?, - )) + .enumerate() + .map(|(accessor_idx, (_key, req))| { + Ok(build_bucket_segment_agg_collector( + req, + accessor_idx, + false, + )?) }) - .collect::>>()?; + .collect::>>>()?; let metrics = req .metrics .iter() .enumerate() - .map(|(accesor_idx, (key, req))| { - Ok(( - key.to_string(), - SegmentMetricResultCollector::from_req_and_validate(req, accesor_idx)?, - )) + .map(|(accessor_idx, (_key, req))| { + Ok(build_metric_segment_agg_collector( + req, + accessor_idx, + false, + )?) }) - .collect::>>()?; + .collect::>>>()?; let metrics = if metrics.is_empty() { None } else { - Some(VecWithNames::from_entries(metrics)) + Some(metrics) }; + let buckets = if buckets.is_empty() { None } else { - Some(VecWithNames::from_entries(buckets)) + Some(buckets) }; Ok(GenericSegmentAggregationResultsCollector { metrics, buckets }) } } -#[derive(Clone, Debug, PartialEq)] -pub(crate) enum SegmentMetricResultCollector { - Stats(SegmentStatsCollector), -} - -impl SegmentMetricResultCollector { - pub fn from_req_and_validate( - req: &MetricAggregationWithAccessor, - accessor_idx: usize, - ) -> crate::Result { - match &req.metric { - MetricAggregation::Average(AverageAggregation { .. }) => Ok( - SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req( - req.field_type, - SegmentStatsType::Average, - accessor_idx, - )), - ), - MetricAggregation::Count(CountAggregation { .. }) => Ok( - SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req( - req.field_type, - SegmentStatsType::Count, - accessor_idx, - )), - ), - MetricAggregation::Max(MaxAggregation { .. }) => Ok( - SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req( - req.field_type, - SegmentStatsType::Max, - accessor_idx, - )), - ), - MetricAggregation::Min(MinAggregation { .. }) => Ok( - SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req( - req.field_type, - SegmentStatsType::Min, - accessor_idx, - )), - ), - MetricAggregation::Stats(StatsAggregation { .. }) => Ok( - SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req( - req.field_type, - SegmentStatsType::Stats, - accessor_idx, - )), - ), - MetricAggregation::Sum(SumAggregation { .. }) => Ok( - SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req( - req.field_type, - SegmentStatsType::Sum, - accessor_idx, - )), - ), - } - } - pub(crate) fn collect_block(&mut self, doc: &[DocId], metric: &MetricAggregationWithAccessor) { - match self { - SegmentMetricResultCollector::Stats(stats_collector) => { - stats_collector.collect_block_with_field(doc, &metric.accessor); - } - } - } -} - -/// SegmentBucketAggregationResultCollectors will have specialized buckets for collection inside -/// segments. -/// The typical structure of Map is not suitable during collection for performance -/// reasons. -#[derive(Clone, Debug)] -pub(crate) enum SegmentBucketResultCollector { - Range(SegmentRangeCollector), - Histogram(Box), - Terms(Box), -} - -impl SegmentBucketResultCollector { - pub fn into_intermediate_bucket_result( - self, - agg_with_accessor: &BucketAggregationWithAccessor, - ) -> crate::Result { - match self { - SegmentBucketResultCollector::Terms(terms) => { - terms.into_intermediate_bucket_result(agg_with_accessor) - } - SegmentBucketResultCollector::Range(range) => { - range.into_intermediate_bucket_result(agg_with_accessor) - } - SegmentBucketResultCollector::Histogram(histogram) => { - histogram.into_intermediate_bucket_result(agg_with_accessor) - } - } - } - - pub fn from_req_and_validate(req: &BucketAggregationWithAccessor) -> crate::Result { - match &req.bucket_agg { - BucketAggregationType::Terms(terms_req) => Ok(Self::Terms(Box::new( - SegmentTermCollector::from_req_and_validate(terms_req, &req.sub_aggregation)?, - ))), - BucketAggregationType::Range(range_req) => { - Ok(Self::Range(SegmentRangeCollector::from_req_and_validate( - range_req, - &req.sub_aggregation, - &req.bucket_count, - req.field_type, - )?)) - } - BucketAggregationType::Histogram(histogram) => Ok(Self::Histogram(Box::new( - SegmentHistogramCollector::from_req_and_validate( - histogram, - &req.sub_aggregation, - req.field_type, - &req.accessor, - )?, - ))), - } - } - - #[inline] - pub(crate) fn collect_block( - &mut self, - docs: &[DocId], - bucket_with_accessor: &BucketAggregationWithAccessor, - ) -> crate::Result<()> { - match self { - SegmentBucketResultCollector::Range(range) => { - range.collect_block(docs, bucket_with_accessor)?; - } - SegmentBucketResultCollector::Histogram(histogram) => { - histogram.collect_block(docs, bucket_with_accessor)?; - } - SegmentBucketResultCollector::Terms(terms) => { - terms.collect_block(docs, bucket_with_accessor)?; - } - } - Ok(()) - } - - #[inline] - pub(crate) fn flush( - &mut self, - bucket_with_accessor: &BucketAggregationWithAccessor, - ) -> crate::Result<()> { - match self { - SegmentBucketResultCollector::Range(range) => { - range.flush(bucket_with_accessor)?; - } - SegmentBucketResultCollector::Histogram(histogram) => { - histogram.flush(bucket_with_accessor)?; - } - SegmentBucketResultCollector::Terms(terms) => { - terms.flush(bucket_with_accessor)?; - } - } - Ok(()) - } -} - #[derive(Clone)] pub(crate) struct BucketCount { /// The counter which is shared between the aggregations for one request.