From 78273bfb0d33cd0a2c1f153e02646a1355702e0f Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Fri, 13 Jan 2023 17:43:25 +0800 Subject: [PATCH] reuse stats for average --- src/aggregation/intermediate_agg_result.rs | 18 +++--- src/aggregation/metric/average.rs | 64 +++------------------- src/aggregation/metric/stats.rs | 27 ++++++--- src/aggregation/segment_agg_result.rs | 12 ++-- 4 files changed, 41 insertions(+), 80 deletions(-) diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index 508dc7998..38bda09ca 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -204,21 +204,23 @@ pub enum IntermediateAggregationResult { /// Holds the intermediate data for metric results #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum IntermediateMetricResult { - /// Average containing intermediate average data result + /// Intermediate average result Average(IntermediateAverage), - /// AverageData variant + /// Intermediate stats result Stats(IntermediateStats), } impl From<SegmentMetricResultCollector> for IntermediateMetricResult { fn from(tree: SegmentMetricResultCollector) -> Self { match tree { - SegmentMetricResultCollector::Average(collector) => { - IntermediateMetricResult::Average(IntermediateAverage::from_collector(collector)) - } - SegmentMetricResultCollector::Stats(collector) => { - IntermediateMetricResult::Stats(collector.stats) - } + SegmentMetricResultCollector::Stats(collector) => match collector.collecting_for { + super::metric::SegmentStatsType::Stats => { + IntermediateMetricResult::Stats(collector.stats) + } + super::metric::SegmentStatsType::Avg => IntermediateMetricResult::Average( + IntermediateAverage::from_collector(collector), + ), + }, } } } diff --git a/src/aggregation/metric/average.rs b/src/aggregation/metric/average.rs index 2f22430b4..53160442e 100644 --- a/src/aggregation/metric/average.rs +++ b/src/aggregation/metric/average.rs @@ -1,11 +1,8 @@ use 
std::fmt::Debug; -use fastfield_codecs::Column; use serde::{Deserialize, Serialize}; -use crate::aggregation::f64_from_fastfield_u64; -use crate::schema::Type; -use crate::DocId; +use super::SegmentStatsCollector; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] /// A single-value metric aggregation that computes the average of numeric values that are @@ -36,61 +33,19 @@ impl AverageAggregation { } } -#[derive(Clone, PartialEq)] -pub(crate) struct SegmentAverageCollector { - pub data: IntermediateAverage, - field_type: Type, -} - -impl Debug for SegmentAverageCollector { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("AverageCollector") - .field("data", &self.data) - .finish() - } -} - -impl SegmentAverageCollector { - pub fn from_req(field_type: Type) -> Self { - Self { - field_type, - data: Default::default(), - } - } - pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &dyn Column) { - let mut iter = doc.chunks_exact(4); - for docs in iter.by_ref() { - let val1 = field.get_val(docs[0]); - let val2 = field.get_val(docs[1]); - let val3 = field.get_val(docs[2]); - let val4 = field.get_val(docs[3]); - let val1 = f64_from_fastfield_u64(val1, &self.field_type); - let val2 = f64_from_fastfield_u64(val2, &self.field_type); - let val3 = f64_from_fastfield_u64(val3, &self.field_type); - let val4 = f64_from_fastfield_u64(val4, &self.field_type); - self.data.collect(val1); - self.data.collect(val2); - self.data.collect(val3); - self.data.collect(val4); - } - for &doc in iter.remainder() { - let val = field.get_val(doc); - let val = f64_from_fastfield_u64(val, &self.field_type); - self.data.collect(val); - } - } -} - /// Contains mergeable version of average data. 
#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateAverage { pub(crate) sum: f64, - pub(crate) doc_count: u64, + pub(crate) doc_count: u32, } impl IntermediateAverage { - pub(crate) fn from_collector(collector: SegmentAverageCollector) -> Self { - collector.data + pub(crate) fn from_collector(collector: SegmentStatsCollector) -> Self { + Self { + sum: collector.stats.sum, + doc_count: collector.stats.count, + } } /// Merge average data into this instance. @@ -106,9 +61,4 @@ impl IntermediateAverage { Some(self.sum / self.doc_count as f64) } } - #[inline] - fn collect(&mut self, val: f64) { - self.doc_count += 1; - self.sum += val; - } } diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index dec50bdf0..898a9edd0 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -40,7 +40,7 @@ impl StatsAggregation { #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct Stats { /// The number of documents. - pub count: usize, + pub count: u32, /// The sum of the fast field values. pub sum: f64, /// The standard deviation of the fast field values. `None` for count == 0. @@ -73,11 +73,16 @@ impl Stats { /// `IntermediateStats` contains the mergeable version for stats. 
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateStats { - count: usize, - sum: f64, - squared_sum: f64, - min: f64, - max: f64, + /// the number of values + pub count: u32, + /// the sum of the values + pub sum: f64, + /// the squared sum of the values + pub squared_sum: f64, + /// the min value of the values + pub min: f64, + /// the max value of the values + pub max: f64, } impl Default for IntermediateStats { fn default() -> Self { @@ -150,17 +155,25 @@ impl IntermediateStats { } } +#[derive(Clone, Debug, PartialEq)] +pub(crate) enum SegmentStatsType { + Stats, + Avg, +} + #[derive(Clone, Debug, PartialEq)] pub(crate) struct SegmentStatsCollector { pub(crate) stats: IntermediateStats, field_type: Type, + pub(crate) collecting_for: SegmentStatsType, } impl SegmentStatsCollector { - pub fn from_req(field_type: Type) -> Self { + pub fn from_req(field_type: Type, collecting_for: SegmentStatsType) -> Self { Self { field_type, stats: IntermediateStats::default(), + collecting_for, } } pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &dyn Column) { diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index 28944c39b..cc2469270 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -15,7 +15,7 @@ use super::bucket::{SegmentHistogramCollector, SegmentRangeCollector, SegmentTer use super::collector::MAX_BUCKET_COUNT; use super::intermediate_agg_result::{IntermediateAggregationResults, IntermediateBucketResult}; use super::metric::{ - AverageAggregation, SegmentAverageCollector, SegmentStatsCollector, StatsAggregation, + AverageAggregation, SegmentStatsCollector, SegmentStatsType, StatsAggregation, }; use super::VecWithNames; use crate::aggregation::agg_req::BucketAggregationType; @@ -163,7 +163,6 @@ impl SegmentAggregationResultsCollector { #[derive(Clone, Debug, PartialEq)] pub(crate) enum SegmentMetricResultCollector { - 
Average(SegmentAverageCollector), Stats(SegmentStatsCollector), } @@ -171,22 +170,19 @@ impl SegmentMetricResultCollector { pub fn from_req_and_validate(req: &MetricAggregationWithAccessor) -> crate::Result<Self> { match &req.metric { MetricAggregation::Average(AverageAggregation { field: _ }) => { - Ok(SegmentMetricResultCollector::Average( - SegmentAverageCollector::from_req(req.field_type), + Ok(SegmentMetricResultCollector::Stats( + SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Avg), )) } MetricAggregation::Stats(StatsAggregation { field: _ }) => { Ok(SegmentMetricResultCollector::Stats( - SegmentStatsCollector::from_req(req.field_type), + SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Stats), )) } } } pub(crate) fn collect_block(&mut self, doc: &[DocId], metric: &MetricAggregationWithAccessor) { match self { - SegmentMetricResultCollector::Average(avg_collector) => { - avg_collector.collect_block(doc, &*metric.accessor); - } SegmentMetricResultCollector::Stats(stats_collector) => { stats_collector.collect_block(doc, &*metric.accessor); }