diff --git a/Cargo.toml b/Cargo.toml
index fc2a1a115..0174ad01a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -62,6 +62,7 @@ query-grammar = { version= "0.19.0", path="./query-grammar", package = "tantivy-
 tantivy-bitpacker = { version= "0.3", path="./bitpacker" }
 common = { version= "0.5", path = "./common/", package = "tantivy-common" }
 tokenizer-api = { version="0.1", path="./tokenizer-api", package="tantivy-tokenizer-api" }
+sketches-ddsketch = { git = "https://github.com/PSeitz/rust-sketches-ddsketch", version = "0.2.0", features = ["use_serde"] }
 
 [target.'cfg(windows)'.dependencies]
 winapi = "0.3.9"
@@ -78,6 +79,8 @@ env_logger = "0.10.0"
 pprof = { version = "0.11.0", features = ["flamegraph", "criterion"] }
 futures = "0.3.21"
 paste = "1.0.11"
+more-asserts = "0.3.1"
+rand_distr = "0.4.3"
 
 [dev-dependencies.fail]
 version = "0.5.0"
diff --git a/examples/aggregation.rs b/examples/aggregation.rs
index 7dc7d6754..ab8b118bd 100644
--- a/examples/aggregation.rs
+++ b/examples/aggregation.rs
@@ -9,10 +9,9 @@ use serde_json::{Deserializer, Value};
 
 use tantivy::aggregation::agg_req::{
     Aggregation, Aggregations, BucketAggregation, BucketAggregationType, MetricAggregation,
-    RangeAggregation,
 };
 use tantivy::aggregation::agg_result::AggregationResults;
-use tantivy::aggregation::bucket::RangeAggregationRange;
+use tantivy::aggregation::bucket::{RangeAggregation, RangeAggregationRange};
 use tantivy::aggregation::metric::AverageAggregation;
 use tantivy::aggregation::AggregationCollector;
 use tantivy::query::AllQuery;
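The `sketches-ddsketch` dependency added above provides the quantile sketch backing the new percentiles aggregation. For orientation, here is a minimal sketch of the DDSketch calls this PR relies on (`Config::defaults`, `add`, `quantile`, `length`, all of which appear in `percentiles.rs` below); the sample values are made up:

```rust
use sketches_ddsketch::{Config, DDSketch};

fn main() {
    // Default relative-error configuration, as PercentilesCollector::new() uses.
    let mut sketch = DDSketch::new(Config::defaults());

    // The segment collector feeds every fast-field value into the sketch.
    for val in [10.0, 20.0, 30.0, 40.0, 50.0] {
        sketch.add(val);
    }

    // Quantiles are queried in [0.0, 1.0]; the aggregation divides percents by 100.
    // `quantile` returns Err for out-of-range input and Ok(None) for an empty
    // sketch, which the aggregation surfaces as NaN (serialized as null).
    let p95 = sketch.quantile(0.95).unwrap();
    println!("~p95: {:?} over {} values", p95, sketch.length());
}
```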
diff --git a/src/aggregation/agg_bench.rs b/src/aggregation/agg_bench.rs
index 24f52f4a6..ba3208427 100644
--- a/src/aggregation/agg_bench.rs
+++ b/src/aggregation/agg_bench.rs
@@ -3,17 +3,27 @@ mod bench {
 
     use columnar::Cardinality;
     use rand::prelude::SliceRandom;
-    use rand::{thread_rng, Rng};
+    use rand::rngs::StdRng;
+    use rand::{Rng, SeedableRng};
+    use rand_distr::Distribution;
     use test::{self, Bencher};
 
-    use super::*;
-    use crate::aggregation::bucket::{
-        CustomOrder, HistogramAggregation, HistogramBounds, Order, OrderTarget, TermsAggregation,
+    use crate::aggregation::agg_req::{
+        Aggregation, Aggregations, BucketAggregation, BucketAggregationType, MetricAggregation,
     };
-    use crate::aggregation::metric::StatsAggregation;
-    use crate::query::AllQuery;
-    use crate::schema::{Schema, TextFieldIndexing, FAST, STRING};
-    use crate::Index;
+    use crate::aggregation::bucket::{
+        CustomOrder, HistogramAggregation, HistogramBounds, Order, OrderTarget, RangeAggregation,
+        TermsAggregation,
+    };
+    use crate::aggregation::metric::{AverageAggregation, StatsAggregation};
+    use crate::aggregation::AggregationCollector;
+    use crate::query::{AllQuery, TermQuery};
+    use crate::schema::{IndexRecordOption, Schema, TextFieldIndexing, FAST, STRING};
+    use crate::{Index, Term};
+
+    fn get_collector(agg_req: Aggregations) -> AggregationCollector {
+        AggregationCollector::from_aggs(agg_req, Default::default())
+    }
 
     fn get_test_index_bench(cardinality: Cardinality) -> crate::Result<Index> {
         let mut schema_builder = Schema::builder();
@@ -31,11 +41,14 @@ mod bench {
         let score_field_i64 = schema_builder.add_i64_field("score_i64", score_fieldtype);
         let index = Index::create_from_tempdir(schema_builder.build())?;
         let few_terms_data = vec!["INFO", "ERROR", "WARN", "DEBUG"];
+
+        let lg_norm = rand_distr::LogNormal::new(2.996f64, 0.979f64).unwrap();
+
         let many_terms_data = (0..150_000)
             .map(|num| format!("author{}", num))
             .collect::<Vec<_>>();
         {
-            let mut rng = thread_rng();
+            let mut rng = StdRng::from_seed([1u8; 32]);
             let mut index_writer = index.writer_with_num_threads(1, 100_000_000)?;
             // To make the different test cases comparable we just change one doc to force the
             // cardinality
@@ -52,8 +65,8 @@ mod bench {
                     text_field_few_terms => "cool",
                     score_field => 1u64,
                     score_field => 1u64,
-                    score_field_f64 => 1.0,
-                    score_field_f64 => 1.0,
+                    score_field_f64 => lg_norm.sample(&mut rng),
+                    score_field_f64 => lg_norm.sample(&mut rng),
                     score_field_i64 => 1i64,
                     score_field_i64 => 1i64,
                 ))?;
@@ -65,7 +78,7 @@ mod bench {
                     text_field_many_terms => many_terms_data.choose(&mut rng).unwrap().to_string(),
                     text_field_few_terms => few_terms_data.choose(&mut rng).unwrap().to_string(),
                     score_field => val as u64,
-                    score_field_f64 => val,
+                    score_field_f64 => lg_norm.sample(&mut rng),
                     score_field_i64 => val as i64,
                 ))?;
             }
@@ -186,6 +199,31 @@ mod bench {
         });
     }
 
+    bench_all_cardinalities!(bench_aggregation_percentiles_f64);
+
+    fn bench_aggregation_percentiles_f64_card(b: &mut Bencher, cardinality: Cardinality) {
+        let index = get_test_index_bench(cardinality).unwrap();
+        let reader = index.reader().unwrap();
+
+        b.iter(|| {
+            let agg_req_str = r#"
+            {
+              "mypercentiles": {
+                "percentiles": {
+                  "field": "score_f64",
+                  "percents": [ 95, 99, 99.9 ]
+                }
+              }
+            } "#;
+            let agg_req_1: Aggregations = serde_json::from_str(agg_req_str).unwrap();
+
+            let collector = get_collector(agg_req_1);
+
+            let searcher = reader.searcher();
+            searcher.search(&AllQuery, &collector).unwrap()
+        });
+    }
+
     bench_all_cardinalities!(bench_aggregation_average_u64_and_f64);
 
     fn bench_aggregation_average_u64_and_f64_card(b: &mut Bencher, cardinality: Cardinality) {
diff --git a/src/aggregation/agg_req.rs b/src/aggregation/agg_req.rs
index 0fe1d4f03..9039cded1 100644
--- a/src/aggregation/agg_req.rs
+++ b/src/aggregation/agg_req.rs
@@ -49,11 +49,12 @@ use std::collections::{HashMap, HashSet};
 
 use serde::{Deserialize, Serialize};
 
-pub use super::bucket::RangeAggregation;
-use super::bucket::{DateHistogramAggregationReq, HistogramAggregation, TermsAggregation};
+use super::bucket::{
+    DateHistogramAggregationReq, HistogramAggregation, RangeAggregation, TermsAggregation,
+};
 use super::metric::{
-    AverageAggregation, CountAggregation, MaxAggregation, MinAggregation, StatsAggregation,
-    SumAggregation,
+    AverageAggregation, CountAggregation, MaxAggregation, MinAggregation,
+    PercentilesAggregationReq, StatsAggregation, SumAggregation,
 };
 use super::VecWithNames;
 
@@ -246,9 +247,19 @@ pub enum MetricAggregation {
     /// Computes the sum of the extracted values.
     #[serde(rename = "sum")]
     Sum(SumAggregation),
+    /// Computes the percentiles of the extracted values.
+    #[serde(rename = "percentiles")]
+    Percentiles(PercentilesAggregationReq),
 }
 
 impl MetricAggregation {
+    pub(crate) fn as_percentile(&self) -> Option<&PercentilesAggregationReq> {
+        match &self {
+            MetricAggregation::Percentiles(percentile_req) => Some(percentile_req),
+            _ => None,
+        }
+    }
+
     fn get_fast_field_name(&self) -> &str {
         match self {
             MetricAggregation::Average(avg) => avg.field_name(),
@@ -257,6 +268,7 @@ impl MetricAggregation {
             MetricAggregation::Min(min) => min.field_name(),
             MetricAggregation::Stats(stats) => stats.field_name(),
             MetricAggregation::Sum(sum) => sum.field_name(),
+            MetricAggregation::Percentiles(per) => per.field_name(),
         }
     }
 }
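The `#[serde(rename = "percentiles")]` attribute above is what routes the `percentiles` key of a JSON request to the new variant. A minimal parsing sketch (the aggregation name and field are illustrative):

```rust
use tantivy::aggregation::agg_req::Aggregations;

fn main() {
    // Deserializes into MetricAggregation::Percentiles via the serde rename.
    let agg_req: Aggregations = serde_json::from_str(
        r#"{
            "load_time_percentiles": {
                "percentiles": {
                    "field": "load_time",
                    "percents": [95, 99, 99.9],
                    "keyed": true
                }
            }
        }"#,
    )
    .expect("valid percentiles request");
    assert_eq!(agg_req.len(), 1);
}
```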
diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs
index a82842ea9..407f2b078 100644
--- a/src/aggregation/agg_req_with_accessor.rs
+++ b/src/aggregation/agg_req_with_accessor.rs
@@ -137,6 +137,20 @@ impl MetricAggregationWithAccessor {
                     Some(get_numeric_or_date_column_types()),
                 )?;
 
+                Ok(MetricAggregationWithAccessor {
+                    accessor,
+                    field_type,
+                    metric: metric.clone(),
+                    column_block_accessor: Default::default(),
+                })
+            }
+            MetricAggregation::Percentiles(percentiles) => {
+                let (accessor, field_type) = get_ff_reader_and_validate(
+                    reader,
+                    percentiles.field_name(),
+                    Some(get_numeric_or_date_column_types()),
+                )?;
+
                 Ok(MetricAggregationWithAccessor {
                     accessor,
                     field_type,
diff --git a/src/aggregation/agg_result.rs b/src/aggregation/agg_result.rs
index d47a7d07b..09068f51d 100644
--- a/src/aggregation/agg_result.rs
+++ b/src/aggregation/agg_result.rs
@@ -9,10 +9,10 @@ use serde::{Deserialize, Serialize};
 
 use super::agg_req::BucketAggregationInternal;
 use super::bucket::GetDocCount;
-use super::intermediate_agg_result::{IntermediateBucketResult, IntermediateMetricResult};
-use super::metric::{SingleMetricResult, Stats};
+use super::intermediate_agg_result::IntermediateBucketResult;
+use super::metric::{PercentilesMetricResult, SingleMetricResult, Stats};
 use super::segment_agg_result::AggregationLimits;
-use super::Key;
+use super::{AggregationError, Key};
 use crate::TantivyError;
 
 #[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
@@ -94,6 +94,8 @@ pub enum MetricResult {
     Stats(Stats),
     /// Sum metric result.
     Sum(SingleMetricResult),
+    /// Percentiles metric result.
+    Percentiles(PercentilesMetricResult),
 }
 
 impl MetricResult {
@@ -105,30 +107,9 @@ impl MetricResult {
             MetricResult::Min(min) => Ok(min.value),
             MetricResult::Stats(stats) => stats.get_value(agg_property),
             MetricResult::Sum(sum) => Ok(sum.value),
-        }
-    }
-}
-impl From<IntermediateMetricResult> for MetricResult {
-    fn from(metric: IntermediateMetricResult) -> Self {
-        match metric {
-            IntermediateMetricResult::Average(intermediate_avg) => {
-                MetricResult::Average(intermediate_avg.finalize().into())
-            }
-            IntermediateMetricResult::Count(intermediate_count) => {
-                MetricResult::Count(intermediate_count.finalize().into())
-            }
-            IntermediateMetricResult::Max(intermediate_max) => {
-                MetricResult::Max(intermediate_max.finalize().into())
-            }
-            IntermediateMetricResult::Min(intermediate_min) => {
-                MetricResult::Min(intermediate_min.finalize().into())
-            }
-            IntermediateMetricResult::Stats(intermediate_stats) => {
-                MetricResult::Stats(intermediate_stats.finalize())
-            }
-            IntermediateMetricResult::Sum(intermediate_sum) => {
-                MetricResult::Sum(intermediate_sum.finalize().into())
-            }
+            MetricResult::Percentiles(_) => Err(TantivyError::AggregationError(
+                AggregationError::InvalidRequest("percentiles can't be used to order".to_string()),
+            )),
         }
     }
 }
diff --git a/src/aggregation/agg_tests.rs b/src/aggregation/agg_tests.rs
index 44f40f0cc..e430fb850 100644
--- a/src/aggregation/agg_tests.rs
+++ b/src/aggregation/agg_tests.rs
@@ -111,7 +111,7 @@ fn test_aggregation_flushing(
         let searcher = reader.searcher();
         let intermediate_agg_result = searcher.search(&AllQuery, &collector).unwrap();
         intermediate_agg_result
-            .into_final_bucket_result(agg_req, &Default::default())
+            .into_final_result(agg_req, &Default::default())
             .unwrap()
     } else {
         let collector = get_collector(agg_req);
@@ -448,7 +448,7 @@ fn test_aggregation_level2(
         // Test de/serialization roundtrip on intermediate_agg_result
         let res: IntermediateAggregationResults =
             serde_json::from_str(&serde_json::to_string(&res).unwrap()).unwrap();
-        res.into_final_bucket_result(agg_req.clone(), &Default::default())
+        res.into_final_result(agg_req.clone(), &Default::default())
             .unwrap()
     } else {
         let collector = get_collector(agg_req.clone());
diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs
index 5840db6f2..dc80a7e53 100644
--- a/src/aggregation/collector.rs
+++ b/src/aggregation/collector.rs
@@ -104,7 +104,7 @@ impl Collector for AggregationCollector {
         segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
     ) -> crate::Result<Self::Fruit> {
         let res = merge_fruits(segment_fruits)?;
-        res.into_final_bucket_result(self.agg.clone(), &self.limits)
+        res.into_final_result(self.agg.clone(), &self.limits)
     }
 }
 
@@ -114,7 +114,7 @@ fn merge_fruits(
     mut segment_fruits: Vec<crate::Result<IntermediateAggregationResults>>,
 ) -> crate::Result<IntermediateAggregationResults> {
     if let Some(fruit) = segment_fruits.pop() {
         let mut fruit = fruit?;
         for next_fruit in segment_fruits {
-            fruit.merge_fruits(next_fruit?);
+            fruit.merge_fruits(next_fruit?)?;
         }
         Ok(fruit)
     } else {
diff --git a/src/aggregation/error.rs b/src/aggregation/error.rs
index a2b864c33..b599d7257 100644
--- a/src/aggregation/error.rs
+++ b/src/aggregation/error.rs
@@ -5,6 +5,12 @@ use super::bucket::DateHistogramParseError;
 /// Error that may occur when opening a directory
 #[derive(Debug, Clone, PartialEq, Eq, Error)]
 pub enum AggregationError {
+    /// Internal error during an aggregation
+    #[error("InternalError: {0:?}")]
+    InternalError(String),
+    /// Invalid aggregation request
+    #[error("InvalidRequest: {0:?}")]
+    InvalidRequest(String),
     /// Date histogram parse error
     #[error("Date histogram parse error: {0:?}")]
     DateHistogramParseError(#[from] DateHistogramParseError),
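The two new variants are the error surface for the now-fallible merge and validation paths below. A sketch of how `InvalidRequest` is raised, mirroring the `validate()` added in `percentiles.rs` further down (`check_percents` is a hypothetical standalone helper, and the `tantivy::aggregation::AggregationError` re-export is assumed):

```rust
use tantivy::aggregation::AggregationError;
use tantivy::TantivyError;

// Hypothetical copy of the percents validation from percentiles.rs below.
fn check_percents(percents: &[f64]) -> tantivy::Result<()> {
    if !percents.iter().all(|p| (0.0..=100.0).contains(p)) {
        return Err(TantivyError::AggregationError(
            AggregationError::InvalidRequest(
                "All percentiles have to be between 0.0 and 100.0".to_string(),
            ),
        ));
    }
    Ok(())
}

fn main() {
    assert!(check_percents(&[95.0, 99.0, 99.9]).is_ok());
    assert!(check_percents(&[101.0]).is_err());
}
```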
diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs
index 59a22a56d..35f3c8c47 100644
--- a/src/aggregation/intermediate_agg_result.rs
+++ b/src/aggregation/intermediate_agg_result.rs
@@ -12,16 +12,17 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
 
 use super::agg_req::{
     Aggregations, AggregationsInternal, BucketAggregationInternal, BucketAggregationType,
-    MetricAggregation, RangeAggregation,
+    MetricAggregation,
 };
-use super::agg_result::{AggregationResult, BucketResult, RangeBucketEntry};
+use super::agg_result::{AggregationResult, BucketResult, MetricResult, RangeBucketEntry};
 use super::bucket::{
     cut_off_buckets, get_agg_name_and_property, intermediate_histogram_buckets_to_final_buckets,
-    GetDocCount, Order, OrderTarget, SegmentHistogramBucketEntry, TermsAggregation,
+    GetDocCount, Order, OrderTarget, RangeAggregation, SegmentHistogramBucketEntry,
+    TermsAggregation,
 };
 use super::metric::{
     IntermediateAverage, IntermediateCount, IntermediateMax, IntermediateMin, IntermediateStats,
-    IntermediateSum,
+    IntermediateSum, PercentilesCollector,
 };
 use super::segment_agg_result::AggregationLimits;
 use super::{format_date, AggregationError, Key, SerializedKey, VecWithNames};
@@ -41,12 +42,12 @@ pub struct IntermediateAggregationResults {
 
 impl IntermediateAggregationResults {
     /// Convert intermediate result and its aggregation request to the final result.
-    pub fn into_final_bucket_result(
+    pub fn into_final_result(
         self,
         req: Aggregations,
         limits: &AggregationLimits,
     ) -> crate::Result<AggregationResults> {
-        let res = self.into_final_bucket_result_internal(&(req.into()), limits)?;
+        let res = self.into_final_result_internal(&(req.into()), limits)?;
         let bucket_count = res.get_bucket_count() as u32;
         if bucket_count > limits.get_bucket_limit() {
             return Err(TantivyError::AggregationError(
@@ -63,7 +64,7 @@ impl IntermediateAggregationResults {
     ///
     /// Internal function, AggregationsInternal is used instead Aggregations, which is optimized
     /// for internal processing, by splitting metric and buckets into separate groups.
-    pub(crate) fn into_final_bucket_result_internal(
+    pub(crate) fn into_final_result_internal(
         self,
         req: &AggregationsInternal,
         limits: &AggregationLimits,
@@ -82,7 +83,7 @@ impl IntermediateAggregationResults {
         };
 
         if let Some(metrics) = self.metrics {
-            convert_and_add_final_metrics_to_result(&mut results, metrics);
+            convert_and_add_final_metrics_to_result(&mut results, metrics, &req.metrics);
         } else {
             // When there are no metrics, we create empty metric results, so that the serialized
             // json format is constant
@@ -132,12 +133,12 @@ impl IntermediateAggregationResults {
     ///
     /// The order of the values need to be the same on both results. This is ensured when the same
     /// (key values) are present on the underlying `VecWithNames` struct.
-    pub fn merge_fruits(&mut self, other: IntermediateAggregationResults) {
+    pub fn merge_fruits(&mut self, other: IntermediateAggregationResults) -> crate::Result<()> {
         if let (Some(buckets_left), Some(buckets_right)) = (&mut self.buckets, other.buckets) {
             for (bucket_left, bucket_right) in
                 buckets_left.values_mut().zip(buckets_right.into_values())
             {
-                bucket_left.merge_fruits(bucket_right);
+                bucket_left.merge_fruits(bucket_right)?;
             }
         }
 
@@ -145,20 +146,28 @@ impl IntermediateAggregationResults {
             for (metric_left, metric_right) in
                 metrics_left.values_mut().zip(metrics_right.into_values())
             {
-                metric_left.merge_fruits(metric_right);
+                metric_left.merge_fruits(metric_right)?;
             }
         }
+        Ok(())
     }
 }
 
 fn convert_and_add_final_metrics_to_result(
     results: &mut FxHashMap<String, AggregationResult>,
     metrics: VecWithNames<IntermediateMetricResult>,
+    metrics_req: &VecWithNames<MetricAggregation>,
 ) {
+    let metric_result_with_request = metrics.into_iter().zip(metrics_req.values());
     results.extend(
-        metrics
+        metric_result_with_request
             .into_iter()
-            .map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))),
+            .map(|((key, metric), req)| {
+                (
+                    key,
+                    AggregationResult::MetricResult(metric.into_final_metric_result(req)),
+                )
+            }),
     );
 }
 
@@ -170,7 +179,7 @@ fn add_empty_final_metrics_to_result(
         let empty_bucket = IntermediateMetricResult::empty_from_req(req);
         (
             key.to_string(),
-            AggregationResult::MetricResult(empty_bucket.into()),
+            AggregationResult::MetricResult(empty_bucket.into_final_metric_result(req)),
         )
     }));
     Ok(())
@@ -218,6 +227,8 @@ pub enum IntermediateAggregationResult {
 
 /// Holds the intermediate data for metric results
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub enum IntermediateMetricResult {
+    /// Intermediate percentiles result.
+    Percentiles(PercentilesCollector),
     /// Intermediate average result.
     Average(IntermediateAverage),
     /// Intermediate count result.
@@ -233,6 +244,32 @@ pub enum IntermediateMetricResult {
 }
 
 impl IntermediateMetricResult {
+    fn into_final_metric_result(self, req: &MetricAggregation) -> MetricResult {
+        match self {
+            IntermediateMetricResult::Average(intermediate_avg) => {
+                MetricResult::Average(intermediate_avg.finalize().into())
+            }
+            IntermediateMetricResult::Count(intermediate_count) => {
+                MetricResult::Count(intermediate_count.finalize().into())
+            }
+            IntermediateMetricResult::Max(intermediate_max) => {
+                MetricResult::Max(intermediate_max.finalize().into())
+            }
+            IntermediateMetricResult::Min(intermediate_min) => {
+                MetricResult::Min(intermediate_min.finalize().into())
+            }
+            IntermediateMetricResult::Stats(intermediate_stats) => {
+                MetricResult::Stats(intermediate_stats.finalize())
+            }
+            IntermediateMetricResult::Sum(intermediate_sum) => {
+                MetricResult::Sum(intermediate_sum.finalize().into())
+            }
+            IntermediateMetricResult::Percentiles(percentiles) => MetricResult::Percentiles(
+                percentiles.into_final_result(req.as_percentile().expect("unexpected metric type")),
+            ),
+        }
+    }
+
     pub(crate) fn empty_from_req(req: &MetricAggregation) -> Self {
         match req {
             MetricAggregation::Average(_) => {
@@ -247,9 +284,12 @@ impl IntermediateMetricResult {
                 IntermediateMetricResult::Stats(IntermediateStats::default())
             }
             MetricAggregation::Sum(_) => IntermediateMetricResult::Sum(IntermediateSum::default()),
+            MetricAggregation::Percentiles(_) => {
+                IntermediateMetricResult::Percentiles(PercentilesCollector::default())
+            }
         }
     }
 
-    fn merge_fruits(&mut self, other: IntermediateMetricResult) {
+    fn merge_fruits(&mut self, other: IntermediateMetricResult) -> crate::Result<()> {
         match (self, other) {
             (
                 IntermediateMetricResult::Average(avg_left),
@@ -278,10 +318,18 @@ impl IntermediateMetricResult {
             (IntermediateMetricResult::Sum(sum_left), IntermediateMetricResult::Sum(sum_right)) => {
                 sum_left.merge_fruits(sum_right);
             }
+            (
+                IntermediateMetricResult::Percentiles(left),
+                IntermediateMetricResult::Percentiles(right),
+            ) => {
+                left.merge_fruits(right)?;
+            }
             _ => {
-                panic!("incompatible fruit types in tree");
+                panic!("incompatible fruit types in tree or missing merge_fruits handler");
             }
         }
+
+        Ok(())
     }
 }
 
@@ -396,13 +444,13 @@ impl IntermediateBucketResult {
             }
         }
     }
-    fn merge_fruits(&mut self, other: IntermediateBucketResult) {
+    fn merge_fruits(&mut self, other: IntermediateBucketResult) -> crate::Result<()> {
         match (self, other) {
             (
                 IntermediateBucketResult::Terms(term_res_left),
                 IntermediateBucketResult::Terms(term_res_right),
             ) => {
-                merge_key_maps(&mut term_res_left.entries, term_res_right.entries);
+                merge_key_maps(&mut term_res_left.entries, term_res_right.entries)?;
                 term_res_left.sum_other_doc_count += term_res_right.sum_other_doc_count;
                 term_res_left.doc_count_error_upper_bound +=
                     term_res_right.doc_count_error_upper_bound;
@@ -412,7 +460,7 @@ impl IntermediateBucketResult {
                 IntermediateBucketResult::Range(range_res_left),
                 IntermediateBucketResult::Range(range_res_right),
             ) => {
-                merge_serialized_key_maps(&mut range_res_left.buckets, range_res_right.buckets);
+                merge_serialized_key_maps(&mut range_res_left.buckets, range_res_right.buckets)?;
             }
             (
                 IntermediateBucketResult::Histogram {
                     buckets: buckets_left,
                     ..
                 },
                 IntermediateBucketResult::Histogram {
                     buckets: buckets_right,
                     ..
                 },
             ) => {
-                let buckets = buckets_left
-                    .drain(..)
-                    .merge_join_by(buckets_right.into_iter(), |left, right| {
-                        left.key.partial_cmp(&right.key).unwrap_or(Ordering::Equal)
-                    })
-                    .map(|either| match either {
-                        itertools::EitherOrBoth::Both(mut left, right) => {
-                            left.merge_fruits(right);
-                            left
-                        }
-                        itertools::EitherOrBoth::Left(left) => left,
-                        itertools::EitherOrBoth::Right(right) => right,
-                    })
-                    .collect();
+                let buckets: Result<Vec<_>, TantivyError> =
+                    buckets_left
+                        .drain(..)
+                        .merge_join_by(buckets_right.into_iter(), |left, right| {
+                            left.key.partial_cmp(&right.key).unwrap_or(Ordering::Equal)
+                        })
+                        .map(|either| match either {
+                            itertools::EitherOrBoth::Both(mut left, right) => {
+                                left.merge_fruits(right)?;
+                                Ok(left)
+                            }
+                            itertools::EitherOrBoth::Left(left) => Ok(left),
+                            itertools::EitherOrBoth::Right(right) => Ok(right),
+                        })
+                        .collect::<Result<_, _>>();
 
-                *buckets_left = buckets;
+                *buckets_left = buckets?;
             }
             (IntermediateBucketResult::Range(_), _) => {
                 panic!("try merge on different types")
@@ -451,6 +500,7 @@ impl IntermediateBucketResult {
                 panic!("try merge on different types")
             }
         }
+        Ok(())
     }
 }
 
@@ -516,7 +566,7 @@ impl IntermediateTermBucketResult {
                     doc_count: entry.doc_count,
                     sub_aggregation: entry
                         .sub_aggregation
-                        .into_final_bucket_result_internal(sub_aggregation_req, limits)?,
+                        .into_final_result_internal(sub_aggregation_req, limits)?,
                 })
             })
            .collect::<crate::Result<Vec<_>>>()?;
@@ -587,37 +637,39 @@ impl IntermediateTermBucketResult {
 }
 
 trait MergeFruits {
-    fn merge_fruits(&mut self, other: Self);
+    fn merge_fruits(&mut self, other: Self) -> crate::Result<()>;
 }
 
 fn merge_serialized_key_maps<V: MergeFruits>(
     entries_left: &mut FxHashMap<SerializedKey, V>,
     mut entries_right: FxHashMap<SerializedKey, V>,
-) {
+) -> crate::Result<()> {
     for (name, entry_left) in entries_left.iter_mut() {
         if let Some(entry_right) = entries_right.remove(name) {
-            entry_left.merge_fruits(entry_right);
+            entry_left.merge_fruits(entry_right)?;
         }
     }
     for (key, res) in entries_right.into_iter() {
         entries_left.entry(key).or_insert(res);
     }
+    Ok(())
 }
 
 fn merge_key_maps<V: MergeFruits>(
     entries_left: &mut FxHashMap<Key, V>,
     mut entries_right: FxHashMap<Key, V>,
-) {
+) -> crate::Result<()> {
     for (name, entry_left) in entries_left.iter_mut() {
         if let Some(entry_right) = entries_right.remove(name) {
-            entry_left.merge_fruits(entry_right);
+            entry_left.merge_fruits(entry_right)?;
         }
     }
     for (key, res) in entries_right.into_iter() {
         entries_left.entry(key).or_insert(res);
     }
+    Ok(())
 }
 
 /// This is the histogram entry for a bucket, which contains a key, count, and optionally
@@ -644,7 +696,7 @@ impl IntermediateHistogramBucketEntry {
             doc_count: self.doc_count,
             sub_aggregation: self
                 .sub_aggregation
-                .into_final_bucket_result_internal(req, limits)?,
+                .into_final_result_internal(req, limits)?,
         })
     }
 }
@@ -690,7 +742,7 @@ impl IntermediateRangeBucketEntry {
             doc_count: self.doc_count,
             sub_aggregation: self
                 .sub_aggregation
-                .into_final_bucket_result_internal(req, limits)?,
+                .into_final_result_internal(req, limits)?,
             to: self.to,
             from: self.from,
             to_as_string: None,
@@ -725,23 +777,26 @@ pub struct IntermediateTermBucketEntry {
 }
 
 impl MergeFruits for IntermediateTermBucketEntry {
-    fn merge_fruits(&mut self, other: IntermediateTermBucketEntry) {
+    fn merge_fruits(&mut self, other: IntermediateTermBucketEntry) -> crate::Result<()> {
         self.doc_count += other.doc_count;
-        self.sub_aggregation.merge_fruits(other.sub_aggregation);
+        self.sub_aggregation.merge_fruits(other.sub_aggregation)?;
+        Ok(())
     }
 }
 
 impl MergeFruits for IntermediateRangeBucketEntry {
-    fn merge_fruits(&mut self, other: IntermediateRangeBucketEntry) {
+    fn merge_fruits(&mut self, other: IntermediateRangeBucketEntry) -> crate::Result<()> {
         self.doc_count += other.doc_count;
-        self.sub_aggregation.merge_fruits(other.sub_aggregation);
+        self.sub_aggregation.merge_fruits(other.sub_aggregation)?;
+        Ok(())
     }
 }
 
 impl MergeFruits for IntermediateHistogramBucketEntry {
-    fn merge_fruits(&mut self, other: IntermediateHistogramBucketEntry) {
+    fn merge_fruits(&mut self, other: IntermediateHistogramBucketEntry) -> crate::Result<()> {
         self.doc_count += other.doc_count;
-        self.sub_aggregation.merge_fruits(other.sub_aggregation);
+        self.sub_aggregation.merge_fruits(other.sub_aggregation)?;
+        Ok(())
     }
 }
 
@@ -825,7 +880,7 @@ mod tests {
             ("blue".to_string(), 25, "1900".to_string(), 50),
         ]);
 
-        tree_left.merge_fruits(tree_right);
+        tree_left.merge_fruits(tree_right).unwrap();
 
         let tree_expected = get_intermediat_tree_with_ranges(&[
             ("red".to_string(), 110, "1900".to_string(), 55),
@@ -846,7 +901,7 @@ mod tests {
             ("green".to_string(), 25, "1900".to_string(), 50),
         ]);
 
-        tree_left.merge_fruits(tree_right);
+        tree_left.merge_fruits(tree_right).unwrap();
 
         let tree_expected = get_intermediat_tree_with_ranges(&[
             ("red".to_string(), 110, "1900".to_string(), 55),
@@ -866,7 +921,9 @@ mod tests {
 
         let orig = tree_left.clone();
 
-        tree_left.merge_fruits(IntermediateAggregationResults::default());
+        tree_left
+            .merge_fruits(IntermediateAggregationResults::default())
+            .unwrap();
 
         assert_eq!(tree_left, orig);
     }
diff --git a/src/aggregation/metric/mod.rs b/src/aggregation/metric/mod.rs
index 4812f2062..b1d05a05d 100644
--- a/src/aggregation/metric/mod.rs
+++ b/src/aggregation/metric/mod.rs
@@ -6,12 +6,15 @@ mod average;
 mod count;
 mod max;
 mod min;
+mod percentiles;
 mod stats;
 mod sum;
 
 pub use average::*;
 pub use count::*;
 pub use max::*;
 pub use min::*;
+pub use percentiles::*;
+use rustc_hash::FxHashMap;
 use serde::{Deserialize, Serialize};
 pub use stats::*;
 pub use sum::*;
@@ -37,6 +40,33 @@ impl From<Option<f64>> for SingleMetricResult {
     }
 }
 
+/// The wrapper of percentile entries, which can be a vector or a hashmap
+/// depending on whether the request is keyed or not.
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[serde(untagged)]
+pub enum PercentileValues {
+    /// Vector format percentile entries
+    Vec(Vec<PercentileValuesVecEntry>),
+    /// HashMap format percentile entries. The key is the serialized percentile.
+    HashMap(FxHashMap<String, f64>),
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+/// The entry when requesting percentiles with keyed: false
+pub struct PercentileValuesVecEntry {
+    key: f64,
+    value: f64,
+}
+
+/// The result of the percentiles aggregation.
+///
+/// The values are wrapped in a `values` object to match the elasticsearch
+/// output structure.
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct PercentilesMetricResult {
+    /// The result of the percentile metric.
+    pub values: PercentileValues,
+}
+
 #[cfg(test)]
 mod tests {
     use crate::aggregation::agg_req::Aggregations;
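Because `PercentileValues` is `#[serde(untagged)]`, the keyed (default) and non-keyed requests produce two different JSON shapes. Illustrative output for `percents: [95, 99, 99.9]` (the numbers are made up; compare the tests in `percentiles.rs` below). Keyed, the `HashMap` variant:

```json
{ "values": { "95.0": 10.0, "99.0": 10.2, "99.9": 10.3 } }
```

With `"keyed": false`, the `Vec` variant of `{key, value}` entries:

```json
{ "values": [ { "key": 95.0, "value": 10.0 }, { "key": 99.0, "value": 10.2 }, { "key": 99.9, "value": 10.3 } ] }
```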
diff --git a/src/aggregation/metric/percentiles.rs b/src/aggregation/metric/percentiles.rs
new file mode 100644
index 000000000..cf51606a5
--- /dev/null
+++ b/src/aggregation/metric/percentiles.rs
@@ -0,0 +1,553 @@
+use std::fmt::Debug;
+
+use columnar::ColumnType;
+use serde::{Deserialize, Serialize};
+
+use super::*;
+use crate::aggregation::agg_req_with_accessor::{
+    AggregationsWithAccessor, MetricAggregationWithAccessor,
+};
+use crate::aggregation::intermediate_agg_result::{
+    IntermediateAggregationResults, IntermediateMetricResult,
+};
+use crate::aggregation::segment_agg_result::SegmentAggregationCollector;
+use crate::aggregation::{f64_from_fastfield_u64, AggregationError, VecWithNames};
+use crate::{DocId, TantivyError};
+
+/// # Percentiles
+///
+/// The percentiles aggregation is a useful tool for understanding the distribution
+/// of a data set. It calculates the values below which a given percentage of the
+/// data falls. For instance, the 95th percentile indicates the value below which
+/// 95% of the data points can be found.
+///
+/// This aggregation can be particularly interesting for analyzing website load
+/// times. By computing the percentiles of load times, you can get insights into
+/// how quickly your website loads for different users and identify areas where
+/// improvements can be made.
+///
+/// To use the percentiles aggregation, you'll need to provide a field to
+/// aggregate on. In the case of website load times, this would typically be a
+/// field containing the duration of time it takes for the site to load.
+///
+/// The JSON format for a percentiles aggregation request is straightforward. The
+/// following example demonstrates a request for the percentiles of the "load_time"
+/// field:
+///
+/// ```json
+/// {
+///     "percentiles": {
+///         "field": "load_time"
+///     }
+/// }
+/// ```
+///
+/// This request will return an object containing the default percentiles (1, 5,
+/// 25, 50 (median), 75, 95, and 99). You can also customize the percentiles you want to
+/// calculate by providing an array of values in the "percents" parameter:
+///
+/// ```json
+/// {
+///     "percentiles": {
+///         "field": "load_time",
+///         "percents": [10, 20, 30, 40, 50, 60, 70, 80, 90]
+///     }
+/// }
+/// ```
+///
+/// In this example, the aggregation will return the 10th, 20th, 30th, 40th, 50th,
+/// 60th, 70th, 80th, and 90th percentiles of the "load_time" field.
+///
+/// Analyzing the percentiles of website load times can help you understand the
+/// user experience and identify areas for optimization. For example, if the 95th
+/// percentile load time is significantly higher than the median, this indicates
+/// that a small percentage of users are experiencing much slower load times than
+/// the majority.
+///
+/// # Estimating Percentiles
+///
+/// While percentiles provide valuable insights into the distribution of data, it's
+/// important to understand that they are often estimates. This is because
+/// calculating exact percentiles for large data sets can be computationally
+/// expensive and time-consuming. As a result, many percentile aggregation
+/// algorithms use approximation techniques to provide faster results.
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct PercentilesAggregationReq {
+    /// The field name to compute the percentiles on.
+    pub field: String,
+    /// The percentiles to compute.
+    /// Defaults to [1.0, 5.0, 25.0, 50.0, 75.0, 95.0, 99.0]
+    pub percents: Option<Vec<f64>>,
+    /// Whether to return the percentiles as a hash map
+    #[serde(default = "default_as_true")]
+    pub keyed: bool,
+}
+fn default_percentiles() -> &'static [f64] {
+    &[1.0, 5.0, 25.0, 50.0, 75.0, 95.0, 99.0]
+}
+fn default_as_true() -> bool {
+    true
+}
+
+impl PercentilesAggregationReq {
+    /// Creates a new [`PercentilesAggregationReq`] instance from a field name.
+    pub fn from_field_name(field_name: String) -> Self {
+        PercentilesAggregationReq {
+            field: field_name,
+            percents: None,
+            keyed: default_as_true(),
+        }
+    }
+    /// Returns the field name the aggregation is computed on.
+    pub fn field_name(&self) -> &str {
+        &self.field
+    }
+
+    fn validate(&self) -> crate::Result<()> {
+        if let Some(percents) = self.percents.as_ref() {
+            let all_in_range = percents
+                .iter()
+                .cloned()
+                .all(|percent| (0.0..=100.0).contains(&percent));
+            if !all_in_range {
+                return Err(TantivyError::AggregationError(
+                    AggregationError::InvalidRequest(
+                        "All percentiles have to be between 0.0 and 100.0".to_string(),
+                    ),
+                ));
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[derive(Clone, Debug, PartialEq)]
+pub(crate) struct SegmentPercentilesCollector {
+    field_type: ColumnType,
+    pub(crate) percentiles: PercentilesCollector,
+    pub(crate) accessor_idx: usize,
+    val_cache: Vec<u64>,
+}
+
+#[derive(Clone, Serialize, Deserialize)]
+/// The percentiles collector used during segment collection and for merging results.
+pub struct PercentilesCollector {
+    sketch: sketches_ddsketch::DDSketch,
+}
+impl Default for PercentilesCollector {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Debug for PercentilesCollector {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("PercentilesCollector")
+            .field("sketch_len", &self.sketch.length())
+            .finish()
+    }
+}
+impl PartialEq for PercentilesCollector {
+    fn eq(&self, _other: &Self) -> bool {
+        false
+    }
+}
+
+fn format_percentile(percentile: f64) -> String {
+    let mut out = percentile.to_string();
+    // Slightly silly way to format trailing decimals
+    if !out.contains('.') {
+        out.push_str(".0");
+    }
+    out
+}
+
+impl PercentilesCollector {
+    /// Convert the collector into the final result. This queries the requested
+    /// quantiles from the underlying quantile sketch.
+    pub fn into_final_result(self, req: &PercentilesAggregationReq) -> PercentilesMetricResult {
+        let percentiles: &[f64] = req
+            .percents
+            .as_ref()
+            .map(|el| el.as_ref())
+            .unwrap_or(default_percentiles());
+        let iter_quantile_and_values = percentiles.iter().cloned().map(|percentile| {
+            (
+                percentile,
+                self.sketch
+                    .quantile(percentile / 100.0)
+                    .expect(
+                        "quantile out of range. This error should have been caught during \
+                         validation phase",
+                    )
+                    .unwrap_or(f64::NAN),
+            )
+        });
+
+        let values = if req.keyed {
+            PercentileValues::HashMap(
+                iter_quantile_and_values
+                    .map(|(val, quantile)| (format_percentile(val), quantile))
+                    .collect(),
+            )
+        } else {
+            PercentileValues::Vec(
+                iter_quantile_and_values
+                    .map(|(key, value)| PercentileValuesVecEntry { key, value })
+                    .collect(),
+            )
+        };
+        PercentilesMetricResult { values }
+    }
+
+    fn new() -> Self {
+        let ddsketch_config = sketches_ddsketch::Config::defaults();
+        let sketch = sketches_ddsketch::DDSketch::new(ddsketch_config);
+        Self { sketch }
+    }
+    fn collect(&mut self, val: f64) {
+        self.sketch.add(val);
+    }
+
+    pub(crate) fn merge_fruits(&mut self, right: PercentilesCollector) -> crate::Result<()> {
+        self.sketch.merge(&right.sketch).map_err(|err| {
+            TantivyError::AggregationError(AggregationError::InternalError(format!(
+                "Error while merging percentiles {:?}",
+                err
+            )))
+        })?;
+
+        Ok(())
+    }
+}
+
+impl SegmentPercentilesCollector {
+    pub fn from_req_and_validate(
+        req: &PercentilesAggregationReq,
+        field_type: ColumnType,
+        accessor_idx: usize,
+    ) -> crate::Result<Self> {
+        req.validate()?;
+        Ok(Self {
+            field_type,
+            percentiles: PercentilesCollector::new(),
+            accessor_idx,
+            val_cache: Default::default(),
+        })
+    }
+    #[inline]
+    pub(crate) fn collect_block_with_field(
+        &mut self,
+        docs: &[DocId],
+        agg_accessor: &mut MetricAggregationWithAccessor,
+    ) {
+        agg_accessor
+            .column_block_accessor
+            .fetch_block(docs, &agg_accessor.accessor);
+
+        for val in agg_accessor.column_block_accessor.iter_vals() {
+            let val1 = f64_from_fastfield_u64(val, &self.field_type);
+            self.percentiles.collect(val1);
+        }
+    }
+}
+
+impl SegmentAggregationCollector for SegmentPercentilesCollector {
+    #[inline]
+    fn into_intermediate_aggregations_result(
+        self: Box<Self>,
+        agg_with_accessor: &AggregationsWithAccessor,
+    ) -> crate::Result<IntermediateAggregationResults> {
+        let name = agg_with_accessor.metrics.keys[self.accessor_idx].to_string();
+        let intermediate_metric_result = IntermediateMetricResult::Percentiles(self.percentiles);
+
+        let metrics = Some(VecWithNames::from_entries(vec![(
+            name,
+            intermediate_metric_result,
+        )]));
+
+        Ok(IntermediateAggregationResults {
+            metrics,
+            buckets: None,
+        })
+    }
+
+    #[inline]
+    fn collect(
+        &mut self,
+        doc: crate::DocId,
+        agg_with_accessor: &mut AggregationsWithAccessor,
+    ) -> crate::Result<()> {
+        let field = &agg_with_accessor.metrics.values[self.accessor_idx].accessor;
+
+        for val in field.values_for_doc(doc) {
+            let val1 = f64_from_fastfield_u64(val, &self.field_type);
+            self.percentiles.collect(val1);
+        }
+
+        Ok(())
+    }
+
+    #[inline]
+    fn collect_block(
+        &mut self,
+        docs: &[crate::DocId],
+        agg_with_accessor: &mut AggregationsWithAccessor,
+    ) -> crate::Result<()> {
+        let field = &mut agg_with_accessor.metrics.values[self.accessor_idx];
+        self.collect_block_with_field(docs, field);
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use itertools::Itertools;
+    use more_asserts::{assert_ge, assert_le};
+    use rand::rngs::StdRng;
+    use rand::SeedableRng;
+    use serde_json::{json, Value};
+
+    use crate::aggregation::agg_req::{Aggregation, Aggregations, MetricAggregation};
+    use crate::aggregation::agg_result::AggregationResults;
+    use crate::aggregation::metric::PercentilesAggregationReq;
+    use crate::aggregation::tests::{
+        get_test_index_from_values, get_test_index_from_values_and_terms,
+    };
+    use crate::aggregation::AggregationCollector;
+    use crate::query::AllQuery;
+
+    #[test]
+    fn test_aggregation_percentiles_empty_index() -> crate::Result<()> {
+        // test index without segments
+        let values = vec![];
+
+        let index = get_test_index_from_values(false, &values)?;
+
+        let agg_req_1: Aggregations = vec![(
+            "percentiles".to_string(),
+            Aggregation::Metric(MetricAggregation::Percentiles(
+                PercentilesAggregationReq::from_field_name("score".to_string()),
+            )),
+        )]
+        .into_iter()
+        .collect();
+
+        let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
+
+        let reader = index.reader()?;
+        let searcher = reader.searcher();
+        let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
+
+        let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
+        assert_eq!(
+            res["percentiles"]["values"],
+            json!({
+                "1.0": Value::Null,
+                "5.0": Value::Null,
+                "25.0": Value::Null,
+                "50.0": Value::Null,
+                "75.0": Value::Null,
+                "95.0": Value::Null,
+                "99.0": Value::Null,
+            })
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_aggregation_percentile_simple() -> crate::Result<()> {
+        let values = vec![10.0];
+
+        let index = get_test_index_from_values(false, &values)?;
+
+        let agg_req_1: Aggregations = vec![(
+            "percentiles".to_string(),
+            Aggregation::Metric(MetricAggregation::Percentiles(
+                PercentilesAggregationReq::from_field_name("score".to_string()),
+            )),
+        )]
+        .into_iter()
+        .collect();
+
+        let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
+
+        let reader = index.reader()?;
+        let searcher = reader.searcher();
+        let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
+
+        let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
+
+        let percents = vec!["1.0", "5.0", "25.0", "50.0", "75.0", "95.0", "99.0"];
+        let range = 9.9..10.1;
+        for percent in percents {
+            let val = res["percentiles"]["values"][percent].as_f64().unwrap();
+            assert!(range.contains(&val));
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_aggregation_percentile_parameters() -> crate::Result<()> {
+        let values = vec![10.0];
+
+        let index = get_test_index_from_values(false, &values)?;
+
+        let agg_req_str = r#"
+        {
+          "mypercentiles": {
+            "percentiles": {
+              "field": "score",
+              "percents": [ 95, 99, 99.9 ]
+            }
+          }
+        } "#;
+        let agg_req_1: Aggregations = serde_json::from_str(agg_req_str).unwrap();
+
+        let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
+
+        let reader = index.reader()?;
+        let searcher = reader.searcher();
+        let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
+
+        let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
+
+        let percents = vec!["95.0", "99.0", "99.9"];
+        let expected_range = 9.9..10.1;
+        for percent in percents {
+            let val = res["mypercentiles"]["values"][percent].as_f64().unwrap();
+            assert!(expected_range.contains(&val));
+        }
+        // Keyed false
+        //
+        let agg_req_str = r#"
+        {
+          "mypercentiles": {
+            "percentiles": {
+              "field": "score",
+              "percents": [ 95, 99, 99.9 ],
+              "keyed": false
+            }
+          }
+        } "#;
+        let agg_req_1: Aggregations = serde_json::from_str(agg_req_str).unwrap();
+
+        let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
+
+        let reader = index.reader()?;
+        let searcher = reader.searcher();
+        let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
+
+        let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
+
+        let vals = &res["mypercentiles"]["values"];
+        assert_eq!(vals[0]["key"].as_f64().unwrap(), 95.0);
+        assert_eq!(vals[1]["key"].as_f64().unwrap(), 99.0);
+        assert_eq!(vals[2]["key"].as_f64().unwrap(), 99.9);
+        assert_eq!(vals[3]["key"], serde_json::Value::Null);
+        assert!(expected_range.contains(&vals[0]["value"].as_f64().unwrap()));
+        assert!(expected_range.contains(&vals[1]["value"].as_f64().unwrap()));
+        assert!(expected_range.contains(&vals[2]["value"].as_f64().unwrap()));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_aggregation_percentiles_single_seg() -> crate::Result<()> {
+        test_aggregation_percentiles(true)
+    }
+
+    #[test]
+    fn test_aggregation_percentiles_multi_seg() -> crate::Result<()> {
+        test_aggregation_percentiles(false)
+    }
+
+    fn test_aggregation_percentiles(merge_segments: bool) -> crate::Result<()> {
+        use rand_distr::Distribution;
+        let num_values_in_segment = vec![100, 30_000, 8000];
+        let lg_norm = rand_distr::LogNormal::new(2.996f64, 0.979f64).unwrap();
+        let mut rng = StdRng::from_seed([1u8; 32]);
+
+        let segment_data = |i| {
+            (0..num_values_in_segment[i])
+                .map(|_| lg_norm.sample(&mut rng))
+                .collect_vec()
+        };
+
+        let values = (0..=2).map(segment_data).collect_vec();
+
+        let mut all_values = values
+            .iter()
+            .flat_map(|el| el.iter().cloned())
+            .collect_vec();
+        all_values.sort_unstable_by(|a, b| a.total_cmp(b));
+
+        fn get_exact_quantile(q: f64, all_values: &[f64]) -> f64 {
+            let q = q / 100.0;
+            assert!((0f64..=1f64).contains(&q));
+
+            let index = (all_values.len() as f64 * q).ceil() as usize;
+            let index = index.min(all_values.len() - 1);
+            all_values[index]
+        }
+
+        let segment_and_values = values
+            .into_iter()
+            .map(|segment_data| {
+                segment_data
+                    .into_iter()
+                    .map(|val| (val, val.to_string()))
+                    .collect_vec()
+            })
+            .collect_vec();
+
+        let index =
+            get_test_index_from_values_and_terms(merge_segments, &segment_and_values).unwrap();
+
+        let reader = index.reader()?;
+
+        let agg_req_str = r#"
+        {
+          "mypercentiles": {
+            "percentiles": {
+              "field": "score_f64",
+              "percents": [ 95, 99, 99.9 ]
+            }
+          }
+        } "#;
+        let agg_req_1: Aggregations = serde_json::from_str(agg_req_str).unwrap();
+
+        let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
+
+        let searcher = reader.searcher();
+        let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
+
+        let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
+        let vals = &res["mypercentiles"]["values"];
+
+        let check_quantile = |exact_quantile: f64, val: f64| {
+            let lower = exact_quantile - exact_quantile * 0.02;
+            let upper = exact_quantile + exact_quantile * 0.02;
+            assert_le!(val, upper);
+            assert_ge!(val, lower);
+        };
+
+        let val = vals["95.0"].as_f64().unwrap();
+        let exact_quantile = get_exact_quantile(95.0, &all_values);
+        check_quantile(exact_quantile, val);
+
+        let val = vals["99.0"].as_f64().unwrap();
+        let exact_quantile = get_exact_quantile(99.0, &all_values);
+        check_quantile(exact_quantile, val);
+
+        let val = vals["99.9"].as_f64().unwrap();
+        let exact_quantile = get_exact_quantile(99.9, &all_values);
+        check_quantile(exact_quantile, val);
+
+        Ok(())
+    }
+}
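The test oracle above computes a ceil-based nearest-rank percentile over the sorted values and accepts the sketch output within ±2% of it. A standalone sketch of the same logic (a hypothetical copy of `get_exact_quantile`), showing where the index lands:

```rust
/// Nearest-rank percentile over a sorted slice, as in the test above.
fn exact_quantile(q_percent: f64, sorted: &[f64]) -> f64 {
    let q = q_percent / 100.0;
    assert!((0.0..=1.0).contains(&q));
    let index = (sorted.len() as f64 * q).ceil() as usize;
    sorted[index.min(sorted.len() - 1)]
}

fn main() {
    // For the values 1.0..=100.0, ceil(100 * 0.95) = 95, so the 0-based index 95
    // selects the 96th element: the oracle is deliberately a step function.
    let vals: Vec<f64> = (1..=100).map(f64::from).collect();
    assert_eq!(exact_quantile(95.0, &vals), 96.0);
}
```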
diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs
index 6af31061a..56333a60f 100644
--- a/src/aggregation/metric/stats.rs
+++ b/src/aggregation/metric/stats.rs
@@ -267,9 +267,9 @@ mod tests {
 
     use crate::aggregation::agg_req::{
         Aggregation, Aggregations, BucketAggregation, BucketAggregationType, MetricAggregation,
-        RangeAggregation,
     };
     use crate::aggregation::agg_result::AggregationResults;
+    use crate::aggregation::bucket::RangeAggregation;
     use crate::aggregation::metric::StatsAggregation;
     use crate::aggregation::tests::{get_test_index_2_segments, get_test_index_from_values};
     use crate::aggregation::AggregationCollector;
@@ -316,7 +316,6 @@ mod tests {
 
     #[test]
     fn test_aggregation_stats_simple() -> crate::Result<()> {
-        // test index without segments
         let values = vec![10.0];
 
         let index = get_test_index_from_values(false, &values)?;
diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs
index 350b1e46d..dd3965ec0 100644
--- a/src/aggregation/mod.rs
+++ b/src/aggregation/mod.rs
@@ -174,6 +174,8 @@ use std::fmt::Display;
 #[cfg(test)]
 mod agg_tests;
 
+mod agg_bench;
+
 pub use agg_limits::AggregationLimits;
 pub use collector::{
     AggregationCollector, AggregationSegmentCollector, DistributedAggregationCollector,
diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs
index c159825ff..074ff782f 100644
--- a/src/aggregation/segment_agg_result.rs
+++ b/src/aggregation/segment_agg_result.rs
@@ -13,8 +13,9 @@ use super::agg_req_with_accessor::{
 use super::bucket::{SegmentHistogramCollector, SegmentRangeCollector, SegmentTermCollector};
 use super::intermediate_agg_result::IntermediateAggregationResults;
 use super::metric::{
-    AverageAggregation, CountAggregation, MaxAggregation, MinAggregation, SegmentStatsCollector,
-    SegmentStatsType, StatsAggregation, SumAggregation,
+    AverageAggregation, CountAggregation, MaxAggregation, MinAggregation,
+    SegmentPercentilesCollector, SegmentStatsCollector, SegmentStatsType, StatsAggregation,
+    SumAggregation,
 };
 use super::VecWithNames;
 use crate::aggregation::agg_req::BucketAggregationType;
@@ -87,28 +88,37 @@ pub(crate) fn build_metric_segment_agg_collector(
     req: &MetricAggregationWithAccessor,
     accessor_idx: usize,
 ) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
-    let stats_collector = match &req.metric {
+    match &req.metric {
         MetricAggregation::Average(AverageAggregation { .. }) => {
-            SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Average, accessor_idx)
+            Ok(Box::new(SegmentStatsCollector::from_req(
+                req.field_type,
+                SegmentStatsType::Average,
+                accessor_idx,
+            )))
         }
-        MetricAggregation::Count(CountAggregation { .. }) => {
-            SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Count, accessor_idx)
-        }
-        MetricAggregation::Max(MaxAggregation { .. }) => {
-            SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max, accessor_idx)
-        }
-        MetricAggregation::Min(MinAggregation { .. }) => {
-            SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min, accessor_idx)
-        }
-        MetricAggregation::Stats(StatsAggregation { .. }) => {
-            SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Stats, accessor_idx)
-        }
-        MetricAggregation::Sum(SumAggregation { .. }) => {
-            SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum, accessor_idx)
-        }
-    };
-
-    Ok(Box::new(stats_collector))
+        MetricAggregation::Count(CountAggregation { .. }) => Ok(Box::new(
+            SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Count, accessor_idx),
+        )),
+        MetricAggregation::Max(MaxAggregation { .. }) => Ok(Box::new(
+            SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max, accessor_idx),
+        )),
+        MetricAggregation::Min(MinAggregation { .. }) => Ok(Box::new(
+            SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min, accessor_idx),
+        )),
+        MetricAggregation::Stats(StatsAggregation { .. }) => Ok(Box::new(
+            SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Stats, accessor_idx),
+        )),
+        MetricAggregation::Sum(SumAggregation { .. }) => Ok(Box::new(
+            SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum, accessor_idx),
+        )),
+        MetricAggregation::Percentiles(percentiles_req) => Ok(Box::new(
+            SegmentPercentilesCollector::from_req_and_validate(
+                percentiles_req,
+                req.field_type,
+                accessor_idx,
+            )?,
+        )),
+    }
 }
 
 pub(crate) fn build_bucket_segment_agg_collector(
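For completeness, a minimal end-to-end sketch of the new aggregation against an in-RAM index. It mirrors the tests above rather than introducing any new API; the field name and memory budget are arbitrary, and the crate-root re-exports (`Index`, `doc!`) are assumed:

```rust
use tantivy::aggregation::agg_req::Aggregations;
use tantivy::aggregation::agg_result::AggregationResults;
use tantivy::aggregation::AggregationCollector;
use tantivy::query::AllQuery;
use tantivy::schema::{Schema, FAST};
use tantivy::{doc, Index};

fn main() -> tantivy::Result<()> {
    // A single fast f64 field is all the percentiles aggregation needs.
    let mut schema_builder = Schema::builder();
    let load_time = schema_builder.add_f64_field("load_time", FAST);
    let index = Index::create_in_ram(schema_builder.build());

    let mut writer = index.writer(50_000_000)?;
    for i in 0..1_000 {
        writer.add_document(doc!(load_time => f64::from(i)))?;
    }
    writer.commit()?;

    // Default percents (1, 5, 25, 50, 75, 95, 99), keyed output.
    let agg_req: Aggregations =
        serde_json::from_str(r#"{ "p": { "percentiles": { "field": "load_time" } } }"#)
            .expect("valid request");
    let collector = AggregationCollector::from_aggs(agg_req, Default::default());
    let res: AggregationResults = index.reader()?.searcher().search(&AllQuery, &collector)?;
    println!("{}", serde_json::to_string_pretty(&res).expect("serializable"));
    Ok(())
}
```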