Files
tantivy/src/aggregation/metric/percentiles.rs
PSeitz 6239697a02 switch to ms in histogram for date type (#2045)
* switch to ms in histogram for date type

switch to ms in histogram, by adding a normalization step that converts
to nanoseconds precision when creating the collector.

closes #2028
related to #2026

* add missing unit long variants

* use single thread to avoid handling test case

* fix docs

* revert CI

* cleanup

* improve docs

* Update src/aggregation/bucket/histogram/histogram.rs

Co-authored-by: Paul Masurel <paul@quickwit.io>

---------

Co-authored-by: Paul Masurel <paul@quickwit.io>
2023-05-19 08:15:44 +02:00

549 lines
18 KiB
Rust

use std::fmt::Debug;
use columnar::ColumnType;
use serde::{Deserialize, Serialize};
use super::*;
use crate::aggregation::agg_req_with_accessor::{
AggregationWithAccessor, AggregationsWithAccessor,
};
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateMetricResult,
};
use crate::aggregation::segment_agg_result::SegmentAggregationCollector;
use crate::aggregation::{f64_from_fastfield_u64, AggregationError};
use crate::{DocId, TantivyError};
/// # Percentiles
///
/// The percentiles aggregation is a useful tool for understanding the distribution
/// of a data set. It calculates the values below which a given percentage of the
/// data falls. For instance, the 95th percentile indicates the value below which
/// 95% of the data points can be found.
///
/// This aggregation can be particularly interesting for analyzing website or service response
/// times. For example, if the 95th percentile website load time is significantly higher than the
/// median, this indicates that a small percentage of users are experiencing much slower load times
/// than the majority.
///
/// To use the percentiles aggregation, you'll need to provide a field to
/// aggregate on. In the case of website load times, this would typically be a
/// field containing the duration of time it takes for the site to load.
///
/// The following example demonstrates a request for the percentiles of the "load_time"
/// field:
///
/// ```JSON
/// {
/// "percentiles": {
/// "field": "load_time"
/// }
/// }
/// ```
///
/// This request will return an object containing the default percentiles (1, 5,
/// 25, 50 (median), 75, 95, and 99). You can also customize the percentiles you want to
/// calculate by providing an array of values in the "percents" parameter:
///
/// ```JSON
/// {
/// "percentiles": {
/// "field": "load_time",
/// "percents": [10, 20, 30, 40, 50, 60, 70, 80, 90]
/// }
/// }
/// ```
///
/// In this example, the aggregation will return the 10th, 20th, 30th, 40th, 50th,
/// 60th, 70th, 80th, and 90th percentiles of the "load_time" field.
///
/// Analyzing the percentiles of website load times can help you understand the
/// user experience and identify areas for optimization. For example, if the 95th
/// percentile load time is significantly higher than the median, this indicates
/// that a small percentage of users are experiencing much slower load times than
/// the majority.
///
/// # Estimating Percentiles
///
/// While percentiles provide valuable insights into the distribution of data, it's
/// important to understand that they are often estimates. This is because
/// calculating exact percentiles for large data sets can be computationally
/// expensive and time-consuming. As a result, many percentile aggregation
/// algorithms use approximation techniques to provide faster results.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PercentilesAggregationReq {
/// The field name to compute the percentiles on.
pub field: String,
/// The percentiles to compute.
/// Defaults to [1.0, 5.0, 25.0, 50.0, 75.0, 95.0, 99.0]
pub percents: Option<Vec<f64>>,
/// Whether to return the percentiles as a hash map
#[serde(default = "default_as_true")]
pub keyed: bool,
}
fn default_percentiles() -> &'static [f64] {
&[1.0, 5.0, 25.0, 50.0, 75.0, 95.0, 99.0]
}
fn default_as_true() -> bool {
true
}
impl PercentilesAggregationReq {
/// Creates a new [`PercentilesAggregationReq`] instance from a field name.
pub fn from_field_name(field_name: String) -> Self {
PercentilesAggregationReq {
field: field_name,
percents: None,
keyed: default_as_true(),
}
}
/// Returns the field name the aggregation is computed on.
pub fn field_name(&self) -> &str {
&self.field
}
fn validate(&self) -> crate::Result<()> {
if let Some(percents) = self.percents.as_ref() {
let all_in_range = percents
.iter()
.cloned()
.all(|percent| (0.0..=100.0).contains(&percent));
if !all_in_range {
return Err(TantivyError::AggregationError(
AggregationError::InvalidRequest(
"All percentiles have to be between 0.0 and 100.0".to_string(),
),
));
}
}
Ok(())
}
}
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct SegmentPercentilesCollector {
field_type: ColumnType,
pub(crate) percentiles: PercentilesCollector,
pub(crate) accessor_idx: usize,
val_cache: Vec<u64>,
}
#[derive(Clone, Serialize, Deserialize)]
/// The percentiles collector used during segment collection and for merging results.
pub struct PercentilesCollector {
sketch: sketches_ddsketch::DDSketch,
}
impl Default for PercentilesCollector {
fn default() -> Self {
Self::new()
}
}
impl Debug for PercentilesCollector {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("IntermediatePercentiles")
.field("sketch_len", &self.sketch.length())
.finish()
}
}
impl PartialEq for PercentilesCollector {
fn eq(&self, _other: &Self) -> bool {
false
}
}
fn format_percentil(percentil: f64) -> String {
let mut out = percentil.to_string();
// Slightly silly way to format trailing decimals
if !out.contains('.') {
out.push_str(".0");
}
out
}
impl PercentilesCollector {
/// Convert result into final result. This will query the quantils from the underlying quantil
/// collector.
pub fn into_final_result(self, req: &PercentilesAggregationReq) -> PercentilesMetricResult {
let percentiles: &[f64] = req
.percents
.as_ref()
.map(|el| el.as_ref())
.unwrap_or(default_percentiles());
let iter_quantile_and_values = percentiles.iter().cloned().map(|percentile| {
(
percentile,
self.sketch
.quantile(percentile / 100.0)
.expect(
"quantil out of range. This error should have been caught during \
validation phase",
)
.unwrap_or(f64::NAN),
)
});
let values = if req.keyed {
PercentileValues::HashMap(
iter_quantile_and_values
.map(|(val, quantil)| (format_percentil(val), quantil))
.collect(),
)
} else {
PercentileValues::Vec(
iter_quantile_and_values
.map(|(key, value)| PercentileValuesVecEntry { key, value })
.collect(),
)
};
PercentilesMetricResult { values }
}
fn new() -> Self {
let ddsketch_config = sketches_ddsketch::Config::defaults();
let sketch = sketches_ddsketch::DDSketch::new(ddsketch_config);
Self { sketch }
}
fn collect(&mut self, val: f64) {
self.sketch.add(val);
}
pub(crate) fn merge_fruits(&mut self, right: PercentilesCollector) -> crate::Result<()> {
self.sketch.merge(&right.sketch).map_err(|err| {
TantivyError::AggregationError(AggregationError::InternalError(format!(
"Error while merging percentiles {err:?}"
)))
})?;
Ok(())
}
}
impl SegmentPercentilesCollector {
pub fn from_req_and_validate(
req: &PercentilesAggregationReq,
field_type: ColumnType,
accessor_idx: usize,
) -> crate::Result<Self> {
req.validate()?;
Ok(Self {
field_type,
percentiles: PercentilesCollector::new(),
accessor_idx,
val_cache: Default::default(),
})
}
#[inline]
pub(crate) fn collect_block_with_field(
&mut self,
docs: &[DocId],
agg_accessor: &mut AggregationWithAccessor,
) {
agg_accessor
.column_block_accessor
.fetch_block(docs, &agg_accessor.accessor);
for val in agg_accessor.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.percentiles.collect(val1);
}
}
}
impl SegmentAggregationCollector for SegmentPercentilesCollector {
#[inline]
fn add_intermediate_aggregation_result(
self: Box<Self>,
agg_with_accessor: &AggregationsWithAccessor,
results: &mut IntermediateAggregationResults,
) -> crate::Result<()> {
let name = agg_with_accessor.aggs.keys[self.accessor_idx].to_string();
let intermediate_metric_result = IntermediateMetricResult::Percentiles(self.percentiles);
results.push(
name,
IntermediateAggregationResult::Metric(intermediate_metric_result),
)?;
Ok(())
}
#[inline]
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
let field = &agg_with_accessor.aggs.values[self.accessor_idx].accessor;
for val in field.values_for_doc(doc) {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.percentiles.collect(val1);
}
Ok(())
}
#[inline]
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
let field = &mut agg_with_accessor.aggs.values[self.accessor_idx];
self.collect_block_with_field(docs, field);
Ok(())
}
}
#[cfg(test)]
mod tests {
use itertools::Itertools;
use more_asserts::{assert_ge, assert_le};
use rand::rngs::StdRng;
use rand::SeedableRng;
use serde_json::Value;
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::agg_result::AggregationResults;
use crate::aggregation::tests::{
get_test_index_from_values, get_test_index_from_values_and_terms,
};
use crate::aggregation::AggregationCollector;
use crate::query::AllQuery;
#[test]
fn test_aggregation_percentiles_empty_index() -> crate::Result<()> {
// test index without segments
let values = vec![];
let index = get_test_index_from_values(false, &values)?;
let agg_req_1: Aggregations = serde_json::from_value(json!({
"percentiles": {
"percentiles": {
"field": "score",
}
},
}))
.unwrap();
let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
let reader = index.reader()?;
let searcher = reader.searcher();
let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
assert_eq!(
res["percentiles"]["values"],
json!({
"1.0": Value::Null,
"5.0": Value::Null,
"25.0": Value::Null,
"50.0": Value::Null,
"75.0": Value::Null,
"95.0": Value::Null,
"99.0": Value::Null,
})
);
Ok(())
}
#[test]
fn test_aggregation_percentile_simple() -> crate::Result<()> {
let values = vec![10.0];
let index = get_test_index_from_values(false, &values)?;
let agg_req_1: Aggregations = serde_json::from_value(json!({
"percentiles": {
"percentiles": {
"field": "score",
}
},
}))
.unwrap();
let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
let reader = index.reader()?;
let searcher = reader.searcher();
let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
let percents = vec!["1.0", "5.0", "25.0", "50.0", "75.0", "95.0", "99.0"];
let range = 9.9..10.1;
for percent in percents {
let val = res["percentiles"]["values"][percent].as_f64().unwrap();
assert!(range.contains(&val));
}
Ok(())
}
#[test]
fn test_aggregation_percentile_parameters() -> crate::Result<()> {
let values = vec![10.0];
let index = get_test_index_from_values(false, &values)?;
let agg_req_str = r#"
{
"mypercentiles": {
"percentiles": {
"field": "score",
"percents": [ 95, 99, 99.9 ]
}
}
} "#;
let agg_req_1: Aggregations = serde_json::from_str(agg_req_str).unwrap();
let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
let reader = index.reader()?;
let searcher = reader.searcher();
let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
let percents = vec!["95.0", "99.0", "99.9"];
let expected_range = 9.9..10.1;
for percent in percents {
let val = res["mypercentiles"]["values"][percent].as_f64().unwrap();
assert!(expected_range.contains(&val));
}
// Keyed false
//
let agg_req_str = r#"
{
"mypercentiles": {
"percentiles": {
"field": "score",
"percents": [ 95, 99, 99.9 ],
"keyed": false
}
}
} "#;
let agg_req_1: Aggregations = serde_json::from_str(agg_req_str).unwrap();
let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
let reader = index.reader()?;
let searcher = reader.searcher();
let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
let vals = &res["mypercentiles"]["values"];
assert_eq!(vals[0]["key"].as_f64().unwrap(), 95.0);
assert_eq!(vals[1]["key"].as_f64().unwrap(), 99.0);
assert_eq!(vals[2]["key"].as_f64().unwrap(), 99.9);
assert_eq!(vals[3]["key"], serde_json::Value::Null);
assert!(expected_range.contains(&vals[0]["value"].as_f64().unwrap()));
assert!(expected_range.contains(&vals[1]["value"].as_f64().unwrap()));
assert!(expected_range.contains(&vals[2]["value"].as_f64().unwrap()));
Ok(())
}
#[test]
fn test_aggregation_percentiles_single_seg() -> crate::Result<()> {
test_aggregation_percentiles(true)
}
#[test]
fn test_aggregation_percentiles_multi_seg() -> crate::Result<()> {
test_aggregation_percentiles(false)
}
fn test_aggregation_percentiles(merge_segments: bool) -> crate::Result<()> {
use rand_distr::Distribution;
let num_values_in_segment = vec![100, 30_000, 8000];
let lg_norm = rand_distr::LogNormal::new(2.996f64, 0.979f64).unwrap();
let mut rng = StdRng::from_seed([1u8; 32]);
let segment_data = |i| {
(0..num_values_in_segment[i])
.map(|_| lg_norm.sample(&mut rng))
.collect_vec()
};
let values = (0..=2).map(segment_data).collect_vec();
let mut all_values = values
.iter()
.flat_map(|el| el.iter().cloned())
.collect_vec();
all_values.sort_unstable_by(|a, b| a.total_cmp(b));
fn get_exact_quantil(q: f64, all_values: &[f64]) -> f64 {
let q = q / 100.0;
assert!((0f64..=1f64).contains(&q));
let index = (all_values.len() as f64 * q).ceil() as usize;
let index = index.min(all_values.len() - 1);
all_values[index]
}
let segment_and_values = values
.into_iter()
.map(|segment_data| {
segment_data
.into_iter()
.map(|val| (val, val.to_string()))
.collect_vec()
})
.collect_vec();
let index =
get_test_index_from_values_and_terms(merge_segments, &segment_and_values).unwrap();
let reader = index.reader()?;
let agg_req_str = r#"
{
"mypercentiles": {
"percentiles": {
"field": "score_f64",
"percents": [ 95, 99, 99.9 ]
}
}
} "#;
let agg_req_1: Aggregations = serde_json::from_str(agg_req_str).unwrap();
let collector = AggregationCollector::from_aggs(agg_req_1, Default::default());
let searcher = reader.searcher();
let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
let vals = &res["mypercentiles"]["values"];
let check_quantil = |exact_quantil: f64, val: f64| {
let lower = exact_quantil - exact_quantil * 0.02;
let upper = exact_quantil + exact_quantil * 0.02;
assert_le!(val, upper);
assert_ge!(val, lower);
};
let val = vals["95.0"].as_f64().unwrap();
let exact_quantil = get_exact_quantil(95.0, &all_values);
check_quantil(exact_quantil, val);
let val = vals["99.0"].as_f64().unwrap();
let exact_quantil = get_exact_quantil(99.0, &all_values);
check_quantil(exact_quantil, val);
let val = vals["99.9"].as_f64().unwrap();
let exact_quantil = get_exact_quantil(99.9, &all_values);
check_quantil(exact_quantil, val);
Ok(())
}
}