Add Histogram aggregation

Pascal Seitz
2022-03-09 11:21:14 +01:00
parent 4b62f7907d
commit 226f577803
10 changed files with 1201 additions and 45 deletions

View File

@@ -48,6 +48,7 @@ use std::collections::{HashMap, HashSet};
use serde::{Deserialize, Serialize};
use super::bucket::HistogramAggregation;
pub use super::bucket::RangeAggregation;
use super::metric::{AverageAggregation, StatsAggregation};
@@ -123,12 +124,18 @@ pub enum BucketAggregationType {
/// Put data into buckets of user-defined ranges.
#[serde(rename = "range")]
Range(RangeAggregation),
/// Put data into dynamically created buckets, where each bucket spans a fixed `interval`.
#[serde(rename = "histogram")]
Histogram(HistogramAggregation),
}
impl BucketAggregationType {
fn get_fast_field_names(&self, fast_field_names: &mut HashSet<String>) {
match self {
BucketAggregationType::Range(range) => fast_field_names.insert(range.field.to_string()),
BucketAggregationType::Histogram(histogram) => {
fast_field_names.insert(histogram.field.to_string())
}
};
}
}

View File

@@ -1,7 +1,7 @@
//! This will enhance the request tree with access to the fastfield and metadata.
use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
use super::bucket::RangeAggregation;
use super::bucket::{HistogramAggregation, RangeAggregation};
use super::metric::{AverageAggregation, StatsAggregation};
use super::VecWithNames;
use crate::fastfield::{type_and_cardinality, DynamicFastFieldReader, FastType};
@@ -48,6 +48,9 @@ impl BucketAggregationWithAccessor {
field: field_name,
ranges: _,
}) => get_ff_reader_and_validate(reader, field_name)?,
BucketAggregationType::Histogram(HistogramAggregation {
field: field_name, ..
}) => get_ff_reader_and_validate(reader, field_name)?,
};
let sub_aggregation = sub_aggregation.clone();
Ok(BucketAggregationWithAccessor {

View File

@@ -10,14 +10,15 @@ use std::collections::HashMap;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use super::bucket::generate_buckets;
use super::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
IntermediateMetricResult, IntermediateRangeBucketEntry,
IntermediateHistogramBucketEntry, IntermediateMetricResult, IntermediateRangeBucketEntry,
};
use super::metric::{SingleMetricResult, Stats};
use super::Key;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
/// The final aggregation result.
pub struct AggregationResults(pub HashMap<String, AggregationResult>);
@@ -81,12 +82,18 @@ impl From<IntermediateMetricResult> for MetricResult {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum BucketResult {
/// This is the default entry for a bucket, which contains a key, count, and optionally
/// This is the range entry for a bucket, which contains a key, count, from, to, and optionally
/// sub_aggregations.
Range {
/// The range buckets sorted by range.
buckets: Vec<RangeBucketEntry>,
},
/// This is the histogram entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
Histogram {
/// The buckets.
buckets: Vec<BucketEntry>,
},
}
impl From<IntermediateBucketResult> for BucketResult {
@@ -106,6 +113,99 @@ impl From<IntermediateBucketResult> for BucketResult {
});
BucketResult::Range { buckets }
}
IntermediateBucketResult::Histogram { buckets, req } => {
let buckets = if req.min_doc_count() == 0 {
// We need to fill up the buckets for the total ranges, so that there are no
// gaps
let max = buckets
.iter()
.map(|bucket| bucket.key)
.fold(f64::NEG_INFINITY, f64::max);
let min = buckets
.iter()
.map(|bucket| bucket.key)
.fold(f64::INFINITY, f64::min);
let all_buckets = if buckets.is_empty() {
vec![]
} else {
generate_buckets(&req, min, max)
};
buckets
.into_iter()
.merge_join_by(all_buckets.into_iter(), |existing_bucket, all_bucket| {
existing_bucket
.key
.partial_cmp(all_bucket)
.unwrap_or(Ordering::Equal)
})
.map(|either| match either {
itertools::EitherOrBoth::Both(existing, _) => existing.into(),
itertools::EitherOrBoth::Left(existing) => existing.into(),
// Add missing bucket
itertools::EitherOrBoth::Right(bucket) => BucketEntry {
key: Key::F64(bucket),
doc_count: 0,
sub_aggregation: Default::default(),
},
})
.collect_vec()
} else {
buckets
.into_iter()
.filter(|bucket| bucket.doc_count >= req.min_doc_count())
.map(|bucket| bucket.into())
.collect_vec()
};
BucketResult::Histogram { buckets }
}
}
}
}
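
Note: the gap filling above relies on both bucket sequences being sorted by key. Below is a standalone sketch of the same merge-join idea with hypothetical values (not part of this commit), assuming the itertools crate:

use std::cmp::Ordering;

use itertools::Itertools;

fn main() {
    // Sparse buckets actually observed in the results, sorted by key: (key, doc_count).
    let existing: Vec<(f64, u64)> = vec![(0.0, 2), (4.0, 1)];
    // Every bucket key the full range covers, also sorted.
    let all_keys: Vec<f64> = vec![0.0, 2.0, 4.0, 6.0];

    let filled: Vec<(f64, u64)> = existing
        .into_iter()
        .merge_join_by(all_keys.into_iter(), |left, right| {
            left.0.partial_cmp(right).unwrap_or(Ordering::Equal)
        })
        .map(|either| match either {
            // Bucket exists on the left side: keep it (both sides carry the same key).
            itertools::EitherOrBoth::Both(existing, _) => existing,
            itertools::EitherOrBoth::Left(existing) => existing,
            // Key only exists in the generated range: emit an empty bucket.
            itertools::EitherOrBoth::Right(key) => (key, 0),
        })
        .collect();

    assert_eq!(filled, vec![(0.0, 2), (2.0, 0), (4.0, 1), (6.0, 0)]);
}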
/// This is the default entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
///
/// # JSON Format
/// ```ignore
/// {
/// ...
/// "my_histogram": {
/// "buckets": [
/// {
/// "key": "2.0",
/// "doc_count": 5
/// },
/// {
/// "key": "4.0",
/// "doc_count": 2
/// },
/// {
/// "key": "6.0",
/// "doc_count": 3
/// }
/// ]
/// }
/// ...
/// }
/// ```
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct BucketEntry {
/// The identifier of the bucket.
pub key: Key,
/// Number of documents in the bucket.
pub doc_count: u64,
#[serde(flatten)]
/// Sub-aggregations in this bucket.
pub sub_aggregation: AggregationResults,
}
impl From<IntermediateHistogramBucketEntry> for BucketEntry {
fn from(entry: IntermediateHistogramBucketEntry) -> Self {
BucketEntry {
key: Key::F64(entry.key),
doc_count: entry.doc_count,
sub_aggregation: entry.sub_aggregation.into(),
}
}
}
@@ -114,7 +214,7 @@ impl From<IntermediateBucketResult> for BucketResult {
/// sub_aggregations.
///
/// # JSON Format
/// ```json
/// ```ignore
/// {
/// ...
/// "my_ranges": {

View File

@@ -0,0 +1,822 @@
use serde::{Deserialize, Serialize};
use crate::aggregation::agg_req_with_accessor::{
AggregationsWithAccessor, BucketAggregationWithAccessor,
};
use crate::aggregation::f64_from_fastfield_u64;
use crate::aggregation::intermediate_agg_result::IntermediateBucketResult;
use crate::aggregation::segment_agg_result::{
SegmentAggregationResultsCollector, SegmentHistogramBucketEntry,
};
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader};
use crate::schema::Type;
use crate::{DocId, TantivyError};
/// Histogram is a bucket aggregation, where buckets are created dynamically for a given `interval`.
/// Each document value is rounded down to its bucket.
///
/// E.g. if we have a price of 18 and an interval of 5, the document will fall into the bucket with
/// the key 15. The formula used for this is:
/// `((val - offset) / interval).floor() * interval + offset`
///
/// For this calculation all fastfield values are converted to f64.
///
/// # Returned Buckets
/// By default buckets are returned between the min and max value of the documents, including empty
/// buckets.
/// Setting `min_doc_count` to a value greater than 0 will filter out empty buckets.
///
/// The value range of the buckets can be extended via
/// [extended_bounds](HistogramAggregation::extended_bounds) or set to a predefined range via
/// [hard_bounds](HistogramAggregation::hard_bounds).
///
/// # Result
/// Result type is [BucketResult](crate::aggregation::agg_result::BucketResult) with
/// [BucketEntry](crate::aggregation::agg_result::BucketEntry) on the
/// AggregationCollector.
///
/// Result type is
/// [crate::aggregation::intermediate_agg_result::IntermediateBucketResult] with
/// [crate::aggregation::intermediate_agg_result::IntermediateHistogramBucketEntry] on the
/// DistributedAggregationCollector.
///
/// # Request JSON Format
/// ```ignore
/// {
/// "prices": {
/// "histogram": {
/// "field": "price",
/// "interval": 10,
/// }
/// }
/// }
/// ```
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct HistogramAggregation {
/// The field to aggregate on.
pub field: String,
/// The interval to chunk your data range. Each bucket spans a half-open range of size `interval`.
/// Must be a positive value.
pub interval: f64,
/// Intervals intersect at 0 by default; `offset` can shift the interval grid.
/// Offset has to be in the range [0, interval).
///
/// As an example, if there are two documents with value 8 and 12 and interval 10.0, they would
/// fall into the buckets with the key 0 and 10.
/// With offset 5 and interval 10, they would both fall into the bucket with the key 5 and the
/// range [5..15); see the sketch after this struct definition for the worked computation.
pub offset: Option<f64>,
/// The minimum number of documents in a bucket to be returned. Defaults to 0.
pub min_doc_count: Option<u64>,
/// Sets a hard limit for the data range.
/// This can be used to filter values if they are not in the data range.
pub hard_bounds: Option<HistogramBounds>,
/// Can be set to extend your bounds. The range of the buckets is by default defined by the
/// data range of the values of the documents. As the name suggests, this can only be used to
/// extend the value range. If the bounds for min or max do not extend the range, they have
/// no effect on the returned buckets.
///
/// Cannot be set in conjunction with min_doc_count > 0, since the empty buckets from extended
/// bounds would not be returned.
pub extended_bounds: Option<HistogramBounds>,
}
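
As a quick illustration of the bucket-key formula and the offset semantics documented above, a standalone sketch (hypothetical helper, not part of this commit):

fn main() {
    // Mirrors the formula from the doc comment above (hypothetical helper, not the
    // crate's internal function):
    // ((val - offset) / interval).floor() * interval + offset
    fn bucket_key(val: f64, interval: f64, offset: f64) -> f64 {
        ((val - offset) / interval).floor() * interval + offset
    }

    // A value of 18 with interval 5 and no offset falls into the bucket keyed 15.
    assert_eq!(bucket_key(18.0, 5.0, 0.0), 15.0);
    // With interval 10 and offset 5, the values 8 and 12 both land in the bucket
    // keyed 5, which covers the range [5..15).
    assert_eq!(bucket_key(8.0, 10.0, 5.0), 5.0);
    assert_eq!(bucket_key(12.0, 10.0, 5.0), 5.0);
}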
impl HistogramAggregation {
fn validate(&self) -> crate::Result<()> {
if self.interval <= 0.0f64 {
return Err(TantivyError::InvalidArgument(
"interval must be a positive value".to_string(),
));
}
if self.min_doc_count.unwrap_or(0) > 0 && self.extended_bounds.is_some() {
return Err(TantivyError::InvalidArgument(
"Cannot set min_doc_count and extended_bounds at the same time".to_string(),
));
}
Ok(())
}
/// Returns the minimum number of documents required for a bucket to be returned.
pub fn min_doc_count(&self) -> u64 {
self.min_doc_count.unwrap_or(0)
}
}
/// Used to set extended or hard bounds on the histogram.
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
pub struct HistogramBounds {
/// The lower bound.
pub min: f64,
/// The upper bound.
pub max: f64,
}
impl HistogramBounds {
fn contains(&self, val: f64) -> bool {
val >= self.min && val <= self.max
}
}
/// The collector puts values from the fast field into the correct buckets and does a conversion to
/// the correct datatype.
#[derive(Clone, Debug, PartialEq)]
pub struct SegmentHistogramCollector {
/// The buckets containing the aggregation data.
buckets: Vec<SegmentHistogramBucketEntry>,
field_type: Type,
req: HistogramAggregation,
offset: f64,
first_bucket_num: i64,
bounds: HistogramBounds,
}
impl SegmentHistogramCollector {
pub fn into_intermediate_bucket_result(self) -> IntermediateBucketResult {
// We cut off the empty buckets at the start and end to mimic elasticsearch
// behaviour
let skip_start = self
.buckets
.iter()
.take_while(|bucket| bucket.doc_count == 0)
.count();
let skip_end = self
.buckets
.iter()
.rev()
.take_while(|bucket| bucket.doc_count == 0)
.count();
let num_buckets = self.buckets.len();
let buckets = self
.buckets
.into_iter()
.skip(skip_start)
.take(num_buckets.saturating_sub(skip_start + skip_end))
.filter(|bucket| bucket.doc_count != 0)
.map(|bucket| bucket.into())
.collect::<Vec<_>>();
IntermediateBucketResult::Histogram {
buckets,
req: self.req,
}
}
pub(crate) fn from_req_and_validate(
req: &HistogramAggregation,
sub_aggregation: &AggregationsWithAccessor,
field_type: Type,
accessor: &DynamicFastFieldReader<u64>,
) -> crate::Result<Self> {
req.validate()?;
let min = f64_from_fastfield_u64(accessor.min_value(), &field_type);
let max = f64_from_fastfield_u64(accessor.max_value(), &field_type);
let (min, max) = get_req_min_max(req, min, max);
// We compute and generate the buckets range (min, max) based on the request and the min
// max in the fast field, but this is likely not ideal when this is a subbucket, where many
// unnecessary buckets may be generated.
let buckets = generate_buckets(req, min, max);
let sub_aggregation = if sub_aggregation.is_empty() {
None
} else {
Some(SegmentAggregationResultsCollector::from_req_and_validate(
sub_aggregation,
)?)
};
let buckets = buckets
.iter()
.map(|bucket| SegmentHistogramBucketEntry {
key: *bucket,
doc_count: 0,
sub_aggregation: sub_aggregation.clone(),
})
.collect();
let (min, _) = get_req_min_max(req, min, max);
let first_bucket_num =
get_bucket_num_f64(min, req.interval, req.offset.unwrap_or(0.0)) as i64;
let bounds = req.hard_bounds.clone().unwrap_or(HistogramBounds {
min: f64::MIN,
max: f64::MAX,
});
Ok(Self {
buckets,
field_type,
req: req.clone(),
offset: req.offset.unwrap_or(0f64),
first_bucket_num,
bounds,
})
}
#[inline]
pub(crate) fn collect_block(
&mut self,
doc: &[DocId],
bucket_with_accessor: &BucketAggregationWithAccessor,
force_flush: bool,
) {
let bounds = self.bounds;
let interval = self.req.interval;
let offset = self.offset;
let first_bucket_num = self.first_bucket_num;
let get_bucket_num =
|val| (get_bucket_num_f64(val, interval, offset) as i64 - first_bucket_num) as usize;
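// Manually unrolled by four: fetch four fast field values and their bucket positions
// before updating the buckets; leftover docs are handled after this loop.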
let mut iter = doc.chunks_exact(4);
for docs in iter.by_ref() {
let val0 = self.f64_from_fastfield_u64(bucket_with_accessor.accessor.get(docs[0]));
let val1 = self.f64_from_fastfield_u64(bucket_with_accessor.accessor.get(docs[1]));
let val2 = self.f64_from_fastfield_u64(bucket_with_accessor.accessor.get(docs[2]));
let val3 = self.f64_from_fastfield_u64(bucket_with_accessor.accessor.get(docs[3]));
let bucket_pos0 = get_bucket_num(val0);
let bucket_pos1 = get_bucket_num(val1);
let bucket_pos2 = get_bucket_num(val2);
let bucket_pos3 = get_bucket_num(val3);
self.increment_bucket_if_in_bounds(
val0,
&bounds,
bucket_pos0,
docs[0],
&bucket_with_accessor.sub_aggregation,
);
self.increment_bucket_if_in_bounds(
val1,
&bounds,
bucket_pos1,
docs[1],
&bucket_with_accessor.sub_aggregation,
);
self.increment_bucket_if_in_bounds(
val2,
&bounds,
bucket_pos2,
docs[2],
&bucket_with_accessor.sub_aggregation,
);
self.increment_bucket_if_in_bounds(
val3,
&bounds,
bucket_pos3,
docs[3],
&bucket_with_accessor.sub_aggregation,
);
}
for doc in iter.remainder() {
let val =
f64_from_fastfield_u64(bucket_with_accessor.accessor.get(*doc), &self.field_type);
if !bounds.contains(val) {
continue;
}
let bucket_pos = (get_bucket_num_f64(val, self.req.interval, self.offset) as i64
- self.first_bucket_num) as usize;
debug_assert_eq!(
self.buckets[bucket_pos].key,
get_bucket_val(val, self.req.interval, self.offset) as f64
);
self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation);
}
if force_flush {
for bucket in &mut self.buckets {
if let Some(sub_aggregation) = &mut bucket.sub_aggregation {
sub_aggregation
.flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush);
}
}
}
}
#[inline]
fn increment_bucket_if_in_bounds(
&mut self,
val: f64,
bounds: &HistogramBounds,
bucket_pos: usize,
doc: DocId,
bucket_with_accessor: &AggregationsWithAccessor,
) {
if bounds.contains(val) {
debug_assert_eq!(
self.buckets[bucket_pos].key,
get_bucket_val(val, self.req.interval, self.offset) as f64
);
self.increment_bucket(bucket_pos, doc, &bucket_with_accessor);
}
}
#[inline]
fn increment_bucket(
&mut self,
bucket_pos: usize,
doc: DocId,
bucket_with_accessor: &AggregationsWithAccessor,
) {
let bucket = &mut self.buckets[bucket_pos];
bucket.doc_count += 1;
if let Some(sub_aggregation) = &mut bucket.sub_aggregation {
sub_aggregation.collect(doc, bucket_with_accessor);
}
}
fn f64_from_fastfield_u64(&self, val: u64) -> f64 {
f64_from_fastfield_u64(val, &self.field_type)
}
}
#[inline]
fn get_bucket_num_f64(val: f64, interval: f64, offset: f64) -> f64 {
((val - offset) / interval).floor()
}
#[inline]
fn get_bucket_val(val: f64, interval: f64, offset: f64) -> f64 {
let bucket_pos = get_bucket_num_f64(val, interval, offset);
bucket_pos * interval + offset
}
fn get_req_min_max(req: &HistogramAggregation, mut min: f64, mut max: f64) -> (f64, f64) {
if let Some(extended_bounds) = &req.extended_bounds {
min = min.min(extended_bounds.min);
max = max.max(extended_bounds.max);
}
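// hard_bounds takes precedence: it overrides the (possibly extended) data range entirely.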
if let Some(hard_bounds) = &req.hard_bounds {
min = hard_bounds.min;
max = hard_bounds.max;
}
(min, max)
}
/// Generates buckets with `req.interval` for the given `min` and `max` values.
pub(crate) fn generate_buckets(req: &HistogramAggregation, min: f64, max: f64) -> Vec<f64> {
let (min, max) = get_req_min_max(req, min, max);
let offset = req.offset.unwrap_or(0.0);
let first_bucket_num = get_bucket_num_f64(min, req.interval, offset) as i64;
let last_bucket_num = get_bucket_num_f64(max, req.interval, offset) as i64;
let mut buckets = vec![];
for bucket_pos in first_bucket_num..=last_bucket_num {
let bucket_key = bucket_pos as f64 * req.interval + offset;
buckets.push(bucket_key);
}
buckets
}
#[test]
fn generate_buckets_test() {
let histogram_req = HistogramAggregation {
field: "dummy".to_string(),
interval: 2.0,
..Default::default()
};
let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
let buckets = generate_buckets(&histogram_req, 2.5, 5.5);
assert_eq!(buckets, vec![2.0, 4.0]);
// Single bucket
let buckets = generate_buckets(&histogram_req, 0.5, 0.75);
assert_eq!(buckets, vec![0.0]);
// With offset
let histogram_req = HistogramAggregation {
field: "dummy".to_string(),
interval: 2.0,
offset: Some(0.5),
..Default::default()
};
let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
assert_eq!(buckets, vec![-1.5, 0.5, 2.5, 4.5, 6.5, 8.5]);
let buckets = generate_buckets(&histogram_req, 2.5, 5.5);
assert_eq!(buckets, vec![2.5, 4.5]);
// Single bucket
let buckets = generate_buckets(&histogram_req, 0.5, 0.75);
assert_eq!(buckets, vec![0.5]);
// With extended_bounds
let histogram_req = HistogramAggregation {
field: "dummy".to_string(),
interval: 2.0,
extended_bounds: Some(HistogramBounds {
min: 0.0,
max: 10.0,
}),
..Default::default()
};
let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
let buckets = generate_buckets(&histogram_req, 2.5, 5.5);
assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
// Single bucket, but extended_bounds
let buckets = generate_buckets(&histogram_req, 0.5, 0.75);
assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
// With extended_bounds that do not extend the data range
let histogram_req = HistogramAggregation {
field: "dummy".to_string(),
interval: 2.0,
extended_bounds: Some(HistogramBounds { min: 3.0, max: 5.0 }),
..Default::default()
};
let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
// With hard_bounds reducing
let histogram_req = HistogramAggregation {
field: "dummy".to_string(),
interval: 2.0,
hard_bounds: Some(HistogramBounds { min: 3.0, max: 5.0 }),
..Default::default()
};
let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
assert_eq!(buckets, vec![2.0, 4.0]);
// With hard_bounds extending
let histogram_req = HistogramAggregation {
field: "dummy".to_string(),
interval: 2.0,
hard_bounds: Some(HistogramBounds {
min: 0.0,
max: 10.0,
}),
..Default::default()
};
let buckets = generate_buckets(&histogram_req, 2.5, 5.5);
assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
// Min and max already aligned to bucket boundaries
let histogram_req = HistogramAggregation {
field: "dummy".to_string(),
interval: 2.0,
..Default::default()
};
let buckets = generate_buckets(&histogram_req, 4.0, 10.0);
assert_eq!(buckets, vec![4.0, 6.0, 8.0, 10.0]);
}
#[cfg(test)]
mod tests {
use serde_json::Value;
use super::*;
use crate::aggregation::agg_req::{
Aggregation, Aggregations, BucketAggregation, BucketAggregationType,
};
use crate::aggregation::tests::{
get_test_index_2_segments, get_test_index_from_values, get_test_index_with_num_docs,
};
use crate::aggregation::AggregationCollector;
use crate::query::{AllQuery, TermQuery};
use crate::schema::IndexRecordOption;
use crate::{Index, Term};
fn exec_request(agg_req: Aggregations, index: &Index) -> crate::Result<Value> {
let collector = AggregationCollector::from_aggs(agg_req);
let reader = index.reader()?;
let searcher = reader.searcher();
let agg_res = searcher.search(&AllQuery, &collector).unwrap();
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
Ok(res)
}
#[test]
fn histogram_test_crooked_values() -> crate::Result<()> {
let values = vec![-12.0, 12.31, 14.33, 16.23];
let index = get_test_index_from_values(false, &values)?;
let agg_req: Aggregations = vec![(
"my_interval".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
field: "score_f64".to_string(),
interval: 3.5,
offset: Some(0.0),
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_interval"]["buckets"][0]["key"], -14.0);
assert_eq!(res["my_interval"]["buckets"][0]["doc_count"], 1);
assert_eq!(res["my_interval"]["buckets"][7]["key"], 10.5);
assert_eq!(res["my_interval"]["buckets"][7]["doc_count"], 1);
assert_eq!(res["my_interval"]["buckets"][8]["key"], 14.0);
assert_eq!(res["my_interval"]["buckets"][8]["doc_count"], 2);
assert_eq!(res["my_interval"]["buckets"][9], Value::Null);
// With offset
let agg_req: Aggregations = vec![(
"my_interval".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
field: "score_f64".to_string(),
interval: 3.5,
offset: Some(1.2),
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_interval"]["buckets"][0]["key"], -12.8);
assert_eq!(res["my_interval"]["buckets"][0]["doc_count"], 1);
assert_eq!(res["my_interval"]["buckets"][1]["key"], -9.3);
assert_eq!(res["my_interval"]["buckets"][1]["doc_count"], 0);
assert_eq!(res["my_interval"]["buckets"][2]["key"], -5.8);
assert_eq!(res["my_interval"]["buckets"][2]["doc_count"], 0);
assert_eq!(res["my_interval"]["buckets"][3]["key"], -2.3);
assert_eq!(res["my_interval"]["buckets"][3]["doc_count"], 0);
assert_eq!(res["my_interval"]["buckets"][7]["key"], 11.7);
assert_eq!(res["my_interval"]["buckets"][7]["doc_count"], 2);
assert_eq!(res["my_interval"]["buckets"][8]["key"], 15.2);
assert_eq!(res["my_interval"]["buckets"][8]["doc_count"], 1);
assert_eq!(res["my_interval"]["buckets"][9], Value::Null);
Ok(())
}
#[test]
fn histogram_test_min_value_positive_force_merge_segments() -> crate::Result<()> {
histogram_test_min_value_positive_merge_segments(true)
}
#[test]
fn histogram_test_min_value_positive() -> crate::Result<()> {
histogram_test_min_value_positive_merge_segments(false)
}
fn histogram_test_min_value_positive_merge_segments(merge_segments: bool) -> crate::Result<()> {
let values = vec![10.0, 12.0, 14.0, 16.23];
let index = get_test_index_from_values(merge_segments, &values)?;
let agg_req: Aggregations = vec![(
"my_interval".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
field: "score_f64".to_string(),
interval: 1.0,
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_interval"]["buckets"][0]["key"], 10.0);
assert_eq!(res["my_interval"]["buckets"][0]["doc_count"], 1);
assert_eq!(res["my_interval"]["buckets"][1]["key"], 11.0);
assert_eq!(res["my_interval"]["buckets"][1]["doc_count"], 0);
assert_eq!(res["my_interval"]["buckets"][2]["key"], 12.0);
assert_eq!(res["my_interval"]["buckets"][2]["doc_count"], 1);
assert_eq!(res["my_interval"]["buckets"][3]["key"], 13.0);
assert_eq!(res["my_interval"]["buckets"][3]["doc_count"], 0);
assert_eq!(res["my_interval"]["buckets"][6]["key"], 16.0);
assert_eq!(res["my_interval"]["buckets"][6]["doc_count"], 1);
assert_eq!(res["my_interval"]["buckets"][7], Value::Null);
Ok(())
}
#[test]
fn histogram_simple_test() -> crate::Result<()> {
let index = get_test_index_with_num_docs(false, 100)?;
let agg_req: Aggregations = vec![(
"histogram".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
field: "score_f64".to_string(),
interval: 1.0,
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["histogram"]["buckets"][0]["key"], 0.0);
assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 1);
assert_eq!(res["histogram"]["buckets"][1]["key"], 1.0);
assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 1);
assert_eq!(res["histogram"]["buckets"][99]["key"], 99.0);
assert_eq!(res["histogram"]["buckets"][99]["doc_count"], 1);
assert_eq!(res["histogram"]["buckets"][100], Value::Null);
Ok(())
}
#[test]
fn histogram_merge_test() -> crate::Result<()> {
// Merge buckets counts from different segments
let values = vec![10.0, 12.0, 14.0, 16.23, 10.0, 13.0, 10.0, 12.0];
let index = get_test_index_from_values(false, &values)?;
let agg_req: Aggregations = vec![(
"histogram".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
field: "score_f64".to_string(),
interval: 1.0,
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["histogram"]["buckets"][0]["key"], 10.0);
assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 3);
assert_eq!(res["histogram"]["buckets"][1]["key"], 11.0);
assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 0);
assert_eq!(res["histogram"]["buckets"][2]["key"], 12.0);
assert_eq!(res["histogram"]["buckets"][2]["doc_count"], 2);
assert_eq!(res["histogram"]["buckets"][3]["key"], 13.0);
assert_eq!(res["histogram"]["buckets"][3]["doc_count"], 1);
Ok(())
}
#[test]
fn histogram_min_doc_test_multi_segments() -> crate::Result<()> {
histogram_min_doc_test_with_opt(false)
}
#[test]
fn histogram_min_doc_test_single_segments() -> crate::Result<()> {
histogram_min_doc_test_with_opt(true)
}
fn histogram_min_doc_test_with_opt(merge_segments: bool) -> crate::Result<()> {
let values = vec![10.0, 12.0, 14.0, 16.23, 10.0, 13.0, 10.0, 12.0];
let index = get_test_index_from_values(merge_segments, &values)?;
let agg_req: Aggregations = vec![(
"histogram".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
field: "score_f64".to_string(),
interval: 1.0,
min_doc_count: Some(2),
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["histogram"]["buckets"][0]["key"], 10.0);
assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 3);
assert_eq!(res["histogram"]["buckets"][1]["key"], 12.0);
assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 2);
assert_eq!(res["histogram"]["buckets"][2], Value::Null);
Ok(())
}
#[test]
fn histogram_hard_bounds_test_multi_segment() -> crate::Result<()> {
histogram_hard_bounds_test_with_opt(false)
}
#[test]
fn histogram_hard_bounds_test_single_segment() -> crate::Result<()> {
histogram_hard_bounds_test_with_opt(true)
}
fn histogram_hard_bounds_test_with_opt(merge_segments: bool) -> crate::Result<()> {
let values = vec![10.0, 12.0, 14.0, 16.23, 10.0, 13.0, 10.0, 12.0];
let index = get_test_index_from_values(merge_segments, &values)?;
let agg_req: Aggregations = vec![(
"histogram".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
field: "score_f64".to_string(),
interval: 1.0,
hard_bounds: Some(HistogramBounds {
min: 2.0,
max: 12.0,
}),
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["histogram"]["buckets"][0]["key"], 2.0);
assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 0);
assert_eq!(res["histogram"]["buckets"][10]["key"], 12.0);
assert_eq!(res["histogram"]["buckets"][10]["doc_count"], 2);
assert_eq!(res["histogram"]["buckets"][11], Value::Null);
Ok(())
}
#[test]
fn histogram_empty_bucket_behaviour_test_single_segment() -> crate::Result<()> {
histogram_empty_bucket_behaviour_test_with_opt(true)
}
#[test]
fn histogram_empty_bucket_behaviour_test_multi_segment() -> crate::Result<()> {
histogram_empty_bucket_behaviour_test_with_opt(false)
}
fn histogram_empty_bucket_behaviour_test_with_opt(merge_segments: bool) -> crate::Result<()> {
let index = get_test_index_2_segments(merge_segments)?;
let agg_req: Aggregations = vec![(
"histogram".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
field: "score_f64".to_string(),
interval: 1.0,
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
// let res = exec_request(agg_req, &index)?;
let reader = index.reader()?;
let text_field = reader.searcher().schema().get_field("text").unwrap();
let term_query = TermQuery::new(
Term::from_field_text(text_field, "nohit"),
IndexRecordOption::Basic,
);
let collector = AggregationCollector::from_aggs(agg_req);
let searcher = reader.searcher();
let agg_res = searcher.search(&term_query, &collector).unwrap();
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
println!("{}", &serde_json::to_string_pretty(&agg_res)?);
assert_eq!(res["histogram"]["buckets"][0]["key"], 6.0);
assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 1);
assert_eq!(res["histogram"]["buckets"][37]["key"], 43.0);
assert_eq!(res["histogram"]["buckets"][37]["doc_count"], 0);
assert_eq!(res["histogram"]["buckets"][38]["key"], 44.0);
assert_eq!(res["histogram"]["buckets"][38]["doc_count"], 1);
assert_eq!(res["histogram"]["buckets"][39], Value::Null);
Ok(())
}
}

View File

@@ -0,0 +1,2 @@
mod histogram;
pub use histogram::*;

View File

@@ -7,7 +7,10 @@
//! Results of intermediate buckets are
//! [IntermediateBucketResult](super::intermediate_agg_result::IntermediateBucketResult)
mod histogram;
mod range;
pub(crate) use histogram::SegmentHistogramCollector;
pub use histogram::*;
pub(crate) use range::SegmentRangeCollector;
pub use range::*;

View File

@@ -31,7 +31,7 @@ use crate::{DocId, TantivyError};
/// DistributedAggregationCollector.
///
/// # Request JSON Format
/// ```json
/// ```ignore
/// {
/// "range": {
/// "field": "score",

View File

@@ -2,16 +2,19 @@
//! Intermediate aggregation results can be used to merge results between segments or between
//! indices.
use std::collections::HashMap;
use std::cmp::Ordering;
use fnv::FnvHashMap;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use super::bucket::HistogramAggregation;
use super::metric::{IntermediateAverage, IntermediateStats};
use super::segment_agg_result::{
SegmentAggregationResultsCollector, SegmentBucketResultCollector, SegmentMetricResultCollector,
SegmentRangeBucketEntry,
SegmentAggregationResultsCollector, SegmentBucketResultCollector, SegmentHistogramBucketEntry,
SegmentMetricResultCollector, SegmentRangeBucketEntry,
};
use super::{Key, SerializedKey, VecWithNames};
use super::{Key, MergeFruits, SerializedKey, VecWithNames};
/// Contains the intermediate aggregation result, which is optimized to be merged with other
/// intermediate results.
@@ -124,13 +127,25 @@ impl IntermediateMetricResult {
pub enum IntermediateBucketResult {
/// This is the range entry for a bucket, which contains a key, count, from, to, and optionally
/// sub_aggregations.
Range(HashMap<SerializedKey, IntermediateRangeBucketEntry>),
Range(FnvHashMap<SerializedKey, IntermediateRangeBucketEntry>),
/// This is the histogram entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
Histogram {
/// The buckets
buckets: Vec<IntermediateHistogramBucketEntry>,
/// The original request. It is used to compute the total range after merging segments and
/// to apply min_doc_count after all segment results are merged.
req: HistogramAggregation,
},
}
impl From<SegmentBucketResultCollector> for IntermediateBucketResult {
fn from(collector: SegmentBucketResultCollector) -> Self {
match collector {
SegmentBucketResultCollector::Range(range) => range.into_intermediate_bucket_result(),
SegmentBucketResultCollector::Histogram(histogram) => {
histogram.into_intermediate_bucket_result()
}
}
}
}
@@ -142,18 +157,92 @@ impl IntermediateBucketResult {
IntermediateBucketResult::Range(entries_left),
IntermediateBucketResult::Range(entries_right),
) => {
for (name, entry_left) in entries_left.iter_mut() {
if let Some(entry_right) = entries_right.get(name) {
entry_left.merge_fruits(entry_right);
}
}
for (key, res) in entries_right.iter() {
if !entries_left.contains_key(key) {
entries_left.insert(key.clone(), res.clone());
}
}
merge_maps(entries_left, entries_right);
}
(
IntermediateBucketResult::Histogram {
buckets: entries_left,
..
},
IntermediateBucketResult::Histogram {
buckets: entries_right,
..
},
) => {
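// Histogram buckets from both sides are sorted by key, so a single merge-join pass
// combines them: buckets with equal keys merge their counts and sub-aggregations,
// unmatched buckets are carried over unchanged.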
let mut buckets = entries_left
.drain(..)
.merge_join_by(entries_right.iter(), |left, right| {
left.key.partial_cmp(&right.key).unwrap_or(Ordering::Equal)
})
.map(|either| match either {
itertools::EitherOrBoth::Both(mut left, right) => {
left.merge_fruits(right);
left
}
itertools::EitherOrBoth::Left(left) => left,
itertools::EitherOrBoth::Right(right) => right.clone(),
})
.collect();
std::mem::swap(entries_left, &mut buckets);
}
(IntermediateBucketResult::Range(_), _) => {
panic!("try merge on different types")
}
(IntermediateBucketResult::Histogram { .. }, _) => {
panic!("try merge on different types")
}
}
}
}
fn merge_maps<V: MergeFruits + Clone>(
entries_left: &mut FnvHashMap<SerializedKey, V>,
entries_right: &FnvHashMap<SerializedKey, V>,
) {
for (name, entry_left) in entries_left.iter_mut() {
if let Some(entry_right) = entries_right.get(name) {
entry_left.merge_fruits(entry_right);
}
}
for (key, res) in entries_right.iter() {
if !entries_left.contains_key(key) {
entries_left.insert(key.clone(), res.clone());
}
}
}
/// This is the histogram entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct IntermediateHistogramBucketEntry {
/// The unique key by which the bucket is identified.
pub key: f64,
/// The number of documents in the bucket.
pub doc_count: u64,
/// The sub_aggregation in this bucket.
pub sub_aggregation: IntermediateAggregationResults,
}
impl From<SegmentHistogramBucketEntry> for IntermediateHistogramBucketEntry {
fn from(entry: SegmentHistogramBucketEntry) -> Self {
let sub_aggregation = if let Some(sub_aggregation) = entry.sub_aggregation {
sub_aggregation.into()
} else {
Default::default()
};
IntermediateHistogramBucketEntry {
key: entry.key,
doc_count: entry.doc_count,
sub_aggregation,
}
}
}
@@ -184,7 +273,6 @@ impl From<SegmentRangeBucketEntry> for IntermediateRangeBucketEntry {
} else {
Default::default()
};
// let sub_aggregation = entry.sub_aggregation.into();
IntermediateRangeBucketEntry {
key: entry.key,
@@ -197,22 +285,31 @@ impl From<SegmentRangeBucketEntry> for IntermediateRangeBucketEntry {
}
}
impl IntermediateRangeBucketEntry {
impl MergeFruits for IntermediateRangeBucketEntry {
fn merge_fruits(&mut self, other: &IntermediateRangeBucketEntry) {
self.doc_count += other.doc_count;
self.sub_aggregation.merge_fruits(&other.sub_aggregation);
}
}
impl MergeFruits for IntermediateHistogramBucketEntry {
fn merge_fruits(&mut self, other: &IntermediateHistogramBucketEntry) {
self.doc_count += other.doc_count;
self.sub_aggregation.merge_fruits(&other.sub_aggregation);
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use pretty_assertions::assert_eq;
use super::*;
fn get_sub_test_tree(data: &[(String, u64)]) -> IntermediateAggregationResults {
let mut map = HashMap::new();
let mut buckets = HashMap::new();
let mut buckets = FnvHashMap::default();
for (key, doc_count) in data {
buckets.insert(
key.to_string(),
@@ -235,7 +332,7 @@ mod tests {
fn get_test_tree(data: &[(String, u64, String, u64)]) -> IntermediateAggregationResults {
let mut map = HashMap::new();
let mut buckets = HashMap::new();
let mut buckets: FnvHashMap<_, _> = Default::default();
for (key, doc_count, sub_aggregation_key, sub_aggregation_count) in data {
buckets.insert(
key.to_string(),

View File

@@ -18,6 +18,10 @@
//! Create an [AggregationCollector] from this request. AggregationCollector implements the
//! `Collector` trait and can be passed as collector into `searcher.search()`.
//!
//! #### Limitations
//!
//! Currently aggregations work only on single value fast fields of type u64, f64 and i64.
//!
//! # JSON Format
//! Aggregations request and result structures de/serialize into elasticsearch compatible JSON.
//!
@@ -28,9 +32,14 @@
//! let agg_res = searcher.search(&term_query, &collector).unwrap_err();
//! let json_response_string: String = &serde_json::to_string(&agg_res)?;
//! ```
//! # Limitations
//!
//! Currently aggregations work only on single value fast fields of type u64, f64 and i64.
//! # Supported Aggregations
//! - [Bucket](bucket)
//! - [Histogram](bucket::HistogramAggregation)
//! - [Range](bucket::RangeAggregation)
//! - [Metric](metric)
//! - [Average](metric::AverageAggregation)
//! - [Stats](metric::StatsAggregation)
//!
//! # Example
//! Compute the average metric, by building [agg_req::Aggregations], which is built from an (String,
@@ -90,7 +99,8 @@
//! }
//! }
//! "#;
//! let agg_req: Aggregations = serde_json::from_str(elasticsearch_compatible_json_req).unwrap();
//! let agg_req: Aggregations =
//! serde_json::from_str(elasticsearch_compatible_json_req).unwrap();
//! ```
//! # Code Organization
//!
@@ -101,8 +111,7 @@
//! Buckets can contain sub-aggregations. In this example we create buckets with the range
//! aggregation and then calculate the average on each bucket.
//! ```
//! use tantivy::aggregation::agg_req::{Aggregations, Aggregation, BucketAggregation,
//! MetricAggregation, BucketAggregationType};
//! use tantivy::aggregation::agg_req::*;
//! use tantivy::aggregation::metric::AverageAggregation;
//! use tantivy::aggregation::bucket::RangeAggregation;
//! let sub_agg_req_1: Aggregations = vec![(
@@ -239,6 +248,10 @@ pub enum Key {
F64(f64),
}
trait MergeFruits {
fn merge_fruits(&mut self, other: &Self);
}
impl Display for Key {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
@@ -310,6 +323,16 @@ mod tests {
pub fn get_test_index_with_num_docs(
merge_segments: bool,
num_docs: usize,
) -> crate::Result<Index> {
get_test_index_from_values(
merge_segments,
&(0..num_docs).map(|el| el as f64).collect::<Vec<f64>>(),
)
}
pub fn get_test_index_from_values(
merge_segments: bool,
values: &[f64],
) -> crate::Result<Index> {
let mut schema_builder = Schema::builder();
let text_fieldtype = crate::schema::TextOptions::default()
@@ -332,18 +355,17 @@ mod tests {
let index = Index::create_in_ram(schema_builder.build());
{
let mut index_writer = index.writer_for_tests()?;
for i in 0..num_docs {
for i in values {
// writing the segment
index_writer.add_document(doc!(
text_field => "cool",
score_field => i as u64,
score_field_f64 => i as f64,
score_field_i64 => i as i64,
fraction_field => i as f64/100.0,
score_field => *i as u64,
score_field_f64 => *i as f64,
score_field_i64 => *i as i64,
fraction_field => *i as f64/100.0,
))?;
index_writer.commit()?;
}
index_writer.commit()?;
}
if merge_segments {
let segment_ids = index
@@ -385,27 +407,42 @@ mod tests {
// A second bucket on the first level should have the cache unfilled
// let elasticsearch_compatible_json_req = r#"
let elasticsearch_compatible_json_req = r#"
let elasticsearch_compatible_json = json!(
{
"bucketsL1": {
"range": {
"field": "score",
"ranges": [ { "to": 3.0 }, { "from": 3.0, "to": 266.0 }, { "from": 266.0 } ]
"ranges": [ { "to": 3.0f64 }, { "from": 3.0f64, "to": 266.0f64 }, { "from": 266.0f64 } ]
},
"aggs": {
"bucketsL2": {
"range": {
"field": "score",
"ranges": [ { "to": 100.0 }, { "from": 100.0, "to": 266.0 }, { "from": 266.0 } ]
"ranges": [ { "to": 100.0f64 }, { "from": 100.0f64, "to": 266.0f64 }, { "from": 266.0f64 } ]
}
}
}
},
"histogram_test":{
"histogram": {
"field": "score",
"interval": 263.0,
"offset": 3.0,
},
"aggs": {
"bucketsL2": {
"histogram": {
"field": "score",
"interval": 263.0
}
}
}
}
}
"#;
});
let agg_req: Aggregations =
serde_json::from_str(elasticsearch_compatible_json_req).unwrap();
serde_json::from_str(&serde_json::to_string(&elasticsearch_compatible_json).unwrap())
.unwrap();
let agg_res: AggregationResults = if use_distributed_collector {
let collector = DistributedAggregationCollector::from_aggs(agg_req);
@@ -950,6 +987,7 @@ mod tests {
use test::{self, Bencher};
use super::*;
use crate::aggregation::bucket::{HistogramAggregation, HistogramBounds};
use crate::aggregation::metric::StatsAggregation;
use crate::query::AllQuery;
@@ -1165,6 +1203,71 @@ mod tests {
});
}
// hard_bounds uses a different algorithm, because it actually limits the collection range
#[bench]
fn bench_aggregation_histogram_only_hard_bounds(b: &mut Bencher) {
let index = get_test_index_bench(false).unwrap();
let reader = index.reader().unwrap();
b.iter(|| {
let agg_req_1: Aggregations = vec![(
"rangef64".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
field: "score_f64".to_string(),
interval: 100f64,
hard_bounds: Some(HistogramBounds {
min: 1000.0,
max: 300_000.0,
}),
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let collector = AggregationCollector::from_aggs(agg_req_1);
let searcher = reader.searcher();
let agg_res: AggregationResults =
searcher.search(&AllQuery, &collector).unwrap().into();
agg_res
});
}
#[bench]
fn bench_aggregation_histogram_only(b: &mut Bencher) {
let index = get_test_index_bench(false).unwrap();
let reader = index.reader().unwrap();
b.iter(|| {
let agg_req_1: Aggregations = vec![(
"rangef64".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
field: "score_f64".to_string(),
interval: 100f64, // 1000 buckets
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let collector = AggregationCollector::from_aggs(agg_req_1);
let searcher = reader.searcher();
let agg_res: AggregationResults =
searcher.search(&AllQuery, &collector).unwrap().into();
agg_res
});
}
#[bench]
fn bench_aggregation_sub_tree(b: &mut Bencher) {
let index = get_test_index_bench(false).unwrap();

View File

@@ -9,7 +9,7 @@ use super::agg_req::MetricAggregation;
use super::agg_req_with_accessor::{
AggregationsWithAccessor, BucketAggregationWithAccessor, MetricAggregationWithAccessor,
};
use super::bucket::SegmentRangeCollector;
use super::bucket::{SegmentHistogramCollector, SegmentRangeCollector};
use super::metric::{
AverageAggregation, SegmentAverageCollector, SegmentStatsCollector, StatsAggregation,
};
@@ -151,6 +151,7 @@ impl SegmentMetricResultCollector {
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum SegmentBucketResultCollector {
Range(SegmentRangeCollector),
Histogram(SegmentHistogramCollector),
}
impl SegmentBucketResultCollector {
@@ -163,6 +164,14 @@ impl SegmentBucketResultCollector {
req.field_type,
)?))
}
BucketAggregationType::Histogram(histogram) => Ok(Self::Histogram(
SegmentHistogramCollector::from_req_and_validate(
histogram,
&req.sub_aggregation,
req.field_type,
&req.accessor,
)?,
)),
}
}
@@ -177,10 +186,20 @@ impl SegmentBucketResultCollector {
SegmentBucketResultCollector::Range(range) => {
range.collect_block(doc, bucket_with_accessor, force_flush);
}
SegmentBucketResultCollector::Histogram(histogram) => {
histogram.collect_block(doc, bucket_with_accessor, force_flush)
}
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct SegmentHistogramBucketEntry {
pub key: f64,
pub doc_count: u64,
pub sub_aggregation: Option<SegmentAggregationResultsCollector>,
}
#[derive(Clone, PartialEq)]
pub(crate) struct SegmentRangeBucketEntry {
pub key: Key,