From 86bdb8b95c6ed9002619cbfb2eac17b3a313e1c9 Mon Sep 17 00:00:00 2001 From: Giovanni Cuccu Date: Sat, 4 Nov 2023 12:34:04 +0100 Subject: [PATCH] using IntermediateExtendStats instead of IntermediateStats with all tests passing --- src/aggregation/metric/average.rs | 4 +-- src/aggregation/metric/count.rs | 4 +-- src/aggregation/metric/max.rs | 4 +-- src/aggregation/metric/min.rs | 4 +-- src/aggregation/metric/stats.rs | 60 +++++++++++++++++++++++++++---- src/aggregation/metric/sum.rs | 4 +-- 6 files changed, 63 insertions(+), 17 deletions(-) diff --git a/src/aggregation/metric/average.rs b/src/aggregation/metric/average.rs index 966ea1005..a4047adef 100644 --- a/src/aggregation/metric/average.rs +++ b/src/aggregation/metric/average.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use serde::{Deserialize, Serialize}; -use super::{IntermediateStats, SegmentStatsCollector}; +use super::{IntermediateExtendedStats, SegmentStatsCollector}; /// A single-value metric aggregation that computes the average of numeric values that are /// extracted from the aggregated documents. @@ -46,7 +46,7 @@ impl AverageAggregation { /// results. #[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateAverage { - stats: IntermediateStats, + stats: IntermediateExtendedStats, } impl IntermediateAverage { diff --git a/src/aggregation/metric/count.rs b/src/aggregation/metric/count.rs index bb298de8b..d645ce3f4 100644 --- a/src/aggregation/metric/count.rs +++ b/src/aggregation/metric/count.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use serde::{Deserialize, Serialize}; -use super::{IntermediateStats, SegmentStatsCollector}; +use super::{IntermediateExtendedStats, SegmentStatsCollector}; /// A single-value metric aggregation that counts the number of values that are /// extracted from the aggregated documents. @@ -46,7 +46,7 @@ impl CountAggregation { /// results. #[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateCount { - stats: IntermediateStats, + stats: IntermediateExtendedStats, } impl IntermediateCount { diff --git a/src/aggregation/metric/max.rs b/src/aggregation/metric/max.rs index 268e02da2..fe66fc0cb 100644 --- a/src/aggregation/metric/max.rs +++ b/src/aggregation/metric/max.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use serde::{Deserialize, Serialize}; -use super::{IntermediateStats, SegmentStatsCollector}; +use super::{IntermediateExtendedStats, SegmentStatsCollector}; /// A single-value metric aggregation that computes the maximum of numeric values that are /// extracted from the aggregated documents. @@ -46,7 +46,7 @@ impl MaxAggregation { /// results. #[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateMax { - stats: IntermediateStats, + stats: IntermediateExtendedStats, } impl IntermediateMax { diff --git a/src/aggregation/metric/min.rs b/src/aggregation/metric/min.rs index a1c4066df..da717b241 100644 --- a/src/aggregation/metric/min.rs +++ b/src/aggregation/metric/min.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use serde::{Deserialize, Serialize}; -use super::{IntermediateStats, SegmentStatsCollector}; +use super::{IntermediateExtendedStats, SegmentStatsCollector}; /// A single-value metric aggregation that computes the minimum of numeric values that are /// extracted from the aggregated documents. @@ -46,7 +46,7 @@ impl MinAggregation { /// results. #[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateMin { - stats: IntermediateStats, + stats: IntermediateExtendedStats, } impl IntermediateMin { diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index 6afea5eb8..e1bd383b4 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -81,8 +81,8 @@ impl Stats { } } -/// Intermediate result of the stats aggregation that can be combined with other intermediate -/// results. + +/* #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateStats { /// The number of extracted values. @@ -149,6 +149,41 @@ impl IntermediateStats { self.max = self.max.max(value); } } +*/ +/// Intermediate result of the stats aggregation that can be combined with other intermediate +/// results. +#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct IntermediateStats { + stats: IntermediateExtendedStats, +} + +impl IntermediateStats { + + pub(crate) fn from_collector(collector: SegmentStatsCollector) -> Self { + Self { + stats: collector.stats, + } + } + + /// Merges the other stats intermediate result into self. + pub fn merge_fruits(&mut self, other: IntermediateStats) { + self.stats.merge_fruits(other.stats); + } + + /// Computes the final stats value. + pub fn finalize(&self) -> Stats { + let extended_stats=self.stats.finalize(); + Stats { + count: extended_stats.count, + sum: extended_stats.sum, + min: extended_stats.min, + max: extended_stats.max, + avg: extended_stats.avg, + } + } + + +} #[derive(Clone, Debug, PartialEq)] pub(crate) enum SegmentStatsType { @@ -165,7 +200,7 @@ pub(crate) struct SegmentStatsCollector { missing: Option, field_type: ColumnType, pub(crate) collecting_for: SegmentStatsType, - pub(crate) stats: IntermediateStats, + pub(crate) stats: IntermediateExtendedStats, pub(crate) accessor_idx: usize, val_cache: Vec, } @@ -181,7 +216,7 @@ impl SegmentStatsCollector { Self { field_type, collecting_for, - stats: IntermediateStats::default(), + stats: IntermediateExtendedStats::default(), accessor_idx, missing, val_cache: Default::default(), @@ -233,7 +268,9 @@ impl SegmentAggregationCollector for SegmentStatsCollector { SegmentStatsType::Min => { IntermediateMetricResult::Min(IntermediateMin::from_collector(*self)) } - SegmentStatsType::Stats => IntermediateMetricResult::Stats(self.stats), + SegmentStatsType::Stats => { + IntermediateMetricResult::Stats(IntermediateStats::from_collector(*self)) + } SegmentStatsType::Sum => { IntermediateMetricResult::Sum(IntermediateSum::from_collector(*self)) } @@ -287,7 +324,9 @@ impl SegmentAggregationCollector for SegmentStatsCollector { } } - +/// Extended stats contains a collection of statistics +/// they extends stats adding variance, standard deviation +/// and bound informations pub struct ExtendedStats { /// The number of documents. pub count: u64, @@ -315,6 +354,8 @@ pub struct ExtendedStats { pub standard_deviation_sampling: Option, } +/// Intermediate result of the extended stats aggregation that can be combined with other intermediate +/// results. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateExtendedStats { /// The number of extracted values. @@ -435,7 +476,12 @@ impl IntermediateExtendedStats { fn update_variance(&mut self, value: f64) { let delta = value - self.mean; - self.mean += delta / self.count as f64; + //this is not what the Welford's online algorithm prescribes but + //using the pseudo code from wikipedia there was a small rounding + //error (in 15th decimal place) that caused a test + //(test_aggregation_level1 in agg_test.rs) + //failure + self.mean = self.sum / self.count as f64; let delta2 = value - self.mean; self.sum_of_squares += delta * delta2; } diff --git a/src/aggregation/metric/sum.rs b/src/aggregation/metric/sum.rs index 067350a17..1babe6bae 100644 --- a/src/aggregation/metric/sum.rs +++ b/src/aggregation/metric/sum.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use serde::{Deserialize, Serialize}; -use super::{IntermediateStats, SegmentStatsCollector}; +use super::{IntermediateExtendedStats, SegmentStatsCollector}; /// A single-value metric aggregation that sums up numeric values that are /// extracted from the aggregated documents. @@ -46,7 +46,7 @@ impl SumAggregation { /// results. #[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateSum { - stats: IntermediateStats, + stats: IntermediateExtendedStats, } impl IntermediateSum {