diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index f3c0d13cc..cb35d1da3 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -226,7 +226,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector { let agg_with_accessor = &agg_with_accessor.aggs.values[self.accessor_idx]; let bucket = self.into_intermediate_bucket_result(agg_with_accessor)?; - results.push(name, IntermediateAggregationResult::Bucket(bucket)); + results.push(name, IntermediateAggregationResult::Bucket(bucket))?; Ok(()) } diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index f478b2f2e..58d35f396 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -204,7 +204,7 @@ impl SegmentAggregationCollector for SegmentRangeCollector { column_type: Some(self.column_type), }); - results.push(name, IntermediateAggregationResult::Bucket(bucket)); + results.push(name, IntermediateAggregationResult::Bucket(bucket))?; Ok(()) } diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index fa6344d2d..0a419414c 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -251,7 +251,7 @@ impl SegmentAggregationCollector for SegmentTermCollector { let agg_with_accessor = &agg_with_accessor.aggs.values[self.accessor_idx]; let bucket = self.into_intermediate_bucket_result(agg_with_accessor)?; - results.push(name, IntermediateAggregationResult::Bucket(bucket)); + results.push(name, IntermediateAggregationResult::Bucket(bucket))?; Ok(()) } diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index 2ff1dd394..d3fd9e7ca 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -3,6 +3,7 @@ //! indices. use std::cmp::Ordering; +use std::collections::hash_map::Entry; use std::hash::Hash; use columnar::ColumnType; @@ -22,7 +23,7 @@ use super::metric::{ IntermediateSum, PercentilesCollector, }; use super::segment_agg_result::AggregationLimits; -use super::{format_date, AggregationError, Key, SerializedKey, VecWithNames}; +use super::{format_date, AggregationError, Key, SerializedKey}; use crate::aggregation::agg_result::{AggregationResults, BucketEntries, BucketEntry}; use crate::aggregation::bucket::TermsAggregationInternal; use crate::TantivyError; @@ -33,7 +34,7 @@ use crate::TantivyError; /// Notice: This struct should not be de/serialized via JSON format. #[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateAggregationResults { - pub(crate) aggs_res: VecWithNames, + pub(crate) aggs_res: FxHashMap, } #[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq)] @@ -77,8 +78,18 @@ impl std::hash::Hash for IntermediateKey { impl IntermediateAggregationResults { /// Add a result - pub fn push(&mut self, key: String, value: IntermediateAggregationResult) { - self.aggs_res.push(key, value); + pub fn push(&mut self, key: String, value: IntermediateAggregationResult) -> crate::Result<()> { + let entry = self.aggs_res.entry(key); + match entry { + Entry::Occupied(mut e) => { + // In case of term aggregation over different types, we need to merge the results. + e.get_mut().merge_fruits(value)?; + } + Entry::Vacant(e) => { + e.insert(value); + } + } + Ok(()) } /// Convert intermediate result and its aggregation request to the final result. @@ -128,10 +139,10 @@ impl IntermediateAggregationResults { } pub(crate) fn empty_from_req(req: &Aggregations) -> Self { - let mut aggs_res: VecWithNames = VecWithNames::default(); + let mut aggs_res: FxHashMap = FxHashMap::default(); for (key, req) in req.iter() { let empty_res = empty_from_req(req); - aggs_res.push(key.to_string(), empty_res); + aggs_res.insert(key.to_string(), empty_res); } Self { aggs_res } @@ -765,7 +776,7 @@ mod tests { )), ); IntermediateAggregationResults { - aggs_res: VecWithNames::from_entries(map.into_iter().collect()), + aggs_res: map.into_iter().collect(), } } @@ -799,7 +810,7 @@ mod tests { )), ); IntermediateAggregationResults { - aggs_res: VecWithNames::from_entries(map.into_iter().collect()), + aggs_res: map.into_iter().collect(), } } diff --git a/src/aggregation/metric/percentiles.rs b/src/aggregation/metric/percentiles.rs index befda0b0c..2326054c0 100644 --- a/src/aggregation/metric/percentiles.rs +++ b/src/aggregation/metric/percentiles.rs @@ -265,7 +265,7 @@ impl SegmentAggregationCollector for SegmentPercentilesCollector { results.push( name, IntermediateAggregationResult::Metric(intermediate_metric_result), - ); + )?; Ok(()) } diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index f53d6c745..2011d5be2 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -222,7 +222,7 @@ impl SegmentAggregationCollector for SegmentStatsCollector { results.push( name, IntermediateAggregationResult::Metric(intermediate_metric_result), - ); + )?; Ok(()) } diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index f6d6aeefe..7a7fc0ef4 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -193,11 +193,6 @@ impl From> for VecWithNames { } impl VecWithNames { - fn push(&mut self, key: String, value: T) { - self.keys.push(key); - self.values.push(value); - } - fn from_entries(mut entries: Vec<(String, T)>) -> Self { // Sort to ensure order of elements match across multiple instances entries.sort_by(|left, right| left.0.cmp(&right.0)); @@ -212,18 +207,12 @@ impl VecWithNames { keys: data_names, } } - fn into_iter(self) -> impl Iterator { - self.keys.into_iter().zip(self.values.into_iter()) - } fn iter(&self) -> impl Iterator + '_ { self.keys().zip(self.values.iter()) } fn keys(&self) -> impl Iterator + '_ { self.keys.iter().map(|key| key.as_str()) } - fn into_values(self) -> impl Iterator { - self.values.into_iter() - } fn values_mut(&mut self) -> impl Iterator + '_ { self.values.iter_mut() }