mirror of https://github.com/quickwit-oss/tantivy.git (synced 2026-01-06 09:12:55 +00:00)
switch to sparse collection for histogram (#1898)
* switch to sparse collection for histogram

  Replaces the histogram's vec collection with a hashmap. This approach works much better for sparse data and enables use cases like drill-downs (filter + small interval). It is slower for dense cases (1.3x-2x slower); this can be alleviated with a specialized hashmap in the future.

  closes #1704
  closes #1370

* refactor, clippy

* fix bucket_pos overflow issue
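The core of the change, sketched as a standalone program. `collect_dense` and `collect_sparse` are illustrative names rather than tantivy APIs, and `std::collections::HashMap` stands in for the `rustc_hash::FxHashMap` the patch actually uses (same interface, faster hasher):

```rust
use std::collections::HashMap;

/// Dense collection (the old approach): pre-allocate one counter per bucket
/// between the segment's min and max value. Cheap per document, but the
/// allocation grows with the value range rather than with the matching data.
fn collect_dense(values: &[f64], min: f64, interval: f64, num_buckets: usize) -> Vec<u64> {
    let mut buckets = vec![0u64; num_buckets];
    for &val in values {
        let pos = ((val - min) / interval).floor() as usize;
        buckets[pos] += 1;
    }
    buckets
}

/// Sparse collection (the new approach): only buckets that actually receive
/// a document exist, so a drill-down (filter + small interval) over a wide
/// value range allocates a handful of entries instead of millions of slots.
fn collect_sparse(values: &[f64], interval: f64, offset: f64) -> HashMap<i64, u64> {
    let mut buckets: HashMap<i64, u64> = HashMap::new();
    for &val in values {
        let pos = ((val - offset) / interval).floor() as i64;
        *buckets.entry(pos).or_insert(0) += 1;
    }
    buckets
}

fn main() {
    // Two documents survive a filter, 70_000.0 apart, with interval 1.0:
    // the dense layout needs ~70_001 slots, the sparse one holds 2 entries.
    let values = [0.0, 70_000.0];
    let dense = collect_dense(&values, 0.0, 1.0, 70_001);
    let sparse = collect_sparse(&values, 1.0, 0.0);
    assert_eq!(dense.iter().filter(|&&count| count > 0).count(), 2);
    assert_eq!(sparse.len(), 2);
}
```

The dense layout pays up front for the whole key range; the sparse one pays a hashing cost per document instead, which is where the accepted 1.3x-2x dense-case slowdown comes from. The commit's diff follows, hunk by hunk.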
```diff
@@ -1,9 +1,11 @@
 use std::cmp::Ordering;
 use std::fmt::Display;
 
-use columnar::{Column, ColumnType};
+use columnar::ColumnType;
 use itertools::Itertools;
+use rustc_hash::FxHashMap;
 use serde::{Deserialize, Serialize};
+use tantivy_bitpacker::minmax;
 
 use crate::aggregation::agg_req::AggregationsInternal;
 use crate::aggregation::agg_req_with_accessor::{
@@ -175,7 +177,7 @@ impl HistogramBounds {
     }
 }
 
-#[derive(Clone, Debug, PartialEq)]
+#[derive(Default, Clone, Debug, PartialEq)]
 pub(crate) struct SegmentHistogramBucketEntry {
     pub key: f64,
     pub doc_count: u64,
@@ -201,13 +203,12 @@ impl SegmentHistogramBucketEntry {
 #[derive(Clone, Debug)]
 pub struct SegmentHistogramCollector {
-    /// The buckets containing the aggregation data.
-    buckets: Vec<SegmentHistogramBucketEntry>,
-    sub_aggregations: Option<Vec<Box<dyn SegmentAggregationCollector>>>,
+    buckets: FxHashMap<i64, SegmentHistogramBucketEntry>,
+    sub_aggregations: FxHashMap<i64, Box<dyn SegmentAggregationCollector>>,
+    sub_aggregation_blueprint: Option<Box<dyn SegmentAggregationCollector>>,
     column_type: ColumnType,
     interval: f64,
     offset: f64,
     min_doc_count: u64,
-    first_bucket_num: i64,
     bounds: HistogramBounds,
     accessor_idx: usize,
 }
```
```diff
@@ -249,22 +250,23 @@ impl SegmentAggregationCollector for SegmentHistogramCollector
         let bounds = self.bounds;
         let interval = self.interval;
         let offset = self.offset;
-        let first_bucket_num = self.first_bucket_num;
-        let get_bucket_num =
-            |val| (get_bucket_num_f64(val, interval, offset) as i64 - first_bucket_num) as usize;
+        let get_bucket_pos = |val| (get_bucket_pos_f64(val, interval, offset) as i64);
 
         for doc in docs {
             for val in accessor.values(*doc) {
                 let val = self.f64_from_fastfield_u64(val);
 
-                let bucket_pos = get_bucket_num(val);
-                self.increment_bucket_if_in_bounds(
-                    val,
-                    &bounds,
-                    bucket_pos,
-                    *doc,
-                    sub_aggregation_accessor,
-                )?;
+                let bucket_pos = get_bucket_pos(val);
+
+                if bounds.contains(val) {
+                    self.increment_bucket(
+                        bucket_pos,
+                        *doc,
+                        sub_aggregation_accessor,
+                        interval,
+                        offset,
+                    )?;
+                }
             }
         }
         Ok(())
```
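The removed closure above also explains the commit's third bullet ("fix bucket_pos overflow issue"): it rebased the floored position against `first_bucket_num` and cast the result to `usize`, which wraps for values that land left of the first pre-generated bucket. A self-contained illustration with made-up numbers; only `get_bucket_pos` mirrors the patch's `get_bucket_pos_f64`:

```rust
/// Same math as the patch's get_bucket_pos_f64, cast to a signed position.
fn get_bucket_pos(val: f64, interval: f64, offset: f64) -> i64 {
    ((val - offset) / interval).floor() as i64
}

fn main() {
    // A value left of the offset yields a negative bucket position.
    let pos = get_bucket_pos(-3.0, 2.0, 0.5); // floor(-3.5 / 2.0) = -2
    assert_eq!(pos, -2);

    // Old scheme: rebase against the first generated bucket, then index a Vec.
    let first_bucket_num: i64 = 5; // hypothetical, e.g. buckets generated from 10.5
    let rebased = pos - first_bucket_num; // -7
    // Casting a negative number to usize wraps around to a huge index.
    assert_eq!(rebased as usize, usize::MAX - 6);
}
```

Keeping positions as `i64` hashmap keys and checking `bounds.contains(val)` before touching a bucket removes both the rebase and the wrap.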
```diff
@@ -274,10 +276,8 @@ impl SegmentAggregationCollector for SegmentHistogramCollector
         let sub_aggregation_accessor =
             &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
 
-        if let Some(sub_aggregations) = self.sub_aggregations.as_mut() {
-            for sub_aggregation in sub_aggregations {
-                sub_aggregation.flush(sub_aggregation_accessor)?;
-            }
+        for sub_aggregation in self.sub_aggregations.values_mut() {
+            sub_aggregation.flush(sub_aggregation_accessor)?;
         }
 
         Ok(())
@@ -289,65 +289,21 @@ impl SegmentHistogramCollector {
         self,
         agg_with_accessor: &BucketAggregationWithAccessor,
     ) -> crate::Result<IntermediateBucketResult> {
-        // Compute the number of buckets to validate against max num buckets
-        // Note: We use min_doc_count here, but it's only an lowerbound here, since were are on the
-        // intermediate level and after merging the number of documents of a bucket could exceed
-        // `min_doc_count`.
-        {
-            let cut_off_buckets_front = self
-                .buckets
-                .iter()
-                .take_while(|bucket| bucket.doc_count <= self.min_doc_count)
-                .count();
-            let cut_off_buckets_back = self.buckets[cut_off_buckets_front..]
-                .iter()
-                .rev()
-                .take_while(|bucket| bucket.doc_count <= self.min_doc_count)
-                .count();
-            let estimate_num_buckets =
-                self.buckets.len() - cut_off_buckets_front - cut_off_buckets_back;
+        let mut buckets = Vec::with_capacity(self.buckets.len());
 
-            agg_with_accessor
-                .bucket_count
-                .add_count(estimate_num_buckets as u32);
-            agg_with_accessor.bucket_count.validate_bucket_count()?;
-        }
+        if self.sub_aggregation_blueprint.is_some() {
+            for (bucket_pos, bucket) in self.buckets.into_iter() {
+                let bucket_res = bucket.into_intermediate_bucket_entry(
+                    self.sub_aggregations.get(&bucket_pos).unwrap().clone(),
+                    &agg_with_accessor.sub_aggregation,
+                );
 
-        let mut buckets = Vec::with_capacity(
-            self.buckets
-                .iter()
-                .filter(|bucket| bucket.doc_count != 0)
-                .count(),
-        );
-
-        // Below we remove empty buckets for two reasons
-        // 1. To reduce the size of the intermediate result, which may be passed on the wire.
-        // 2. To mimic elasticsearch, there are no empty buckets at the start and end.
-        //
-        // Empty buckets may be added later again in the final result, depending on the request.
-        if let Some(sub_aggregations) = self.sub_aggregations {
-            for bucket_res in self
-                .buckets
-                .into_iter()
-                .zip(sub_aggregations.into_iter())
-                .filter(|(bucket, _sub_aggregation)| bucket.doc_count != 0)
-                .map(|(bucket, sub_aggregation)| {
-                    bucket.into_intermediate_bucket_entry(
-                        sub_aggregation,
-                        &agg_with_accessor.sub_aggregation,
-                    )
-                })
-            {
                 buckets.push(bucket_res?);
             }
         } else {
-            buckets.extend(
-                self.buckets
-                    .into_iter()
-                    .filter(|bucket| bucket.doc_count != 0)
-                    .map(|bucket| bucket.into()),
-            );
+            buckets.extend(self.buckets.into_values().map(|bucket| bucket.into()));
         };
+        buckets.sort_unstable_by(|b1, b2| b1.key.partial_cmp(&b2.key).unwrap_or(Ordering::Equal));
 
         Ok(IntermediateBucketResult::Histogram {
             buckets,
```
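Because a hashmap yields its entries in arbitrary order, the new code sorts the intermediate buckets by key on the way out (the added `sort_unstable_by` line above). A minimal standalone version of that step, using the same `partial_cmp` fallback:

```rust
use std::cmp::Ordering;

fn main() {
    // Keys drained from a hashmap arrive in arbitrary order.
    let mut keys = vec![4.0_f64, 0.0, 8.0, 2.0];
    // f64 is only PartialOrd, so fall back to Equal for NaN, as the patch does.
    keys.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
    assert_eq!(keys, vec![0.0, 2.0, 4.0, 8.0]);
}
```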
```diff
@@ -359,104 +315,70 @@ impl SegmentHistogramCollector {
         req: &HistogramAggregation,
         sub_aggregation: &AggregationsWithAccessor,
         field_type: ColumnType,
-        accessor: &Column<u64>,
         accessor_idx: usize,
     ) -> crate::Result<Self> {
         req.validate()?;
-        let min = f64_from_fastfield_u64(accessor.min_value(), &field_type);
-        let max = f64_from_fastfield_u64(accessor.max_value(), &field_type);
 
-        let (min, max) = get_req_min_max(req, Some((min, max)));
-
-        // We compute and generate the buckets range (min, max) based on the request and the min
-        // max in the fast field, but this is likely not ideal when this is a subbucket, where many
-        // unnecessary buckets may be generated.
-        let buckets = generate_buckets(req, min, max);
-
-        let sub_aggregations = if sub_aggregation.is_empty() {
+        let sub_aggregation_blueprint = if sub_aggregation.is_empty() {
             None
         } else {
             let sub_aggregation = build_segment_agg_collector(sub_aggregation, false)?;
-            Some(buckets.iter().map(|_| sub_aggregation.clone()).collect())
+            Some(sub_aggregation)
         };
 
-        let buckets = buckets
-            .iter()
-            .map(|bucket| SegmentHistogramBucketEntry {
-                key: *bucket,
-                doc_count: 0,
-            })
-            .collect();
-
-        let first_bucket_num =
-            get_bucket_num_f64(min, req.interval, req.offset.unwrap_or(0.0)) as i64;
-
         let bounds = req.hard_bounds.unwrap_or(HistogramBounds {
             min: f64::MIN,
             max: f64::MAX,
         });
 
         Ok(Self {
-            buckets,
+            buckets: Default::default(),
             column_type: field_type,
             interval: req.interval,
             offset: req.offset.unwrap_or(0.0),
-            first_bucket_num,
             bounds,
-            sub_aggregations,
             min_doc_count: req.min_doc_count(),
+            sub_aggregations: Default::default(),
+            sub_aggregation_blueprint,
             accessor_idx,
         })
     }
 
     #[inline]
-    fn increment_bucket_if_in_bounds(
+    fn increment_bucket(
         &mut self,
-        val: f64,
-        bounds: &HistogramBounds,
-        bucket_pos: usize,
+        bucket_pos: i64,
         doc: DocId,
         bucket_with_accessor: &AggregationsWithAccessor,
+        interval: f64,
+        offset: f64,
     ) -> crate::Result<()> {
-        if bounds.contains(val) {
-            debug_assert_eq!(
-                self.buckets[bucket_pos].key,
-                get_bucket_val(val, self.interval, self.offset)
-            );
-
-            self.increment_bucket(bucket_pos, doc, bucket_with_accessor)?;
+        let bucket = self.buckets.entry(bucket_pos).or_insert_with(|| {
+            let key = get_bucket_key_from_pos(bucket_pos as f64, interval, offset);
+            SegmentHistogramBucketEntry { key, doc_count: 0 }
+        });
+        bucket.doc_count += 1;
+        if let Some(sub_aggregation_blueprint) = self.sub_aggregation_blueprint.as_mut() {
+            self.sub_aggregations
+                .entry(bucket_pos)
+                .or_insert_with(|| sub_aggregation_blueprint.clone())
+                .collect(doc, bucket_with_accessor)?;
         }
         Ok(())
     }
 
-    #[inline]
-    fn increment_bucket(
-        &mut self,
-        bucket_pos: usize,
-        doc: DocId,
-        bucket_with_accessor: &AggregationsWithAccessor,
-    ) -> crate::Result<()> {
-        let bucket = &mut self.buckets[bucket_pos];
-        bucket.doc_count += 1;
-        if let Some(sub_aggregation) = self.sub_aggregations.as_mut() {
-            sub_aggregation[bucket_pos].collect(doc, bucket_with_accessor)?;
-        }
-        Ok(())
-    }
-
     fn f64_from_fastfield_u64(&self, val: u64) -> f64 {
         f64_from_fastfield_u64(val, &self.column_type)
     }
 }
 
 #[inline]
-fn get_bucket_num_f64(val: f64, interval: f64, offset: f64) -> f64 {
+fn get_bucket_pos_f64(val: f64, interval: f64, offset: f64) -> f64 {
     ((val - offset) / interval).floor()
 }
 
 #[inline]
-fn get_bucket_val(val: f64, interval: f64, offset: f64) -> f64 {
-    let bucket_pos = get_bucket_num_f64(val, interval, offset);
+fn get_bucket_key_from_pos(bucket_pos: f64, interval: f64, offset: f64) -> f64 {
    bucket_pos * interval + offset
 }
```
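The renamed helpers at the bottom of this hunk split the old `get_bucket_val` into a position step and a key step, so a bucket's key can be recomputed from its hashmap key alone, without seeing the original value again. A worked round trip with the same two functions:

```rust
fn get_bucket_pos_f64(val: f64, interval: f64, offset: f64) -> f64 {
    ((val - offset) / interval).floor()
}

fn get_bucket_key_from_pos(bucket_pos: f64, interval: f64, offset: f64) -> f64 {
    bucket_pos * interval + offset
}

fn main() {
    // interval = 2.0 and offset = 0.5 put bucket boundaries at 0.5, 2.5, 4.5, ...
    let (interval, offset) = (2.0, 0.5);
    let pos = get_bucket_pos_f64(3.2, interval, offset); // floor(2.7 / 2.0) = 1.0
    assert_eq!(pos, 1.0);
    // increment_bucket derives the key from the position, not from the value.
    assert_eq!(get_bucket_key_from_pos(pos, interval, offset), 2.5);
}
```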
```diff
@@ -470,13 +392,8 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
     //
     // The bounds are the min max from the current buckets, optionally extended by
     // extended_bounds from the request
-    let min_max = if buckets.is_empty() {
-        None
-    } else {
-        let min = buckets[0].key;
-        let max = buckets[buckets.len() - 1].key;
-        Some((min, max))
-    };
+    let min_max = minmax(buckets.iter().map(|bucket| bucket.key));
 
     let fill_gaps_buckets = generate_buckets_with_opt_minmax(histogram_req, min_max);
 
     let empty_sub_aggregation = IntermediateAggregationResults::empty_from_req(sub_aggregation);
@@ -562,12 +479,6 @@ fn get_req_min_max(req: &HistogramAggregation, min_max: Option<(f64, f64)>) -> (
     (min, max)
 }
 
-/// Generates buckets with req.interval
-/// range is computed for provided min_max and request extended_bounds/hard_bounds
-pub(crate) fn generate_buckets(req: &HistogramAggregation, min: f64, max: f64) -> Vec<f64> {
-    generate_buckets_with_opt_minmax(req, Some((min, max)))
-}
-
 /// Generates buckets with req.interval
 /// Range is computed for provided min_max and request extended_bounds/hard_bounds
 /// returns empty vec when there is no range to span
@@ -578,8 +489,8 @@ pub(crate) fn generate_buckets_with_opt_minmax(
     let (min, max) = get_req_min_max(req, min_max);
 
     let offset = req.offset.unwrap_or(0.0);
-    let first_bucket_num = get_bucket_num_f64(min, req.interval, offset) as i64;
-    let last_bucket_num = get_bucket_num_f64(max, req.interval, offset) as i64;
+    let first_bucket_num = get_bucket_pos_f64(min, req.interval, offset) as i64;
+    let last_bucket_num = get_bucket_pos_f64(max, req.interval, offset) as i64;
     let mut buckets = Vec::with_capacity((first_bucket_num..=last_bucket_num).count());
     for bucket_pos in first_bucket_num..=last_bucket_num {
         let bucket_key = bucket_pos as f64 * req.interval + offset;
```
```diff
@@ -589,118 +500,6 @@ pub(crate) fn generate_buckets_with_opt_minmax(
     buckets
 }
 
-#[test]
-fn generate_buckets_test() {
-    let histogram_req = HistogramAggregation {
-        field: "dummy".to_string(),
-        interval: 2.0,
-        ..Default::default()
-    };
-
-    let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
-    assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
-
-    let buckets = generate_buckets(&histogram_req, 2.5, 5.5);
-    assert_eq!(buckets, vec![2.0, 4.0]);
-
-    // Single bucket
-    let buckets = generate_buckets(&histogram_req, 0.5, 0.75);
-    assert_eq!(buckets, vec![0.0]);
-
-    // With offset
-    let histogram_req = HistogramAggregation {
-        field: "dummy".to_string(),
-        interval: 2.0,
-        offset: Some(0.5),
-        ..Default::default()
-    };
-
-    let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
-    assert_eq!(buckets, vec![-1.5, 0.5, 2.5, 4.5, 6.5, 8.5]);
-
-    let buckets = generate_buckets(&histogram_req, 2.5, 5.5);
-    assert_eq!(buckets, vec![2.5, 4.5]);
-
-    // Single bucket
-    let buckets = generate_buckets(&histogram_req, 0.5, 0.75);
-    assert_eq!(buckets, vec![0.5]);
-
-    // no bucket
-    let buckets = generate_buckets(&histogram_req, f64::MAX, f64::MIN);
-    assert_eq!(buckets, vec![] as Vec<f64>);
-
-    // With extended_bounds
-    let histogram_req = HistogramAggregation {
-        field: "dummy".to_string(),
-        interval: 2.0,
-        extended_bounds: Some(HistogramBounds {
-            min: 0.0,
-            max: 10.0,
-        }),
-        ..Default::default()
-    };
-
-    let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
-    assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
-
-    let buckets = generate_buckets(&histogram_req, 2.5, 5.5);
-    assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
-
-    // Single bucket, but extended_bounds
-    let buckets = generate_buckets(&histogram_req, 0.5, 0.75);
-    assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
-
-    // no bucket, but extended_bounds
-    let buckets = generate_buckets(&histogram_req, f64::MAX, f64::MIN);
-    assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
-
-    // With invalid extended_bounds
-    let histogram_req = HistogramAggregation {
-        field: "dummy".to_string(),
-        interval: 2.0,
-        extended_bounds: Some(HistogramBounds { min: 3.0, max: 5.0 }),
-        ..Default::default()
-    };
-
-    let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
-    assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
-
-    // With hard_bounds reducing
-    let histogram_req = HistogramAggregation {
-        field: "dummy".to_string(),
-        interval: 2.0,
-        hard_bounds: Some(HistogramBounds { min: 3.0, max: 5.0 }),
-        ..Default::default()
-    };
-
-    let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
-    assert_eq!(buckets, vec![2.0, 4.0]);
-
-    // With hard_bounds, extending has no effect
-    let histogram_req = HistogramAggregation {
-        field: "dummy".to_string(),
-        interval: 2.0,
-        hard_bounds: Some(HistogramBounds {
-            min: 0.0,
-            max: 10.0,
-        }),
-        ..Default::default()
-    };
-
-    let buckets = generate_buckets(&histogram_req, 2.5, 5.5);
-    assert_eq!(buckets, vec![2.0, 4.0]);
-
-    // Blubber
-    let histogram_req = HistogramAggregation {
-        field: "dummy".to_string(),
-        interval: 2.0,
-        ..Default::default()
-    };
-
-    let buckets = generate_buckets(&histogram_req, 4.0, 10.0);
-    assert_eq!(buckets, vec![4.0, 6.0, 8.0, 10.0]);
-}
-
 #[cfg(test)]
 mod tests {
```
```diff
@@ -1521,36 +1320,4 @@ mod tests {
 
         Ok(())
     }
-
-    #[test]
-    fn histogram_test_max_buckets_segments() -> crate::Result<()> {
-        let values = vec![0.0, 70000.0];
-
-        let index = get_test_index_from_values(true, &values)?;
-
-        let agg_req: Aggregations = vec![(
-            "my_interval".to_string(),
-            Aggregation::Bucket(BucketAggregation {
-                bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
-                    field: "score_f64".to_string(),
-                    interval: 1.0,
-                    ..Default::default()
-                }),
-                sub_aggregation: Default::default(),
-            }),
-        )]
-        .into_iter()
-        .collect();
-
-        let res = exec_request(agg_req, &index);
-
-        assert_eq!(
-            res.unwrap_err().to_string(),
-            "An invalid argument was passed: 'Aborting aggregation because too many buckets were \
-             created'"
-                .to_string()
-        );
-
-        Ok(())
-    }
 }
```
In the second file touched by the commit, the call site of `from_req_and_validate` drops the accessor argument to match the new signature:

```diff
@@ -169,7 +169,6 @@ pub(crate) fn build_bucket_segment_agg_collector(
                 histogram,
                 &req.sub_aggregation,
                 req.field_type,
-                &req.accessor,
                 accessor_idx,
             )?,
         )),
```