add percentiles aggregations (#1984)

* add percentiles aggregations

add percentiles aggregation
fix disabled agg benchmark

* Update src/aggregation/metric/percentiles.rs

Co-authored-by: Paul Masurel <paul@quickwit.io>

* Apply suggestions from code review

Co-authored-by: Paul Masurel <paul@quickwit.io>

* fix import

* fix import

---------

Co-authored-by: Paul Masurel <paul@quickwit.io>
Commit 41af70799d by PSeitz, 2023-04-07 13:18:28 +08:00, committed by GitHub
Parent: f853bf204b
15 changed files with 828 additions and 124 deletions
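For orientation, a minimal sketch of how the new aggregation is used, modeled on the tests added in this commit. The field name "load_time" and the helper function are placeholders, not part of the change:

```rust
use tantivy::aggregation::agg_req::Aggregations;
use tantivy::aggregation::agg_result::AggregationResults;
use tantivy::aggregation::AggregationCollector;
use tantivy::query::AllQuery;
use tantivy::Index;

/// Runs the new percentiles aggregation against `index`.
/// "load_time" stands in for any numeric fast field.
fn load_time_percentiles(index: &Index) -> tantivy::Result<AggregationResults> {
    let agg_req_str = r#"
    {
        "load_time_percentiles": {
            "percentiles": {
                "field": "load_time",
                "percents": [95, 99, 99.9]
            }
        }
    }"#;
    let agg_req: Aggregations =
        serde_json::from_str(agg_req_str).expect("valid aggregation request");
    // The second argument is the default AggregationLimits.
    let collector = AggregationCollector::from_aggs(agg_req, Default::default());
    let searcher = index.reader()?.searcher();
    searcher.search(&AllQuery, &collector)
}
```

Omitting "percents" falls back to the defaults (1, 5, 25, 50, 75, 95, 99); setting "keyed": false returns a list of key/value entries instead of a map.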

View File

@@ -62,6 +62,7 @@ query-grammar = { version= "0.19.0", path="./query-grammar", package = "tantivy-
tantivy-bitpacker = { version= "0.3", path="./bitpacker" }
common = { version= "0.5", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version="0.1", path="./tokenizer-api", package="tantivy-tokenizer-api" }
sketches-ddsketch = { git = "https://github.com/PSeitz/rust-sketches-ddsketch", version = "0.2.0", features = ["use_serde"] }
[target.'cfg(windows)'.dependencies]
winapi = "0.3.9"
@@ -78,6 +79,8 @@ env_logger = "0.10.0"
pprof = { version = "0.11.0", features = ["flamegraph", "criterion"] }
futures = "0.3.21"
paste = "1.0.11"
more-asserts = "0.3.1"
rand_distr = "0.4.3"
[dev-dependencies.fail]
version = "0.5.0"
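The sketches-ddsketch fork above is pulled in with the use_serde feature so intermediate percentile results can be serialized between segments and nodes. A minimal sketch of the DDSketch API as this commit uses it:

```rust
use sketches_ddsketch::{Config, DDSketch};

fn main() {
    // Default accuracy configuration, as in PercentilesCollector::new().
    let mut sketch = DDSketch::new(Config::defaults());
    for val in [12.0f64, 35.0, 80.0, 110.0, 2500.0] {
        sketch.add(val);
    }
    // quantile() takes a value in [0.0, 1.0]; it returns Ok(None) for an
    // empty sketch, hence the unwrap_or(NAN) in the final result conversion.
    let p95 = sketch.quantile(0.95).unwrap().unwrap_or(f64::NAN);
    println!("p95 ~= {p95}");

    // Sketches are mergeable, which is what makes the aggregation
    // distributable across segments; merge() can fail, hence the new
    // fallible merge_fruits() plumbing in this commit.
    let other = DDSketch::new(Config::defaults());
    sketch.merge(&other).unwrap();
}
```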

View File

@@ -9,10 +9,9 @@
use serde_json::{Deserializer, Value};
use tantivy::aggregation::agg_req::{
Aggregation, Aggregations, BucketAggregation, BucketAggregationType, MetricAggregation,
RangeAggregation,
};
use tantivy::aggregation::agg_result::AggregationResults;
use tantivy::aggregation::bucket::RangeAggregationRange;
use tantivy::aggregation::bucket::{RangeAggregation, RangeAggregationRange};
use tantivy::aggregation::metric::AverageAggregation;
use tantivy::aggregation::AggregationCollector;
use tantivy::query::AllQuery;

View File

@@ -3,17 +3,27 @@ mod bench {
use columnar::Cardinality;
use rand::prelude::SliceRandom;
use rand::{thread_rng, Rng};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use rand_distr::Distribution;
use test::{self, Bencher};
use super::*;
use crate::aggregation::bucket::{
CustomOrder, HistogramAggregation, HistogramBounds, Order, OrderTarget, TermsAggregation,
use crate::aggregation::agg_req::{
Aggregation, Aggregations, BucketAggregation, BucketAggregationType, MetricAggregation,
};
use crate::aggregation::metric::StatsAggregation;
use crate::query::AllQuery;
use crate::schema::{Schema, TextFieldIndexing, FAST, STRING};
use crate::Index;
use crate::aggregation::bucket::{
CustomOrder, HistogramAggregation, HistogramBounds, Order, OrderTarget, RangeAggregation,
TermsAggregation,
};
use crate::aggregation::metric::{AverageAggregation, StatsAggregation};
use crate::aggregation::AggregationCollector;
use crate::query::{AllQuery, TermQuery};
use crate::schema::{IndexRecordOption, Schema, TextFieldIndexing, FAST, STRING};
use crate::{Index, Term};
fn get_collector(agg_req: Aggregations) -> AggregationCollector {
AggregationCollector::from_aggs(agg_req, Default::default())
}
fn get_test_index_bench(cardinality: Cardinality) -> crate::Result<Index> {
let mut schema_builder = Schema::builder();
@@ -31,11 +41,14 @@ mod bench {
let score_field_i64 = schema_builder.add_i64_field("score_i64", score_fieldtype);
let index = Index::create_from_tempdir(schema_builder.build())?;
let few_terms_data = vec!["INFO", "ERROR", "WARN", "DEBUG"];
let lg_norm = rand_distr::LogNormal::new(2.996f64, 0.979f64).unwrap();
let many_terms_data = (0..150_000)
.map(|num| format!("author{}", num))
.collect::<Vec<_>>();
{
let mut rng = thread_rng();
let mut rng = StdRng::from_seed([1u8; 32]);
let mut index_writer = index.writer_with_num_threads(1, 100_000_000)?;
// To make the different test cases comparable we just change one doc to force the
// cardinality
@@ -52,8 +65,8 @@ mod bench {
text_field_few_terms => "cool",
score_field => 1u64,
score_field => 1u64,
score_field_f64 => 1.0,
score_field_f64 => 1.0,
score_field_f64 => lg_norm.sample(&mut rng),
score_field_f64 => lg_norm.sample(&mut rng),
score_field_i64 => 1i64,
score_field_i64 => 1i64,
))?;
@@ -65,7 +78,7 @@ mod bench {
text_field_many_terms => many_terms_data.choose(&mut rng).unwrap().to_string(),
text_field_few_terms => few_terms_data.choose(&mut rng).unwrap().to_string(),
score_field => val as u64,
score_field_f64 => val,
score_field_f64 => lg_norm.sample(&mut rng),
score_field_i64 => val as i64,
))?;
}
@@ -186,6 +199,31 @@ mod bench {
});
}
bench_all_cardinalities!(bench_aggregation_percentiles_f64);
fn bench_aggregation_percentiles_f64_card(b: &mut Bencher, cardinality: Cardinality) {
let index = get_test_index_bench(cardinality).unwrap();
let reader = index.reader().unwrap();
b.iter(|| {
let agg_req_str = r#"
{
"mypercentiles": {
"percentiles": {
"field": "score_f64",
"percents": [ 95, 99, 99.9 ]
}
}
} "#;
let agg_req_1: Aggregations = serde_json::from_str(agg_req_str).unwrap();
let collector = get_collector(agg_req_1);
let searcher = reader.searcher();
searcher.search(&AllQuery, &collector).unwrap()
});
}
bench_all_cardinalities!(bench_aggregation_average_u64_and_f64);
fn bench_aggregation_average_u64_and_f64_card(b: &mut Bencher, cardinality: Cardinality) {

View File

@@ -49,11 +49,12 @@ use std::collections::{HashMap, HashSet};
use serde::{Deserialize, Serialize};
pub use super::bucket::RangeAggregation;
use super::bucket::{DateHistogramAggregationReq, HistogramAggregation, TermsAggregation};
use super::bucket::{
DateHistogramAggregationReq, HistogramAggregation, RangeAggregation, TermsAggregation,
};
use super::metric::{
AverageAggregation, CountAggregation, MaxAggregation, MinAggregation, StatsAggregation,
SumAggregation,
AverageAggregation, CountAggregation, MaxAggregation, MinAggregation,
PercentilesAggregationReq, StatsAggregation, SumAggregation,
};
use super::VecWithNames;
@@ -246,9 +247,19 @@ pub enum MetricAggregation {
/// Computes the sum of the extracted values.
#[serde(rename = "sum")]
Sum(SumAggregation),
/// Computes the percentiles of the extracted values.
#[serde(rename = "percentiles")]
Percentiles(PercentilesAggregationReq),
}
impl MetricAggregation {
pub(crate) fn as_percentile(&self) -> Option<&PercentilesAggregationReq> {
match &self {
MetricAggregation::Percentiles(percentile_req) => Some(percentile_req),
_ => None,
}
}
fn get_fast_field_name(&self) -> &str {
match self {
MetricAggregation::Average(avg) => avg.field_name(),
@@ -257,6 +268,7 @@ impl MetricAggregation {
MetricAggregation::Min(min) => min.field_name(),
MetricAggregation::Stats(stats) => stats.field_name(),
MetricAggregation::Sum(sum) => sum.field_name(),
MetricAggregation::Percentiles(per) => per.field_name(),
}
}
}
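Besides JSON, the new variant can be constructed programmatically. A sketch mirroring the test setup in src/aggregation/metric/percentiles.rs (field and key names are illustrative):

```rust
use tantivy::aggregation::agg_req::{Aggregation, Aggregations, MetricAggregation};
use tantivy::aggregation::metric::PercentilesAggregationReq;

fn percentiles_request() -> Aggregations {
    let mut req = PercentilesAggregationReq::from_field_name("score".to_string());
    // Override the defaults (1, 5, 25, 50, 75, 95, 99).
    req.percents = Some(vec![95.0, 99.0, 99.9]);
    vec![(
        "my_percentiles".to_string(),
        Aggregation::Metric(MetricAggregation::Percentiles(req)),
    )]
    .into_iter()
    .collect()
}
```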

View File

@@ -137,6 +137,20 @@ impl MetricAggregationWithAccessor {
Some(get_numeric_or_date_column_types()),
)?;
Ok(MetricAggregationWithAccessor {
accessor,
field_type,
metric: metric.clone(),
column_block_accessor: Default::default(),
})
}
MetricAggregation::Percentiles(percentiles) => {
let (accessor, field_type) = get_ff_reader_and_validate(
reader,
percentiles.field_name(),
Some(get_numeric_or_date_column_types()),
)?;
Ok(MetricAggregationWithAccessor {
accessor,
field_type,

View File

@@ -9,10 +9,10 @@ use serde::{Deserialize, Serialize};
use super::agg_req::BucketAggregationInternal;
use super::bucket::GetDocCount;
use super::intermediate_agg_result::{IntermediateBucketResult, IntermediateMetricResult};
use super::metric::{SingleMetricResult, Stats};
use super::intermediate_agg_result::IntermediateBucketResult;
use super::metric::{PercentilesMetricResult, SingleMetricResult, Stats};
use super::segment_agg_result::AggregationLimits;
use super::Key;
use super::{AggregationError, Key};
use crate::TantivyError;
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
@@ -94,6 +94,8 @@ pub enum MetricResult {
Stats(Stats),
/// Sum metric result.
Sum(SingleMetricResult),
/// Percentiles metric result.
Percentiles(PercentilesMetricResult),
}
impl MetricResult {
@@ -105,30 +107,9 @@ impl MetricResult {
MetricResult::Min(min) => Ok(min.value),
MetricResult::Stats(stats) => stats.get_value(agg_property),
MetricResult::Sum(sum) => Ok(sum.value),
}
}
}
impl From<IntermediateMetricResult> for MetricResult {
fn from(metric: IntermediateMetricResult) -> Self {
match metric {
IntermediateMetricResult::Average(intermediate_avg) => {
MetricResult::Average(intermediate_avg.finalize().into())
}
IntermediateMetricResult::Count(intermediate_count) => {
MetricResult::Count(intermediate_count.finalize().into())
}
IntermediateMetricResult::Max(intermediate_max) => {
MetricResult::Max(intermediate_max.finalize().into())
}
IntermediateMetricResult::Min(intermediate_min) => {
MetricResult::Min(intermediate_min.finalize().into())
}
IntermediateMetricResult::Stats(intermediate_stats) => {
MetricResult::Stats(intermediate_stats.finalize())
}
IntermediateMetricResult::Sum(intermediate_sum) => {
MetricResult::Sum(intermediate_sum.finalize().into())
}
MetricResult::Percentiles(_) => Err(TantivyError::AggregationError(
AggregationError::InvalidRequest("percentiles can't be used to order".to_string()),
)),
}
}
}

View File

@@ -111,7 +111,7 @@ fn test_aggregation_flushing(
let searcher = reader.searcher();
let intermediate_agg_result = searcher.search(&AllQuery, &collector).unwrap();
intermediate_agg_result
.into_final_bucket_result(agg_req, &Default::default())
.into_final_result(agg_req, &Default::default())
.unwrap()
} else {
let collector = get_collector(agg_req);
@@ -448,7 +448,7 @@ fn test_aggregation_level2(
// Test de/serialization roundtrip on intermediate_agg_result
let res: IntermediateAggregationResults =
serde_json::from_str(&serde_json::to_string(&res).unwrap()).unwrap();
res.into_final_bucket_result(agg_req.clone(), &Default::default())
res.into_final_result(agg_req.clone(), &Default::default())
.unwrap()
} else {
let collector = get_collector(agg_req.clone());

View File

@@ -104,7 +104,7 @@ impl Collector for AggregationCollector {
segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
) -> crate::Result<Self::Fruit> {
let res = merge_fruits(segment_fruits)?;
res.into_final_bucket_result(self.agg.clone(), &self.limits)
res.into_final_result(self.agg.clone(), &self.limits)
}
}
@@ -114,7 +114,7 @@ fn merge_fruits(
if let Some(fruit) = segment_fruits.pop() {
let mut fruit = fruit?;
for next_fruit in segment_fruits {
fruit.merge_fruits(next_fruit?);
fruit.merge_fruits(next_fruit?)?;
}
Ok(fruit)
} else {

View File

@@ -5,6 +5,12 @@ use super::bucket::DateHistogramParseError;
/// Error that may occur when opening a directory
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum AggregationError {
/// Internal error while processing an aggregation
#[error("InternalError: {0:?}")]
InternalError(String),
/// Invalid aggregation request
#[error("InvalidRequest: {0:?}")]
InvalidRequest(String),
/// Date histogram parse error
#[error("Date histogram parse error: {0:?}")]
DateHistogramParseError(#[from] DateHistogramParseError),

View File

@@ -12,16 +12,17 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
use super::agg_req::{
Aggregations, AggregationsInternal, BucketAggregationInternal, BucketAggregationType,
MetricAggregation, RangeAggregation,
MetricAggregation,
};
use super::agg_result::{AggregationResult, BucketResult, RangeBucketEntry};
use super::agg_result::{AggregationResult, BucketResult, MetricResult, RangeBucketEntry};
use super::bucket::{
cut_off_buckets, get_agg_name_and_property, intermediate_histogram_buckets_to_final_buckets,
GetDocCount, Order, OrderTarget, SegmentHistogramBucketEntry, TermsAggregation,
GetDocCount, Order, OrderTarget, RangeAggregation, SegmentHistogramBucketEntry,
TermsAggregation,
};
use super::metric::{
IntermediateAverage, IntermediateCount, IntermediateMax, IntermediateMin, IntermediateStats,
IntermediateSum,
IntermediateSum, PercentilesCollector,
};
use super::segment_agg_result::AggregationLimits;
use super::{format_date, AggregationError, Key, SerializedKey, VecWithNames};
@@ -41,12 +42,12 @@ pub struct IntermediateAggregationResults {
impl IntermediateAggregationResults {
/// Convert intermediate result and its aggregation request to the final result.
pub fn into_final_bucket_result(
pub fn into_final_result(
self,
req: Aggregations,
limits: &AggregationLimits,
) -> crate::Result<AggregationResults> {
let res = self.into_final_bucket_result_internal(&(req.into()), limits)?;
let res = self.into_final_result_internal(&(req.into()), limits)?;
let bucket_count = res.get_bucket_count() as u32;
if bucket_count > limits.get_bucket_limit() {
return Err(TantivyError::AggregationError(
@@ -63,7 +64,7 @@ impl IntermediateAggregationResults {
///
/// Internal function: `AggregationsInternal` is used instead of `Aggregations`, since it is
/// optimized for internal processing by splitting metrics and buckets into separate groups.
pub(crate) fn into_final_bucket_result_internal(
pub(crate) fn into_final_result_internal(
self,
req: &AggregationsInternal,
limits: &AggregationLimits,
@@ -82,7 +83,7 @@ impl IntermediateAggregationResults {
};
if let Some(metrics) = self.metrics {
convert_and_add_final_metrics_to_result(&mut results, metrics);
convert_and_add_final_metrics_to_result(&mut results, metrics, &req.metrics);
} else {
// When there are no metrics, we create empty metric results, so that the serialized
// json format is constant
@@ -132,12 +133,12 @@ impl IntermediateAggregationResults {
///
/// The order of the values needs to be the same on both results. This is ensured when the
/// same keys are present on the underlying `VecWithNames` struct.
pub fn merge_fruits(&mut self, other: IntermediateAggregationResults) {
pub fn merge_fruits(&mut self, other: IntermediateAggregationResults) -> crate::Result<()> {
if let (Some(buckets_left), Some(buckets_right)) = (&mut self.buckets, other.buckets) {
for (bucket_left, bucket_right) in
buckets_left.values_mut().zip(buckets_right.into_values())
{
bucket_left.merge_fruits(bucket_right);
bucket_left.merge_fruits(bucket_right)?;
}
}
@@ -145,20 +146,28 @@ impl IntermediateAggregationResults {
for (metric_left, metric_right) in
metrics_left.values_mut().zip(metrics_right.into_values())
{
metric_left.merge_fruits(metric_right);
metric_left.merge_fruits(metric_right)?;
}
}
Ok(())
}
}
fn convert_and_add_final_metrics_to_result(
results: &mut FxHashMap<String, AggregationResult>,
metrics: VecWithNames<IntermediateMetricResult>,
metrics_req: &VecWithNames<MetricAggregation>,
) {
let metric_result_with_request = metrics.into_iter().zip(metrics_req.values());
results.extend(
metrics
metric_result_with_request
.into_iter()
.map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))),
.map(|((key, metric), req)| {
(
key,
AggregationResult::MetricResult(metric.into_final_metric_result(req)),
)
}),
);
}
@@ -170,7 +179,7 @@ fn add_empty_final_metrics_to_result(
let empty_bucket = IntermediateMetricResult::empty_from_req(req);
(
key.to_string(),
AggregationResult::MetricResult(empty_bucket.into()),
AggregationResult::MetricResult(empty_bucket.into_final_metric_result(req)),
)
}));
Ok(())
@@ -218,6 +227,8 @@ pub enum IntermediateAggregationResult {
/// Holds the intermediate data for metric results
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum IntermediateMetricResult {
/// Intermediate percentiles result.
Percentiles(PercentilesCollector),
/// Intermediate average result.
Average(IntermediateAverage),
/// Intermediate count result.
@@ -233,6 +244,32 @@ pub enum IntermediateMetricResult {
}
impl IntermediateMetricResult {
fn into_final_metric_result(self, req: &MetricAggregation) -> MetricResult {
match self {
IntermediateMetricResult::Average(intermediate_avg) => {
MetricResult::Average(intermediate_avg.finalize().into())
}
IntermediateMetricResult::Count(intermediate_count) => {
MetricResult::Count(intermediate_count.finalize().into())
}
IntermediateMetricResult::Max(intermediate_max) => {
MetricResult::Max(intermediate_max.finalize().into())
}
IntermediateMetricResult::Min(intermediate_min) => {
MetricResult::Min(intermediate_min.finalize().into())
}
IntermediateMetricResult::Stats(intermediate_stats) => {
MetricResult::Stats(intermediate_stats.finalize())
}
IntermediateMetricResult::Sum(intermediate_sum) => {
MetricResult::Sum(intermediate_sum.finalize().into())
}
IntermediateMetricResult::Percentiles(percentiles) => MetricResult::Percentiles(
percentiles.into_final_result(req.as_percentile().expect("unexpected metric type")),
),
}
}
pub(crate) fn empty_from_req(req: &MetricAggregation) -> Self {
match req {
MetricAggregation::Average(_) => {
@@ -247,9 +284,12 @@ impl IntermediateMetricResult {
IntermediateMetricResult::Stats(IntermediateStats::default())
}
MetricAggregation::Sum(_) => IntermediateMetricResult::Sum(IntermediateSum::default()),
MetricAggregation::Percentiles(_) => {
IntermediateMetricResult::Percentiles(PercentilesCollector::default())
}
}
}
fn merge_fruits(&mut self, other: IntermediateMetricResult) {
fn merge_fruits(&mut self, other: IntermediateMetricResult) -> crate::Result<()> {
match (self, other) {
(
IntermediateMetricResult::Average(avg_left),
@@ -278,10 +318,18 @@ impl IntermediateMetricResult {
(IntermediateMetricResult::Sum(sum_left), IntermediateMetricResult::Sum(sum_right)) => {
sum_left.merge_fruits(sum_right);
}
(
IntermediateMetricResult::Percentiles(left),
IntermediateMetricResult::Percentiles(right),
) => {
left.merge_fruits(right)?;
}
_ => {
panic!("incompatible fruit types in tree");
panic!("incompatible fruit types in tree or missing merge_fruits handler");
}
}
Ok(())
}
}
@@ -396,13 +444,13 @@ impl IntermediateBucketResult {
}
}
}
fn merge_fruits(&mut self, other: IntermediateBucketResult) {
fn merge_fruits(&mut self, other: IntermediateBucketResult) -> crate::Result<()> {
match (self, other) {
(
IntermediateBucketResult::Terms(term_res_left),
IntermediateBucketResult::Terms(term_res_right),
) => {
merge_key_maps(&mut term_res_left.entries, term_res_right.entries);
merge_key_maps(&mut term_res_left.entries, term_res_right.entries)?;
term_res_left.sum_other_doc_count += term_res_right.sum_other_doc_count;
term_res_left.doc_count_error_upper_bound +=
term_res_right.doc_count_error_upper_bound;
@@ -412,7 +460,7 @@ impl IntermediateBucketResult {
IntermediateBucketResult::Range(range_res_left),
IntermediateBucketResult::Range(range_res_right),
) => {
merge_serialized_key_maps(&mut range_res_left.buckets, range_res_right.buckets);
merge_serialized_key_maps(&mut range_res_left.buckets, range_res_right.buckets)?;
}
(
IntermediateBucketResult::Histogram {
@@ -424,22 +472,23 @@ impl IntermediateBucketResult {
..
},
) => {
let buckets = buckets_left
.drain(..)
.merge_join_by(buckets_right.into_iter(), |left, right| {
left.key.partial_cmp(&right.key).unwrap_or(Ordering::Equal)
})
.map(|either| match either {
itertools::EitherOrBoth::Both(mut left, right) => {
left.merge_fruits(right);
left
}
itertools::EitherOrBoth::Left(left) => left,
itertools::EitherOrBoth::Right(right) => right,
})
.collect();
let buckets: Result<Vec<IntermediateHistogramBucketEntry>, TantivyError> =
buckets_left
.drain(..)
.merge_join_by(buckets_right.into_iter(), |left, right| {
left.key.partial_cmp(&right.key).unwrap_or(Ordering::Equal)
})
.map(|either| match either {
itertools::EitherOrBoth::Both(mut left, right) => {
left.merge_fruits(right)?;
Ok(left)
}
itertools::EitherOrBoth::Left(left) => Ok(left),
itertools::EitherOrBoth::Right(right) => Ok(right),
})
.collect::<Result<_, _>>();
*buckets_left = buckets;
*buckets_left = buckets?;
}
(IntermediateBucketResult::Range(_), _) => {
panic!("try merge on different types")
@@ -451,6 +500,7 @@ impl IntermediateBucketResult {
panic!("try merge on different types")
}
}
Ok(())
}
}
@@ -516,7 +566,7 @@ impl IntermediateTermBucketResult {
doc_count: entry.doc_count,
sub_aggregation: entry
.sub_aggregation
.into_final_bucket_result_internal(sub_aggregation_req, limits)?,
.into_final_result_internal(sub_aggregation_req, limits)?,
})
})
.collect::<crate::Result<_>>()?;
@@ -587,37 +637,39 @@ impl IntermediateTermBucketResult {
}
trait MergeFruits {
fn merge_fruits(&mut self, other: Self);
fn merge_fruits(&mut self, other: Self) -> crate::Result<()>;
}
fn merge_serialized_key_maps<V: MergeFruits + Clone>(
entries_left: &mut FxHashMap<SerializedKey, V>,
mut entries_right: FxHashMap<SerializedKey, V>,
) {
) -> crate::Result<()> {
for (name, entry_left) in entries_left.iter_mut() {
if let Some(entry_right) = entries_right.remove(name) {
entry_left.merge_fruits(entry_right);
entry_left.merge_fruits(entry_right)?;
}
}
for (key, res) in entries_right.into_iter() {
entries_left.entry(key).or_insert(res);
}
Ok(())
}
fn merge_key_maps<V: MergeFruits + Clone>(
entries_left: &mut FxHashMap<Key, V>,
mut entries_right: FxHashMap<Key, V>,
) {
) -> crate::Result<()> {
for (name, entry_left) in entries_left.iter_mut() {
if let Some(entry_right) = entries_right.remove(name) {
entry_left.merge_fruits(entry_right);
entry_left.merge_fruits(entry_right)?;
}
}
for (key, res) in entries_right.into_iter() {
entries_left.entry(key).or_insert(res);
}
Ok(())
}
/// This is the histogram entry for a bucket, which contains a key, count, and optionally
@@ -644,7 +696,7 @@ impl IntermediateHistogramBucketEntry {
doc_count: self.doc_count,
sub_aggregation: self
.sub_aggregation
.into_final_bucket_result_internal(req, limits)?,
.into_final_result_internal(req, limits)?,
})
}
}
@@ -690,7 +742,7 @@ impl IntermediateRangeBucketEntry {
doc_count: self.doc_count,
sub_aggregation: self
.sub_aggregation
.into_final_bucket_result_internal(req, limits)?,
.into_final_result_internal(req, limits)?,
to: self.to,
from: self.from,
to_as_string: None,
@@ -725,23 +777,26 @@ pub struct IntermediateTermBucketEntry {
}
impl MergeFruits for IntermediateTermBucketEntry {
fn merge_fruits(&mut self, other: IntermediateTermBucketEntry) {
fn merge_fruits(&mut self, other: IntermediateTermBucketEntry) -> crate::Result<()> {
self.doc_count += other.doc_count;
self.sub_aggregation.merge_fruits(other.sub_aggregation);
self.sub_aggregation.merge_fruits(other.sub_aggregation)?;
Ok(())
}
}
impl MergeFruits for IntermediateRangeBucketEntry {
fn merge_fruits(&mut self, other: IntermediateRangeBucketEntry) {
fn merge_fruits(&mut self, other: IntermediateRangeBucketEntry) -> crate::Result<()> {
self.doc_count += other.doc_count;
self.sub_aggregation.merge_fruits(other.sub_aggregation);
self.sub_aggregation.merge_fruits(other.sub_aggregation)?;
Ok(())
}
}
impl MergeFruits for IntermediateHistogramBucketEntry {
fn merge_fruits(&mut self, other: IntermediateHistogramBucketEntry) {
fn merge_fruits(&mut self, other: IntermediateHistogramBucketEntry) -> crate::Result<()> {
self.doc_count += other.doc_count;
self.sub_aggregation.merge_fruits(other.sub_aggregation);
self.sub_aggregation.merge_fruits(other.sub_aggregation)?;
Ok(())
}
}
@@ -825,7 +880,7 @@ mod tests {
("blue".to_string(), 25, "1900".to_string(), 50),
]);
tree_left.merge_fruits(tree_right);
tree_left.merge_fruits(tree_right).unwrap();
let tree_expected = get_intermediat_tree_with_ranges(&[
("red".to_string(), 110, "1900".to_string(), 55),
@@ -846,7 +901,7 @@ mod tests {
("green".to_string(), 25, "1900".to_string(), 50),
]);
tree_left.merge_fruits(tree_right);
tree_left.merge_fruits(tree_right).unwrap();
let tree_expected = get_intermediat_tree_with_ranges(&[
("red".to_string(), 110, "1900".to_string(), 55),
@@ -866,7 +921,9 @@ mod tests {
let orig = tree_left.clone();
tree_left.merge_fruits(IntermediateAggregationResults::default());
tree_left
.merge_fruits(IntermediateAggregationResults::default())
.unwrap();
assert_eq!(tree_left, orig);
}
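Since merging percentile sketches can fail, merge_fruits is now fallible all the way up. A sketch of merging intermediate results, e.g. as gathered from several nodes via the DistributedAggregationCollector:

```rust
use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults;

/// Folds partial results into one; each merge now returns a Result instead
/// of panicking on sketch-level failures.
fn merge_partial_results(
    mut acc: IntermediateAggregationResults,
    others: Vec<IntermediateAggregationResults>,
) -> tantivy::Result<IntermediateAggregationResults> {
    for other in others {
        acc.merge_fruits(other)?;
    }
    Ok(acc)
}
```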

View File

@@ -6,12 +6,15 @@ mod average;
mod count;
mod max;
mod min;
mod percentiles;
mod stats;
mod sum;
pub use average::*;
pub use count::*;
pub use max::*;
pub use min::*;
pub use percentiles::*;
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
pub use stats::*;
pub use sum::*;
@@ -37,6 +40,33 @@ impl From<Option<f64>> for SingleMetricResult {
}
}
/// Wrapper around the percentile entries, which serialize either as a vector or as a hash
/// map, depending on whether the request is keyed.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum PercentileValues {
/// Vector format percentile entries
Vec(Vec<PercentileValuesVecEntry>),
/// HashMap format percentile entries. Key is the serialized percentile
HashMap(FxHashMap<String, f64>),
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// A single entry returned when requesting percentiles with `keyed: false`
pub struct PercentileValuesVecEntry {
key: f64,
value: f64,
}
/// The result of a percentiles aggregation.
///
/// The values are wrapped in a `values` object mainly to match the elasticsearch output
/// structure.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PercentilesMetricResult {
/// The result of the percentile metric.
pub values: PercentileValues,
}
#[cfg(test)]
mod tests {
use crate::aggregation::agg_req::Aggregations;
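To illustrate the two shapes of PercentileValues, a sketch of the serialized output for both keyed variants (the numbers are illustrative):

```rust
use serde_json::json;

fn main() {
    // keyed: true (the default): a map from formatted percentile to value.
    let keyed = json!({
        "values": { "95.0": 10.0, "99.0": 10.0, "99.9": 10.0 }
    });
    // keyed: false: a list of { key, value } entries
    // (PercentileValuesVecEntry).
    let unkeyed = json!({
        "values": [
            { "key": 95.0, "value": 10.0 },
            { "key": 99.0, "value": 10.0 },
            { "key": 99.9, "value": 10.0 }
        ]
    });
    println!("{keyed}\n{unkeyed}");
}
```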

View File

@@ -0,0 +1,553 @@
use std::fmt::Debug;
use columnar::ColumnType;
use serde::{Deserialize, Serialize};
use super::*;
use crate::aggregation::agg_req_with_accessor::{
AggregationsWithAccessor, MetricAggregationWithAccessor,
};
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResults, IntermediateMetricResult,
};
use crate::aggregation::segment_agg_result::SegmentAggregationCollector;
use crate::aggregation::{f64_from_fastfield_u64, AggregationError, VecWithNames};
use crate::{DocId, TantivyError};
/// # Percentiles
///
/// The percentiles aggregation is a useful tool for understanding the distribution
/// of a data set. It calculates the values below which a given percentage of the
/// data falls. For instance, the 95th percentile indicates the value below which
/// 95% of the data points can be found.
///
/// This aggregation can be particularly interesting for analyzing website load
/// times. By computing the percentiles of load times, you can get insights into
/// how quickly your website loads for different users and identify areas where
/// improvements can be made.
///
/// To use the percentiles aggregation, you'll need to provide a field to
/// aggregate on. In the case of website load times, this would typically be a
/// field recording how long the site takes to load.
///
/// The JSON format for a percentiles aggregation request is straightforward. The
/// following example demonstrates a request for the percentiles of the "load_time"
/// field:
///
/// ```json
/// {
/// "percentiles": {
/// "field": "load_time"
/// }
/// }
/// ```
///
/// This request will return an object containing the default percentiles (1, 5,
/// 25, 50 (median), 75, 95, and 99). You can also customize the percentiles you want to
/// calculate by providing an array of values in the "percents" parameter:
///
/// ```json
/// {
/// "percentiles": {
/// "field": "load_time",
/// "percents": [10, 20, 30, 40, 50, 60, 70, 80, 90]
/// }
/// }
/// ```
///
/// In this example, the aggregation will return the 10th, 20th, 30th, 40th, 50th,
/// 60th, 70th, 80th, and 90th percentiles of the "load_time" field.
///
/// Analyzing the percentiles of website load times can help you understand the
/// user experience and identify areas for optimization. For example, if the 95th
/// percentile load time is significantly higher than the median, this indicates
/// that a small percentage of users are experiencing much slower load times than
/// the majority.
///
/// # Estimating Percentiles
///
/// While percentiles provide valuable insights into the distribution of data, it's
/// important to understand that they are often estimates. This is because
/// calculating exact percentiles for large data sets can be computationally
/// expensive and time-consuming. As a result, many percentile aggregation
/// algorithms use approximation techniques to provide faster results.
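///
/// Tantivy computes percentiles with the DDSketch algorithm (via the
/// `sketches-ddsketch` crate added by this commit), which keeps the relative
/// error bounded; the tests below accept roughly 2% deviation from the exact
/// quantile.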
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PercentilesAggregationReq {
/// The field name to compute the percentiles on.
pub field: String,
/// The percentiles to compute.
/// Defaults to [1.0, 5.0, 25.0, 50.0, 75.0, 95.0, 99.0]
pub percents: Option<Vec<f64>>,
/// Whether to return the percentiles as a hash map keyed by the percentile (the default)
/// or as a list of key/value entries.
#[serde(default = "default_as_true")]
pub keyed: bool,
}
fn default_percentiles() -> &'static [f64] {
&[1.0, 5.0, 25.0, 50.0, 75.0, 95.0, 99.0]
}
fn default_as_true() -> bool {
true
}
impl PercentilesAggregationReq {
/// Creates a new [`PercentilesAggregationReq`] instance from a field name.
pub fn from_field_name(field_name: String) -> Self {
PercentilesAggregationReq {
field: field_name,
percents: None,
keyed: default_as_true(),
}
}
/// Returns the field name the aggregation is computed on.
pub fn field_name(&self) -> &str {
&self.field
}
fn validate(&self) -> crate::Result<()> {
if let Some(percents) = self.percents.as_ref() {
let all_in_range = percents
.iter()
.cloned()
.all(|percent| (0.0..=100.0).contains(&percent));
if !all_in_range {
return Err(TantivyError::AggregationError(
AggregationError::InvalidRequest(
"All percentiles have to be between 0.0 and 100.0".to_string(),
),
));
}
}
Ok(())
}
}
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct SegmentPercentilesCollector {
field_type: ColumnType,
pub(crate) percentiles: PercentilesCollector,
pub(crate) accessor_idx: usize,
val_cache: Vec<u64>,
}
#[derive(Clone, Serialize, Deserialize)]
/// The percentiles collector used during segment collection and for merging results.
pub struct PercentilesCollector {
sketch: sketches_ddsketch::DDSketch,
}
impl Default for PercentilesCollector {
fn default() -> Self {
Self::new()
}
}
impl Debug for PercentilesCollector {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("IntermediatePercentiles")
.field("sketch_len", &self.sketch.length())
.finish()
}
}
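// Note: two collectors never compare equal. PartialEq is presumably only
// implemented so enclosing result types can keep deriving PartialEq.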
impl PartialEq for PercentilesCollector {
fn eq(&self, _other: &Self) -> bool {
false
}
}
fn format_percentil(percentil: f64) -> String {
let mut out = percentil.to_string();
// Slightly silly way to format trailing decimals
if !out.contains('.') {
out.push_str(".0");
}
out
}
impl PercentilesCollector {
/// Converts the intermediate result into the final result. This queries the quantiles from
/// the underlying quantile sketch.
pub fn into_final_result(self, req: &PercentilesAggregationReq) -> PercentilesMetricResult {
let percentiles: &[f64] = req
.percents
.as_ref()
.map(|el| el.as_ref())
.unwrap_or(default_percentiles());
let iter_quantile_and_values = percentiles.iter().cloned().map(|percentile| {
(
percentile,
self.sketch
.quantile(percentile / 100.0)
.expect(
"quantile out of range. This error should have been caught during the \
validation phase",
)
.unwrap_or(f64::NAN),
)
});
let values = if req.keyed {
PercentileValues::HashMap(
iter_quantile_and_values
.map(|(val, quantil)| (format_percentil(val), quantil))
.collect(),
)
} else {
PercentileValues::Vec(
iter_quantile_and_values
.map(|(key, value)| PercentileValuesVecEntry { key, value })
.collect(),
)
};
PercentilesMetricResult { values }
}
fn new() -> Self {
let ddsketch_config = sketches_ddsketch::Config::defaults();
let sketch = sketches_ddsketch::DDSketch::new(ddsketch_config);
Self { sketch }
}
fn collect(&mut self, val: f64) {
self.sketch.add(val);
}
pub(crate) fn merge_fruits(&mut self, right: PercentilesCollector) -> crate::Result<()> {
self.sketch.merge(&right.sketch).map_err(|err| {
TantivyError::AggregationError(AggregationError::InternalError(format!(
"Error while merging percentiles {:?}",
err
)))
})?;
Ok(())
}
}
impl SegmentPercentilesCollector {
pub fn from_req_and_validate(
req: &PercentilesAggregationReq,
field_type: ColumnType,
accessor_idx: usize,
) -> crate::Result<Self> {
req.validate()?;
Ok(Self {
field_type,
percentiles: PercentilesCollector::new(),
accessor_idx,
val_cache: Default::default(),
})
}
#[inline]
pub(crate) fn collect_block_with_field(
&mut self,
docs: &[DocId],
agg_accessor: &mut MetricAggregationWithAccessor,
) {
agg_accessor
.column_block_accessor
.fetch_block(docs, &agg_accessor.accessor);
for val in agg_accessor.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.percentiles.collect(val1);
}
}
}
impl SegmentAggregationCollector for SegmentPercentilesCollector {
#[inline]
fn into_intermediate_aggregations_result(
self: Box<Self>,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<IntermediateAggregationResults> {
let name = agg_with_accessor.metrics.keys[self.accessor_idx].to_string();
let intermediate_metric_result = IntermediateMetricResult::Percentiles(self.percentiles);
let metrics = Some(VecWithNames::from_entries(vec![(
name,
intermediate_metric_result,
)]));
Ok(IntermediateAggregationResults {
metrics,
buckets: None,
})
}
#[inline]
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
let field = &agg_with_accessor.metrics.values[self.accessor_idx].accessor;
for val in field.values_for_doc(doc) {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.percentiles.collect(val1);
}
Ok(())
}
#[inline]
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
let field = &mut agg_with_accessor.metrics.values[self.accessor_idx];
self.collect_block_with_field(docs, field);
Ok(())
}
}
#[cfg(test)]
mod tests {
use itertools::Itertools;
use more_asserts::{assert_ge, assert_le};
use rand::rngs::StdRng;
use rand::SeedableRng;
use serde_json::Value;
use crate::aggregation::agg_req::{Aggregation, Aggregations, MetricAggregation};
use crate::aggregation::agg_result::AggregationResults;
use crate::aggregation::metric::PercentilesAggregationReq;
use crate::aggregation::tests::{
get_test_index_from_values, get_test_index_from_values_and_terms,
};
use crate::aggregation::AggregationCollector;
use crate::query::AllQuery;
#[test]
fn test_aggregation_percentiles_empty_index() -> crate::Result<()> {
// test index without segments
let values = vec![];
let index = get_test_index_from_values(false, &values)?;
let agg_req_1: Aggregations = vec![(
"percentiles".to_string(),
Aggregation::Metric(MetricAggregation::Percentiles(
PercentilesAggregationReq::from_field_name("score".to_string()),
)),
)]
.into_iter()
.collect();
let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
let reader = index.reader()?;
let searcher = reader.searcher();
let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
assert_eq!(
res["percentiles"]["values"],
json!({
"1.0": Value::Null,
"5.0": Value::Null,
"25.0": Value::Null,
"50.0": Value::Null,
"75.0": Value::Null,
"95.0": Value::Null,
"99.0": Value::Null,
})
);
Ok(())
}
#[test]
fn test_aggregation_percentile_simple() -> crate::Result<()> {
let values = vec![10.0];
let index = get_test_index_from_values(false, &values)?;
let agg_req_1: Aggregations = vec![(
"percentiles".to_string(),
Aggregation::Metric(MetricAggregation::Percentiles(
PercentilesAggregationReq::from_field_name("score".to_string()),
)),
)]
.into_iter()
.collect();
let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
let reader = index.reader()?;
let searcher = reader.searcher();
let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
let percents = vec!["1.0", "5.0", "25.0", "50.0", "75.0", "95.0", "99.0"];
let range = 9.9..10.1;
for percent in percents {
let val = res["percentiles"]["values"][percent].as_f64().unwrap();
assert!(range.contains(&val));
}
Ok(())
}
#[test]
fn test_aggregation_percentile_parameters() -> crate::Result<()> {
let values = vec![10.0];
let index = get_test_index_from_values(false, &values)?;
let agg_req_str = r#"
{
"mypercentiles": {
"percentiles": {
"field": "score",
"percents": [ 95, 99, 99.9 ]
}
}
} "#;
let agg_req_1: Aggregations = serde_json::from_str(agg_req_str).unwrap();
let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
let reader = index.reader()?;
let searcher = reader.searcher();
let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
let percents = vec!["95.0", "99.0", "99.9"];
let expected_range = 9.9..10.1;
for percent in percents {
let val = res["mypercentiles"]["values"][percent].as_f64().unwrap();
assert!(expected_range.contains(&val));
}
// Keyed false
//
let agg_req_str = r#"
{
"mypercentiles": {
"percentiles": {
"field": "score",
"percents": [ 95, 99, 99.9 ],
"keyed": false
}
}
} "#;
let agg_req_1: Aggregations = serde_json::from_str(agg_req_str).unwrap();
let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
let reader = index.reader()?;
let searcher = reader.searcher();
let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
let vals = &res["mypercentiles"]["values"];
assert_eq!(vals[0]["key"].as_f64().unwrap(), 95.0);
assert_eq!(vals[1]["key"].as_f64().unwrap(), 99.0);
assert_eq!(vals[2]["key"].as_f64().unwrap(), 99.9);
assert_eq!(vals[3]["key"], serde_json::Value::Null);
assert!(expected_range.contains(&vals[0]["value"].as_f64().unwrap()));
assert!(expected_range.contains(&vals[1]["value"].as_f64().unwrap()));
assert!(expected_range.contains(&vals[2]["value"].as_f64().unwrap()));
Ok(())
}
#[test]
fn test_aggregation_percentiles_single_seg() -> crate::Result<()> {
test_aggregation_percentiles(true)
}
#[test]
fn test_aggregation_percentiles_multi_seg() -> crate::Result<()> {
test_aggregation_percentiles(false)
}
fn test_aggregation_percentiles(merge_segments: bool) -> crate::Result<()> {
use rand_distr::Distribution;
let num_values_in_segment = vec![100, 30_000, 8000];
let lg_norm = rand_distr::LogNormal::new(2.996f64, 0.979f64).unwrap();
let mut rng = StdRng::from_seed([1u8; 32]);
let segment_data = |i| {
(0..num_values_in_segment[i])
.map(|_| lg_norm.sample(&mut rng))
.collect_vec()
};
let values = (0..=2).map(segment_data).collect_vec();
let mut all_values = values
.iter()
.flat_map(|el| el.iter().cloned())
.collect_vec();
all_values.sort_unstable_by(|a, b| a.total_cmp(b));
fn get_exact_quantil(q: f64, all_values: &[f64]) -> f64 {
let q = q / 100.0;
assert!((0f64..=1f64).contains(&q));
let index = (all_values.len() as f64 * q).ceil() as usize;
let index = index.min(all_values.len() - 1);
all_values[index]
}
let segment_and_values = values
.into_iter()
.map(|segment_data| {
segment_data
.into_iter()
.map(|val| (val, val.to_string()))
.collect_vec()
})
.collect_vec();
let index =
get_test_index_from_values_and_terms(merge_segments, &segment_and_values).unwrap();
let reader = index.reader()?;
let agg_req_str = r#"
{
"mypercentiles": {
"percentiles": {
"field": "score_f64",
"percents": [ 95, 99, 99.9 ]
}
}
} "#;
let agg_req_1: Aggregations = serde_json::from_str(agg_req_str).unwrap();
let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
let searcher = reader.searcher();
let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
let vals = &res["mypercentiles"]["values"];
let check_quantil = |exact_quantil: f64, val: f64| {
let lower = exact_quantil - exact_quantil * 0.02;
let upper = exact_quantil + exact_quantil * 0.02;
assert_le!(val, upper);
assert_ge!(val, lower);
};
let val = vals["95.0"].as_f64().unwrap();
let exact_quantil = get_exact_quantil(95.0, &all_values);
check_quantil(exact_quantil, val);
let val = vals["99.0"].as_f64().unwrap();
let exact_quantil = get_exact_quantil(99.0, &all_values);
check_quantil(exact_quantil, val);
let val = vals["99.9"].as_f64().unwrap();
let exact_quantil = get_exact_quantil(99.9, &all_values);
check_quantil(exact_quantil, val);
Ok(())
}
}

View File

@@ -267,9 +267,9 @@ mod tests {
use crate::aggregation::agg_req::{
Aggregation, Aggregations, BucketAggregation, BucketAggregationType, MetricAggregation,
RangeAggregation,
};
use crate::aggregation::agg_result::AggregationResults;
use crate::aggregation::bucket::RangeAggregation;
use crate::aggregation::metric::StatsAggregation;
use crate::aggregation::tests::{get_test_index_2_segments, get_test_index_from_values};
use crate::aggregation::AggregationCollector;
@@ -316,7 +316,6 @@ mod tests {
#[test]
fn test_aggregation_stats_simple() -> crate::Result<()> {
// test index without segments
let values = vec![10.0];
let index = get_test_index_from_values(false, &values)?;

View File

@@ -174,6 +174,8 @@ use std::fmt::Display;
#[cfg(test)]
mod agg_tests;
mod agg_bench;
pub use agg_limits::AggregationLimits;
pub use collector::{
AggregationCollector, AggregationSegmentCollector, DistributedAggregationCollector,

View File

@@ -13,8 +13,9 @@ use super::agg_req_with_accessor::{
use super::bucket::{SegmentHistogramCollector, SegmentRangeCollector, SegmentTermCollector};
use super::intermediate_agg_result::IntermediateAggregationResults;
use super::metric::{
AverageAggregation, CountAggregation, MaxAggregation, MinAggregation, SegmentStatsCollector,
SegmentStatsType, StatsAggregation, SumAggregation,
AverageAggregation, CountAggregation, MaxAggregation, MinAggregation,
SegmentPercentilesCollector, SegmentStatsCollector, SegmentStatsType, StatsAggregation,
SumAggregation,
};
use super::VecWithNames;
use crate::aggregation::agg_req::BucketAggregationType;
@@ -87,28 +88,37 @@ pub(crate) fn build_metric_segment_agg_collector(
req: &MetricAggregationWithAccessor,
accessor_idx: usize,
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
let stats_collector = match &req.metric {
match &req.metric {
MetricAggregation::Average(AverageAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Average, accessor_idx)
Ok(Box::new(SegmentStatsCollector::from_req(
req.field_type,
SegmentStatsType::Average,
accessor_idx,
)))
}
MetricAggregation::Count(CountAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Count, accessor_idx)
}
MetricAggregation::Max(MaxAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max, accessor_idx)
}
MetricAggregation::Min(MinAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min, accessor_idx)
}
MetricAggregation::Stats(StatsAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Stats, accessor_idx)
}
MetricAggregation::Sum(SumAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum, accessor_idx)
}
};
Ok(Box::new(stats_collector))
MetricAggregation::Count(CountAggregation { .. }) => Ok(Box::new(
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Count, accessor_idx),
)),
MetricAggregation::Max(MaxAggregation { .. }) => Ok(Box::new(
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max, accessor_idx),
)),
MetricAggregation::Min(MinAggregation { .. }) => Ok(Box::new(
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min, accessor_idx),
)),
MetricAggregation::Stats(StatsAggregation { .. }) => Ok(Box::new(
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Stats, accessor_idx),
)),
MetricAggregation::Sum(SumAggregation { .. }) => Ok(Box::new(
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum, accessor_idx),
)),
MetricAggregation::Percentiles(percentiles_req) => Ok(Box::new(
SegmentPercentilesCollector::from_req_and_validate(
percentiles_req,
req.field_type,
accessor_idx,
)?,
)),
}
}
pub(crate) fn build_bucket_segment_agg_collector(