From e923daa8a6e30e7c8aa50eae7b2d2bdf2d3dfbd1 Mon Sep 17 00:00:00 2001 From: Giovanni Cuccu Date: Sun, 19 Nov 2023 15:15:53 +0100 Subject: [PATCH] refactor for using ExtendedStats only when needed --- src/aggregation/metric/average.rs | 12 +- src/aggregation/metric/count.rs | 12 +- src/aggregation/metric/max.rs | 12 +- src/aggregation/metric/min.rs | 12 +- src/aggregation/metric/stats.rs | 395 ++++++++++++++++++-------- src/aggregation/metric/sum.rs | 12 +- src/aggregation/segment_agg_result.rs | 27 +- 7 files changed, 308 insertions(+), 174 deletions(-) diff --git a/src/aggregation/metric/average.rs b/src/aggregation/metric/average.rs index a4047adef..8e9ad69ad 100644 --- a/src/aggregation/metric/average.rs +++ b/src/aggregation/metric/average.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use serde::{Deserialize, Serialize}; -use super::{IntermediateExtendedStats, SegmentStatsCollector}; +use super::IntermediateStats; /// A single-value metric aggregation that computes the average of numeric values that are /// extracted from the aggregated documents. @@ -46,15 +46,13 @@ impl AverageAggregation { /// results. #[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateAverage { - stats: IntermediateExtendedStats, + stats: IntermediateStats, } impl IntermediateAverage { - /// Creates a new [`IntermediateAverage`] instance from a [`SegmentStatsCollector`]. - pub(crate) fn from_collector(collector: SegmentStatsCollector) -> Self { - Self { - stats: collector.stats, - } + /// Creates a new [`IntermediateAverage`] instance from a [`IntermediateStats`]. + pub(crate) fn from_stats(stats: IntermediateStats) -> Self { + Self { stats } } /// Merges the other intermediate result into self. 
pub fn merge_fruits(&mut self, other: IntermediateAverage) { diff --git a/src/aggregation/metric/count.rs b/src/aggregation/metric/count.rs index d645ce3f4..afdcfe55d 100644 --- a/src/aggregation/metric/count.rs +++ b/src/aggregation/metric/count.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use serde::{Deserialize, Serialize}; -use super::{IntermediateExtendedStats, SegmentStatsCollector}; +use super::IntermediateStats; /// A single-value metric aggregation that counts the number of values that are /// extracted from the aggregated documents. @@ -46,15 +46,13 @@ impl CountAggregation { /// results. #[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateCount { - stats: IntermediateExtendedStats, + stats: IntermediateStats, } impl IntermediateCount { - /// Creates a new [`IntermediateCount`] instance from a [`SegmentStatsCollector`]. - pub(crate) fn from_collector(collector: SegmentStatsCollector) -> Self { - Self { - stats: collector.stats, - } + /// Creates a new [`IntermediateCount`] instance from a [`IntermediateStats`]. + pub(crate) fn from_stats(stats: IntermediateStats) -> Self { + Self { stats } } /// Merges the other intermediate result into self. pub fn merge_fruits(&mut self, other: IntermediateCount) { diff --git a/src/aggregation/metric/max.rs b/src/aggregation/metric/max.rs index fe66fc0cb..b1be96dfd 100644 --- a/src/aggregation/metric/max.rs +++ b/src/aggregation/metric/max.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use serde::{Deserialize, Serialize}; -use super::{IntermediateExtendedStats, SegmentStatsCollector}; +use super::IntermediateStats; /// A single-value metric aggregation that computes the maximum of numeric values that are /// extracted from the aggregated documents. @@ -46,15 +46,13 @@ impl MaxAggregation { /// results. 
#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateMax { - stats: IntermediateExtendedStats, + stats: IntermediateStats, } impl IntermediateMax { - /// Creates a new [`IntermediateMax`] instance from a [`SegmentStatsCollector`]. - pub(crate) fn from_collector(collector: SegmentStatsCollector) -> Self { - Self { - stats: collector.stats, - } + /// Creates a new [`IntermediateMax`] instance from a [`IntermediateStats`]. + pub(crate) fn from_stats(stats: IntermediateStats) -> Self { + Self { stats } } /// Merges the other intermediate result into self. pub fn merge_fruits(&mut self, other: IntermediateMax) { diff --git a/src/aggregation/metric/min.rs b/src/aggregation/metric/min.rs index da717b241..45ffdcb9d 100644 --- a/src/aggregation/metric/min.rs +++ b/src/aggregation/metric/min.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use serde::{Deserialize, Serialize}; -use super::{IntermediateExtendedStats, SegmentStatsCollector}; +use super::IntermediateStats; /// A single-value metric aggregation that computes the minimum of numeric values that are /// extracted from the aggregated documents. @@ -46,15 +46,13 @@ impl MinAggregation { /// results. #[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateMin { - stats: IntermediateExtendedStats, + stats: IntermediateStats, } impl IntermediateMin { - /// Creates a new [`IntermediateMin`] instance from a [`SegmentStatsCollector`]. - pub(crate) fn from_collector(collector: SegmentStatsCollector) -> Self { - Self { - stats: collector.stats, - } + /// Creates a new [`IntermediateMin`] instance from a [`IntermediateStats`]. + pub(crate) fn from_stats(stats: IntermediateStats) -> Self { + Self { stats } } /// Merges the other intermediate result into self. 
pub fn merge_fruits(&mut self, other: IntermediateMin) { diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index d7cc815f9..e5e1fe694 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -1,3 +1,5 @@ +use std::fmt::Debug; + use columnar::ColumnType; use serde::{Deserialize, Serialize}; @@ -234,41 +236,8 @@ impl ExtendedStats { /// Intermediate result of the stats aggregation that can be combined with other intermediate /// results. - -#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct IntermediateStats { - stats: IntermediateExtendedStats, -} - -impl IntermediateStats { - pub(crate) fn from_collector(collector: SegmentStatsCollector) -> Self { - Self { - stats: collector.stats, - } - } - - /// Merges the other stats intermediate result into self. - pub fn merge_fruits(&mut self, other: IntermediateStats) { - self.stats.merge_fruits(other.stats); - } - - /// Computes the final stats value. - pub fn finalize(&self) -> Stats { - let extended_stats = self.stats.finalize(); - Stats { - count: extended_stats.count, - sum: extended_stats.sum, - min: extended_stats.min, - max: extended_stats.max, - avg: extended_stats.avg, - } - } -} - -/// Intermediate result of the extended stats aggregation that can be combined with other -/// intermediate results. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct IntermediateExtendedStats { +pub struct IntermediateStats { /// The number of extracted values. count: u64, /// The sum of the extracted values. @@ -279,6 +248,77 @@ pub struct IntermediateExtendedStats { min: f64, /// The max value. max: f64, +} + +impl Default for IntermediateStats { + fn default() -> Self { + Self { + count: 0, + sum: 0.0, + delta: 0.0, + min: f64::MAX, + max: f64::MIN, + } + } +} + +impl IntermediateStats { + /// Merges the other stats intermediate result into self. 
+ pub fn merge_fruits(&mut self, other: IntermediateStats) { + self.count += other.count; + self.sum += other.sum; + self.delta += other.delta; + self.min = self.min.min(other.min); + self.max = self.max.max(other.max); + } + + /// Computes the final stats value. + pub fn finalize(&self) -> Stats { + let min = if self.count == 0 { + None + } else { + Some(self.min) + }; + let max = if self.count == 0 { + None + } else { + Some(self.max) + }; + let avg = if self.count == 0 { + None + } else { + Some(self.sum / (self.count as f64)) + }; + Stats { + count: self.count, + sum: self.sum, + min, + max, + avg, + } + } + + #[inline] + fn collect(&mut self, value: f64) { + self.count += 1; + + // kahan algorithm for sum + let y = value - self.delta; + let t = self.sum + y; + self.delta = (t - self.sum) - y; + self.sum = t; + + self.min = self.min.min(value); + self.max = self.max.max(value); + } +} + +/// Intermediate result of the extended stats aggregation that can be combined with other +/// intermediate results. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct IntermediateExtendedStats { + intermediate_stats: IntermediateStats, + /// The sum of squared differences from the mean. // The sum of square values, it's referred as M2 in Welford's online algorithm sum_of_squares: f64, // The sum of square values as computed by elastic search @@ -295,11 +335,7 @@ pub struct IntermediateExtendedStats { impl Default for IntermediateExtendedStats { fn default() -> Self { Self { - count: 0, - sum: 0.0, - delta: 0.0, - min: f64::MAX, - max: f64::MIN, + intermediate_stats: IntermediateStats::default(), sum_of_squares: 0.0, sum_of_squares_elastic: 0.0, delta_sum_for_squares_elastic: 0.0, @@ -314,11 +350,7 @@ impl IntermediateExtendedStats { /// containing the sigma to be used for calculating bound values. 
pub fn with_sigma(sigma: Option) -> Self { Self { - count: 0, - sum: 0.0, - delta: 0.0, - min: f64::MAX, - max: f64::MIN, + intermediate_stats: IntermediateStats::default(), sum_of_squares: 0.0, sum_of_squares_elastic: 0.0, delta_sum_for_squares_elastic: 0.0, @@ -328,44 +360,43 @@ impl IntermediateExtendedStats { } /// Merges the other stats intermediate result into self. pub fn merge_fruits(&mut self, other: IntermediateExtendedStats) { - self.min = self.min.min(other.min); - self.max = self.max.max(other.max); - - if other.count != 0 { - if self.count == 0 { + if other.intermediate_stats.count != 0 { + if self.intermediate_stats.count == 0 { self.sum_of_squares = other.sum_of_squares; self.sum_of_squares_elastic = other.sum_of_squares_elastic; - self.count = other.count; + self.intermediate_stats = other.intermediate_stats; self.mean = other.mean; - self.sum = other.sum; - self.delta = other.delta; self.delta_sum_for_squares_elastic = other.delta_sum_for_squares_elastic } else { - if other.count == 1 { - self.collect(other.sum); - } else if self.count == 1 { - let sum = self.sum; + if other.intermediate_stats.count == 1 { + self.collect(other.intermediate_stats.sum); + } else if self.intermediate_stats.count == 1 { + let sum = self.intermediate_stats.sum; + self.intermediate_stats = other.intermediate_stats; self.sum_of_squares = other.sum_of_squares; self.sum_of_squares_elastic = other.sum_of_squares_elastic; - self.count = other.count; self.mean = other.mean; - self.sum = other.sum; - self.delta = other.delta; self.delta_sum_for_squares_elastic = other.delta_sum_for_squares_elastic; self.collect(sum); } else { // parallel version of Welford's online algorithm // the mean is computed using sum and count because // it's more precise (and sum is already available) - let new_count = self.count + other.count; + let new_count = self.intermediate_stats.count + other.intermediate_stats.count; let delta = other.mean - self.mean; self.sum_of_squares += 
other.sum_of_squares - + delta * delta * self.count as f64 * other.count as f64 / new_count as f64; - self.count = new_count; + + delta + * delta + * self.intermediate_stats.count as f64 + * other.intermediate_stats.count as f64 + / new_count as f64; + self.intermediate_stats.count = new_count; // self.mean=self.mean + delta*other.count as f64/new_count as f64; - self.mean = (self.sum as f64 + other.sum as f64) / new_count as f64; - self.sum += other.sum; - self.delta += other.delta; + self.mean = (self.intermediate_stats.sum as f64 + + other.intermediate_stats.sum as f64) + / new_count as f64; + self.intermediate_stats.sum += other.intermediate_stats.sum; + self.intermediate_stats.delta += other.intermediate_stats.delta; self.sum_of_squares_elastic += other.sum_of_squares_elastic; self.delta_sum_for_squares_elastic += other.delta_sum_for_squares_elastic } @@ -375,35 +406,35 @@ impl IntermediateExtendedStats { /// Computes the final stats value. pub fn finalize(&self) -> ExtendedStats { - let min = if self.count == 0 { + let min = if self.intermediate_stats.count == 0 { None } else { - Some(self.min) + Some(self.intermediate_stats.min) }; - let max = if self.count == 0 { + let max = if self.intermediate_stats.count == 0 { None } else { - Some(self.max) + Some(self.intermediate_stats.max) }; - let avg = if self.count == 0 { + let avg = if self.intermediate_stats.count == 0 { None } else { Some(self.mean) }; - let sum_of_squares = if self.count == 0 { + let sum_of_squares = if self.intermediate_stats.count == 0 { None } else { Some(self.sum_of_squares_elastic) }; - let variance = if self.count <= 1 { + let variance = if self.intermediate_stats.count <= 1 { None } else { - Some(self.sum_of_squares / self.count as f64) + Some(self.sum_of_squares / self.intermediate_stats.count as f64) }; - let variance_sampling = if self.count <= 1 { + let variance_sampling = if self.intermediate_stats.count <= 1 { None } else { - Some(self.sum_of_squares / (self.count - 1) as f64) + 
Some(self.sum_of_squares / (self.intermediate_stats.count - 1) as f64) }; let std_deviation = variance.map(|v| v.sqrt()); let std_deviation_sampling = variance_sampling.map(|v| v.sqrt()); @@ -424,8 +455,8 @@ impl IntermediateExtendedStats { }) }; ExtendedStats { - count: self.count, - sum: self.sum, + count: self.intermediate_stats.count, + sum: self.intermediate_stats.sum, min, max, avg, @@ -441,22 +472,12 @@ impl IntermediateExtendedStats { } fn collect(&mut self, value: f64) { - self.count += 1; - - // kahan algorithm for sum - let y = value - self.delta; - let t = self.sum + y; - self.delta = (t - self.sum) - y; - self.sum = t; - + self.intermediate_stats.collect(value); // kahan algorithm for sum_of_squares_elastic let y = value * value - self.delta_sum_for_squares_elastic; let t = self.sum_of_squares_elastic + y; self.delta_sum_for_squares_elastic = (t - self.sum_of_squares_elastic) - y; self.sum_of_squares_elastic = t; - - self.min = self.min.min(value); - self.max = self.max.max(value); self.update_variance(value); } @@ -467,13 +488,109 @@ impl IntermediateExtendedStats { // error (in 15th decimal place) that caused a test //(test_aggregation_level1 in agg_test.rs) // failure - self.mean = self.sum / self.count as f64; + self.mean = self.intermediate_stats.sum / self.intermediate_stats.count as f64; // self.mean += delta / self.count as f64; let delta2 = value - self.mean; self.sum_of_squares += delta * delta2; } } +/// trait implemented by IntermediateStats and +/// IntermediateExtendedStats that allows the +/// usage of a single [SegmentStatsCollector] +pub trait IntermediateInnerCollector { + /// collects the single field value to form + /// the statistic + fn collect(&mut self, val: f64); + /// transforms the intermediate stat + /// into a [IntermediateMetricResult] + fn into_intermediate_metric_result(self) -> IntermediateMetricResult; +} + +/// Intermediate struct used by [SegmentStatsCollector] +/// for calculating Stats and creating 
[IntermediateMetricResult] +#[derive(Clone, Debug, PartialEq)] +pub struct IntermediateInnerStatsCollector { + stats: IntermediateStats, + collecting_for: SegmentStatsType, +} + +impl IntermediateInnerStatsCollector { + pub(crate) fn from(collecting_for: SegmentStatsType) -> Self { + IntermediateInnerStatsCollector { + stats: IntermediateStats::default(), + collecting_for: collecting_for, + } + } +} + +impl IntermediateInnerCollector for IntermediateInnerStatsCollector { + #[inline] + fn collect(&mut self, value: f64) { + self.stats.collect(value); + } + + fn into_intermediate_metric_result(self) -> IntermediateMetricResult { + match self.collecting_for { + SegmentStatsType::Average => { + IntermediateMetricResult::Average(IntermediateAverage::from_stats(self.stats)) + } + SegmentStatsType::Count => { + IntermediateMetricResult::Count(IntermediateCount::from_stats(self.stats)) + } + SegmentStatsType::Max => { + IntermediateMetricResult::Max(IntermediateMax::from_stats(self.stats)) + } + SegmentStatsType::Min => { + IntermediateMetricResult::Min(IntermediateMin::from_stats(self.stats)) + } + SegmentStatsType::Stats => IntermediateMetricResult::Stats(self.stats), + SegmentStatsType::ExtendedStats => { + panic!("cannot create IntermediateMetricResult for ExtendStats from Stats"); + } + SegmentStatsType::Sum => { + IntermediateMetricResult::Sum(IntermediateSum::from_stats(self.stats)) + } + } + } +} + +/// Intermediate struct used by [SegmentStatsCollector] +/// for calculating ExtendedStats and creating [IntermediateMetricResult] +#[derive(Clone, Debug, PartialEq)] +pub(crate) struct IntermediateInnerExtendedStatsCollector { + stats: IntermediateExtendedStats, + collecting_for: SegmentStatsType, +} + +impl IntermediateInnerExtendedStatsCollector { + pub fn from(collecting_for: SegmentStatsType, sigma: Option) -> Self { + IntermediateInnerExtendedStatsCollector { + stats: IntermediateExtendedStats::with_sigma(sigma), + collecting_for, + } + } +} + +impl 
IntermediateInnerCollector for IntermediateInnerExtendedStatsCollector { + #[inline] + fn collect(&mut self, value: f64) { + self.stats.collect(value); + } + + fn into_intermediate_metric_result(self) -> IntermediateMetricResult { + match self.collecting_for { + SegmentStatsType::ExtendedStats => IntermediateMetricResult::ExtendedStats(self.stats), + _ => { + panic!( + "cannot create IntermediateMetricResult for Stats/Min/Avg/Count/Min/Max/Sum \ + from ExtendedStats" + ); + } + } + } +} + #[derive(Clone, Debug, PartialEq)] pub(crate) enum SegmentStatsType { Average, @@ -486,32 +603,45 @@ pub(crate) enum SegmentStatsType { } #[derive(Clone, Debug, PartialEq)] -pub(crate) struct SegmentStatsCollector { +pub(crate) struct SegmentStatsCollector { missing: Option, - sigma: Option, field_type: ColumnType, - pub(crate) collecting_for: SegmentStatsType, - pub(crate) stats: IntermediateExtendedStats, + inner_intermediate_collector: T, + // pub(crate) collecting_for: SegmentStatsType, + // pub(crate) stats: IntermediateStatType, pub(crate) accessor_idx: usize, val_cache: Vec, } -impl SegmentStatsCollector { +impl SegmentStatsCollector { pub fn from_req( field_type: ColumnType, - collecting_for: SegmentStatsType, + // collecting_for: SegmentStatsType, + inner_intermediate_collector: T, accessor_idx: usize, missing: Option, - sigma: Option, ) -> Self { let missing = missing.and_then(|val| f64_to_fastfield_u64(val, &field_type)); + // let stats= match collecting_for { + // SegmentStatsType::Average | + // SegmentStatsType::Count | + // SegmentStatsType::Max | + // SegmentStatsType::Min| + // SegmentStatsType::Stats | + // SegmentStatsType::Sum => { + // IntermediateStatType::Stat(IntermediateStats::default()) + // } + // SegmentStatsType::ExtendedStats => + // IntermediateStatType::ExtendedStat(IntermediateExtendedStats::with_sigma(sigma)) + // + // }; Self { field_type, - collecting_for, - stats: IntermediateExtendedStats::with_sigma(sigma), + inner_intermediate_collector, + 
// collecting_for, + // stats: stats, accessor_idx, missing, - sigma, val_cache: Default::default(), } } @@ -534,12 +664,45 @@ impl SegmentStatsCollector { } for val in agg_accessor.column_block_accessor.iter_vals() { let val1 = f64_from_fastfield_u64(val, &self.field_type); - self.stats.collect(val1); + self.inner_intermediate_collector.collect(val1); } } + // fn collect(&mut self, val: f64) { + // match &mut self.stats { + // IntermediateStatType::Stat(intermediate_stats) => { + // intermediate_stats.collect(val); + // } + // IntermediateStatType::ExtendedStat(intermediate_extended_stats) => { + // intermediate_extended_stats.collect(val); + // } + // } + // } + // + // pub fn stats(self) -> IntermediateStats { + // match self.stats { + // IntermediateStatType::Stat(intermediate_stats) => { + // intermediate_stats + // } + // IntermediateStatType::ExtendedStat(intermediate_extended_stats) => { + // intermediate_extended_stats.intermediate_stats + // } + // } + // } + // pub fn extended_stats(self) -> IntermediateExtendedStats { + // match self.stats { + // IntermediateStatType::ExtendedStat(intermediate_extended_stats) => { + // intermediate_extended_stats + // } + // _ => { + // panic!("incompatible values between collecting_for and stats"); + // } + // } + // } } -impl SegmentAggregationCollector for SegmentStatsCollector { +impl SegmentAggregationCollector + for SegmentStatsCollector +{ #[inline] fn add_intermediate_aggregation_result( self: Box, @@ -548,27 +711,9 @@ impl SegmentAggregationCollector for SegmentStatsCollector { ) -> crate::Result<()> { let name = agg_with_accessor.aggs.keys[self.accessor_idx].to_string(); - let intermediate_metric_result = match self.collecting_for { - SegmentStatsType::Average => { - IntermediateMetricResult::Average(IntermediateAverage::from_collector(*self)) - } - SegmentStatsType::Count => { - IntermediateMetricResult::Count(IntermediateCount::from_collector(*self)) - } - SegmentStatsType::Max => { - 
IntermediateMetricResult::Max(IntermediateMax::from_collector(*self)) - } - SegmentStatsType::Min => { - IntermediateMetricResult::Min(IntermediateMin::from_collector(*self)) - } - SegmentStatsType::Stats => { - IntermediateMetricResult::Stats(IntermediateStats::from_collector(*self)) - } - SegmentStatsType::ExtendedStats => IntermediateMetricResult::ExtendedStats(self.stats), - SegmentStatsType::Sum => { - IntermediateMetricResult::Sum(IntermediateSum::from_collector(*self)) - } - }; + let intermediate_metric_result = self + .inner_intermediate_collector + .into_intermediate_metric_result(); results.push( name, IntermediateAggregationResult::Metric(intermediate_metric_result), @@ -588,17 +733,17 @@ impl SegmentAggregationCollector for SegmentStatsCollector { let mut has_val = false; for val in field.values_for_doc(doc) { let val1 = f64_from_fastfield_u64(val, &self.field_type); - self.stats.collect(val1); + self.inner_intermediate_collector.collect(val1); has_val = true; } if !has_val { - self.stats + self.inner_intermediate_collector .collect(f64_from_fastfield_u64(missing, &self.field_type)); } } else { for val in field.values_for_doc(doc) { let val1 = f64_from_fastfield_u64(val, &self.field_type); - self.stats.collect(val1); + self.inner_intermediate_collector.collect(val1); } } diff --git a/src/aggregation/metric/sum.rs b/src/aggregation/metric/sum.rs index 1babe6bae..a455ae010 100644 --- a/src/aggregation/metric/sum.rs +++ b/src/aggregation/metric/sum.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use serde::{Deserialize, Serialize}; -use super::{IntermediateExtendedStats, SegmentStatsCollector}; +use super::IntermediateStats; /// A single-value metric aggregation that sums up numeric values that are /// extracted from the aggregated documents. @@ -46,15 +46,13 @@ impl SumAggregation { /// results. 
#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateSum { - stats: IntermediateExtendedStats, + stats: IntermediateStats, } impl IntermediateSum { - /// Creates a new [`IntermediateSum`] instance from a [`SegmentStatsCollector`]. - pub(crate) fn from_collector(collector: SegmentStatsCollector) -> Self { - Self { - stats: collector.stats, - } + /// Creates a new [`IntermediateSum`] instance from a [`IntermediateStats`]. + pub(crate) fn from_stats(stats: IntermediateStats) -> Self { + Self { stats } } /// Merges the other intermediate result into self. pub fn merge_fruits(&mut self, other: IntermediateSum) { diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index 0c852585a..071a49706 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -16,6 +16,9 @@ use super::metric::{ SumAggregation, }; use crate::aggregation::bucket::TermMissingAgg; +use crate::aggregation::metric::{ + IntermediateInnerExtendedStatsCollector, IntermediateInnerStatsCollector, +}; pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug { fn add_intermediate_aggregation_result( @@ -118,55 +121,51 @@ pub(crate) fn build_single_agg_segment_collector( Average(AverageAggregation { missing, .. }) => { Ok(Box::new(SegmentStatsCollector::from_req( req.field_type, - SegmentStatsType::Average, + IntermediateInnerStatsCollector::from(SegmentStatsType::Average), accessor_idx, *missing, - None, ))) } Count(CountAggregation { missing, .. }) => Ok(Box::new(SegmentStatsCollector::from_req( req.field_type, - SegmentStatsType::Count, + IntermediateInnerStatsCollector::from(SegmentStatsType::Count), accessor_idx, *missing, - None, ))), Max(MaxAggregation { missing, .. 
}) => Ok(Box::new(SegmentStatsCollector::from_req( req.field_type, - SegmentStatsType::Max, + IntermediateInnerStatsCollector::from(SegmentStatsType::Max), accessor_idx, *missing, - None, ))), Min(MinAggregation { missing, .. }) => Ok(Box::new(SegmentStatsCollector::from_req( req.field_type, - SegmentStatsType::Min, + IntermediateInnerStatsCollector::from(SegmentStatsType::Min), accessor_idx, *missing, - None, ))), Stats(StatsAggregation { missing, .. }) => Ok(Box::new(SegmentStatsCollector::from_req( req.field_type, - SegmentStatsType::Stats, + IntermediateInnerStatsCollector::from(SegmentStatsType::Stats), accessor_idx, *missing, - None, ))), ExtendedStats(ExtendedStatsAggregation { missing, sigma, .. }) => { Ok(Box::new(SegmentStatsCollector::from_req( req.field_type, - SegmentStatsType::ExtendedStats, + IntermediateInnerExtendedStatsCollector::from( + SegmentStatsType::ExtendedStats, + *sigma, + ), accessor_idx, *missing, - *sigma, ))) } Sum(SumAggregation { missing, .. }) => Ok(Box::new(SegmentStatsCollector::from_req( req.field_type, - SegmentStatsType::Sum, + IntermediateInnerStatsCollector::from(SegmentStatsType::Sum), accessor_idx, *missing, - None, ))), Percentiles(percentiles_req) => Ok(Box::new( SegmentPercentilesCollector::from_req_and_validate(