implement SegmentAggregationCollector on bucket aggs (#1878)

PSeitz
2023-02-17 19:53:29 +08:00
committed by GitHub
parent bf1449b22d
commit 74bf60b4f7
9 changed files with 477 additions and 431 deletions
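At a glance: this change moves the bucket aggregations (terms, range, histogram) onto the SegmentAggregationCollector trait and replaces the enum-based dispatch (SegmentBucketResultCollector / SegmentMetricResultCollector) with Box<dyn SegmentAggregationCollector> trait objects. A sketch of the trait, reconstructed from the signatures in the hunks below (the layout is not verbatim source; the types are tantivy's own):

// Reconstructed for orientation from the impl blocks in this diff.
pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug {
    fn into_intermediate_aggregations_result(
        self: Box<Self>,
        agg_with_accessor: &AggregationsWithAccessor,
    ) -> crate::Result<IntermediateAggregationResults>;

    fn collect(
        &mut self,
        doc: crate::DocId,
        agg_with_accessor: &AggregationsWithAccessor,
    ) -> crate::Result<()>;

    fn collect_block(
        &mut self,
        docs: &[crate::DocId],
        agg_with_accessor: &AggregationsWithAccessor,
    ) -> crate::Result<()>;

    fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()>;
}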

View File

@@ -38,6 +38,7 @@ impl<T: MonotonicallyMappableToU64> Column<T> {
}
impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
#[inline]
pub fn get_cardinality(&self) -> Cardinality {
self.idx.get_cardinality()
}

View File

@@ -34,6 +34,7 @@ impl From<MultiValueIndex> for ColumnIndex {
}
impl ColumnIndex {
#[inline]
pub fn get_cardinality(&self) -> Cardinality {
match self {
ColumnIndex::Full => Cardinality::Full,

View File

@@ -14,9 +14,9 @@ use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
};
use crate::aggregation::segment_agg_result::{
- GenericSegmentAggregationResultsCollector, SegmentAggregationCollector,
+ build_segment_agg_collector, SegmentAggregationCollector,
};
- use crate::aggregation::{f64_from_fastfield_u64, format_date};
+ use crate::aggregation::{f64_from_fastfield_u64, format_date, VecWithNames};
use crate::schema::{Schema, Type};
use crate::{DocId, TantivyError};
@@ -185,7 +185,7 @@ pub(crate) struct SegmentHistogramBucketEntry {
impl SegmentHistogramBucketEntry {
pub(crate) fn into_intermediate_bucket_entry(
self,
- sub_aggregation: GenericSegmentAggregationResultsCollector,
+ sub_aggregation: Box<dyn SegmentAggregationCollector>,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<IntermediateHistogramBucketEntry> {
Ok(IntermediateHistogramBucketEntry {
@@ -203,13 +203,86 @@ impl SegmentHistogramBucketEntry {
pub struct SegmentHistogramCollector {
/// The buckets containing the aggregation data.
buckets: Vec<SegmentHistogramBucketEntry>,
- sub_aggregations: Option<Vec<GenericSegmentAggregationResultsCollector>>,
+ sub_aggregations: Option<Vec<Box<dyn SegmentAggregationCollector>>>,
field_type: Type,
interval: f64,
offset: f64,
min_doc_count: u64,
first_bucket_num: i64,
bounds: HistogramBounds,
accessor_idx: usize,
}
impl SegmentAggregationCollector for SegmentHistogramCollector {
fn into_intermediate_aggregations_result(
self: Box<Self>,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<IntermediateAggregationResults> {
let name = agg_with_accessor.buckets.keys[self.accessor_idx].to_string();
let agg_with_accessor = &agg_with_accessor.buckets.values[self.accessor_idx];
let bucket = self.into_intermediate_bucket_result(agg_with_accessor)?;
let buckets = Some(VecWithNames::from_entries(vec![(name, bucket)]));
Ok(IntermediateAggregationResults {
metrics: None,
buckets,
})
}
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
self.collect_block(&[doc], agg_with_accessor)
}
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor;
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
let bounds = self.bounds;
let interval = self.interval;
let offset = self.offset;
let first_bucket_num = self.first_bucket_num;
let get_bucket_num =
|val| (get_bucket_num_f64(val, interval, offset) as i64 - first_bucket_num) as usize;
for doc in docs {
for val in accessor.values(*doc) {
let val = self.f64_from_fastfield_u64(val);
let bucket_pos = get_bucket_num(val);
self.increment_bucket_if_in_bounds(
val,
&bounds,
bucket_pos,
*doc,
sub_aggregation_accessor,
)?;
}
}
Ok(())
}
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
if let Some(sub_aggregations) = self.sub_aggregations.as_mut() {
for sub_aggregation in sub_aggregations {
sub_aggregation.flush(sub_aggregation_accessor)?;
}
}
Ok(())
}
}
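For orientation: the get_bucket_num closure used in collect_block above computes a bucket slot relative to the first bucket. A standalone restatement (assuming get_bucket_num_f64 is the usual floor((val - offset) / interval) histogram rounding; its body is not shown in this diff):

// Standalone restatement of the closure above. The floor() form of
// get_bucket_num_f64 is an assumption; the diff does not show its definition.
fn bucket_pos(val: f64, interval: f64, offset: f64, first_bucket_num: i64) -> usize {
    let bucket_num = ((val - offset) / interval).floor() as i64;
    (bucket_num - first_bucket_num) as usize
}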
impl SegmentHistogramCollector {
@@ -285,6 +358,7 @@ impl SegmentHistogramCollector {
sub_aggregation: &AggregationsWithAccessor,
field_type: Type,
accessor: &Column<u64>,
accessor_idx: usize,
) -> crate::Result<Self> {
req.validate()?;
let min = f64_from_fastfield_u64(accessor.min_value(), &field_type);
@@ -300,8 +374,7 @@ impl SegmentHistogramCollector {
let sub_aggregations = if sub_aggregation.is_empty() {
None
} else {
- let sub_aggregation =
- GenericSegmentAggregationResultsCollector::from_req_and_validate(sub_aggregation)?;
+ let sub_aggregation = build_segment_agg_collector(sub_aggregation, false)?;
Some(buckets.iter().map(|_| sub_aggregation.clone()).collect())
};
@@ -330,40 +403,10 @@ impl SegmentHistogramCollector {
bounds,
sub_aggregations,
min_doc_count: req.min_doc_count(),
accessor_idx,
})
}
- #[inline]
- pub(crate) fn collect_block(
- &mut self,
- docs: &[DocId],
- bucket_with_accessor: &BucketAggregationWithAccessor,
- ) -> crate::Result<()> {
- let bounds = self.bounds;
- let interval = self.interval;
- let offset = self.offset;
- let first_bucket_num = self.first_bucket_num;
- let get_bucket_num =
- |val| (get_bucket_num_f64(val, interval, offset) as i64 - first_bucket_num) as usize;
- let accessor = &bucket_with_accessor.accessor;
- for doc in docs {
- for val in accessor.values(*doc) {
- let val = self.f64_from_fastfield_u64(val);
- let bucket_pos = get_bucket_num(val);
- self.increment_bucket_if_in_bounds(
- val,
- &bounds,
- bucket_pos,
- *doc,
- &bucket_with_accessor.sub_aggregation,
- )?;
- }
- }
- Ok(())
- }
#[inline]
fn increment_bucket_if_in_bounds(
&mut self,
@@ -399,18 +442,6 @@ impl SegmentHistogramCollector {
Ok(())
}
- pub(crate) fn flush(
- &mut self,
- bucket_with_accessor: &BucketAggregationWithAccessor,
- ) -> crate::Result<()> {
- if let Some(sub_aggregations) = self.sub_aggregations.as_mut() {
- for sub_aggregation in sub_aggregations {
- sub_aggregation.flush(&bucket_with_accessor.sub_aggregation)?;
- }
- }
- Ok(())
- }
fn f64_from_fastfield_u64(&self, val: u64) -> f64 {
f64_from_fastfield_u64(val, &self.field_type)
}

View File

@@ -5,20 +5,19 @@ use columnar::MonotonicallyMappableToU64;
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
- use crate::aggregation::agg_req_with_accessor::{
- AggregationsWithAccessor, BucketAggregationWithAccessor,
- };
+ use crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor;
use crate::aggregation::intermediate_agg_result::{
- IntermediateBucketResult, IntermediateRangeBucketEntry, IntermediateRangeBucketResult,
+ IntermediateAggregationResults, IntermediateBucketResult, IntermediateRangeBucketEntry,
+ IntermediateRangeBucketResult,
};
use crate::aggregation::segment_agg_result::{
- BucketCount, GenericSegmentAggregationResultsCollector, SegmentAggregationCollector,
+ build_segment_agg_collector, BucketCount, SegmentAggregationCollector,
};
use crate::aggregation::{
- f64_from_fastfield_u64, f64_to_fastfield_u64, format_date, Key, SerializedKey,
+ f64_from_fastfield_u64, f64_to_fastfield_u64, format_date, Key, SerializedKey, VecWithNames,
};
use crate::schema::Type;
- use crate::{DocId, TantivyError};
+ use crate::TantivyError;
/// Provide user-defined buckets to aggregate on.
/// Two special buckets will automatically be created to cover the whole range of values.
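As a usage sketch (mirroring the range_fraction_test_with_sub_agg test further down, which uses the test fixture's fraction_f64 field): requesting two ranges yields four response buckets, because open-ended buckets below and above the requested ranges are added automatically.

// Two requested ranges produce four response buckets:
// "*-0", "0-0.1", "0.1-0.2" and "0.2-*" (see the assertions in the test below).
let req = RangeAggregation {
    field: "fraction_f64".to_string(),
    ranges: vec![(0f64..0.1f64).into(), (0.1f64..0.2f64).into()],
    ..Default::default()
};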
@@ -129,13 +128,14 @@ pub struct SegmentRangeCollector {
/// The buckets containing the aggregation data.
buckets: Vec<SegmentRangeAndBucketEntry>,
field_type: Type,
pub(crate) accessor_idx: usize,
}
#[derive(Clone)]
pub(crate) struct SegmentRangeBucketEntry {
pub key: Key,
pub doc_count: u64,
- pub sub_aggregation: Option<GenericSegmentAggregationResultsCollector>,
+ pub sub_aggregation: Option<Box<dyn SegmentAggregationCollector>>,
/// The from range of the bucket. Equals `f64::MIN` when `None`.
pub from: Option<f64>,
/// The to range of the bucket. Equals `f64::MAX` when `None`. Open interval, `to` is not
@@ -174,12 +174,14 @@ impl SegmentRangeBucketEntry {
}
}
- impl SegmentRangeCollector {
- pub fn into_intermediate_bucket_result(
- self,
- agg_with_accessor: &BucketAggregationWithAccessor,
- ) -> crate::Result<IntermediateBucketResult> {
+ impl SegmentAggregationCollector for SegmentRangeCollector {
+ fn into_intermediate_aggregations_result(
+ self: Box<Self>,
+ agg_with_accessor: &AggregationsWithAccessor,
+ ) -> crate::Result<IntermediateAggregationResults> {
let field_type = self.field_type;
let name = agg_with_accessor.buckets.keys[self.accessor_idx].to_string();
let sub_agg = &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
let buckets: FxHashMap<SerializedKey, IntermediateRangeBucketEntry> = self
.buckets
@@ -189,21 +191,74 @@ impl SegmentRangeCollector {
range_to_string(&range_bucket.range, &field_type)?,
range_bucket
.bucket
- .into_intermediate_bucket_entry(&agg_with_accessor.sub_aggregation)?,
+ .into_intermediate_bucket_entry(sub_agg)?,
))
})
.collect::<crate::Result<_>>()?;
- Ok(IntermediateBucketResult::Range(
- IntermediateRangeBucketResult { buckets },
- ))
+ let bucket = IntermediateBucketResult::Range(IntermediateRangeBucketResult { buckets });
+ let buckets = Some(VecWithNames::from_entries(vec![(name, bucket)]));
+ Ok(IntermediateAggregationResults {
+ metrics: None,
+ buckets,
+ })
}
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
self.collect_block(&[doc], agg_with_accessor)
}
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor;
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
for doc in docs {
for val in accessor.values(*doc) {
let bucket_pos = self.get_bucket_pos(val);
let bucket = &mut self.buckets[bucket_pos];
bucket.bucket.doc_count += 1;
if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
sub_aggregation.collect(*doc, sub_aggregation_accessor)?;
}
}
}
Ok(())
}
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
for bucket in self.buckets.iter_mut() {
if let Some(sub_agg) = bucket.bucket.sub_aggregation.as_mut() {
sub_agg.flush(sub_aggregation_accessor)?;
}
}
Ok(())
}
}
impl SegmentRangeCollector {
pub(crate) fn from_req_and_validate(
req: &RangeAggregation,
sub_aggregation: &AggregationsWithAccessor,
bucket_count: &BucketCount,
field_type: Type,
accessor_idx: usize,
) -> crate::Result<Self> {
// The range input on the request is f64.
// We need to convert to u64 ranges, because we read the values as u64.
@@ -229,11 +284,7 @@ impl SegmentRangeCollector {
let sub_aggregation = if sub_aggregation.is_empty() {
None
} else {
- Some(
- GenericSegmentAggregationResultsCollector::from_req_and_validate(
- sub_aggregation,
- )?,
- )
+ Some(build_segment_agg_collector(sub_aggregation, false)?)
};
Ok(SegmentRangeAndBucketEntry {
@@ -255,32 +306,10 @@ impl SegmentRangeCollector {
Ok(SegmentRangeCollector {
buckets,
field_type,
accessor_idx,
})
}
- #[inline]
- pub(crate) fn collect_block(
- &mut self,
- docs: &[DocId],
- bucket_with_accessor: &BucketAggregationWithAccessor,
- ) -> crate::Result<()> {
- let accessor = &bucket_with_accessor.accessor;
- for doc in docs {
- for val in accessor.values(*doc) {
- let bucket_pos = self.get_bucket_pos(val);
- let bucket = &mut self.buckets[bucket_pos];
- bucket.bucket.doc_count += 1;
- if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
- sub_aggregation.collect(*doc, &bucket_with_accessor.sub_aggregation)?;
- }
- }
- }
- Ok(())
- }
#[inline]
fn get_bucket_pos(&self, val: u64) -> usize {
let pos = self
@@ -290,18 +319,6 @@ impl SegmentRangeCollector {
debug_assert!(self.buckets[pos].range.contains(&val));
pos
}
- pub(crate) fn flush(
- &mut self,
- bucket_with_accessor: &BucketAggregationWithAccessor,
- ) -> crate::Result<()> {
- for bucket in &mut self.buckets {
- if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
- sub_aggregation.flush(&bucket_with_accessor.sub_aggregation)?;
- }
- }
- Ok(())
- }
}
/// Converts the user provided f64 range value to fast field value space.
@@ -419,8 +436,9 @@ mod tests {
use super::*;
use crate::aggregation::agg_req::{
- Aggregation, Aggregations, BucketAggregation, BucketAggregationType,
+ Aggregation, Aggregations, BucketAggregation, BucketAggregationType, MetricAggregation,
};
use crate::aggregation::metric::AverageAggregation;
use crate::aggregation::tests::{
exec_request, exec_request_with_query, get_test_index_2_segments,
get_test_index_with_num_docs,
@@ -441,6 +459,7 @@ mod tests {
&Default::default(),
&Default::default(),
field_type,
0,
)
.expect("unexpected error")
}
@@ -477,6 +496,47 @@ mod tests {
Ok(())
}
#[test]
fn range_fraction_test_with_sub_agg() -> crate::Result<()> {
let index = get_test_index_with_num_docs(false, 100)?;
let sub_agg_req: Aggregations = vec![(
"score_f64".to_string(),
Aggregation::Metric(MetricAggregation::Average(
AverageAggregation::from_field_name("score_f64".to_string()),
)),
)]
.into_iter()
.collect();
let agg_req: Aggregations = vec![(
"range".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Range(RangeAggregation {
field: "fraction_f64".to_string(),
ranges: vec![(0f64..0.1f64).into(), (0.1f64..0.2f64).into()],
..Default::default()
}),
sub_aggregation: sub_agg_req,
}),
)]
.into_iter()
.collect();
let res = exec_request_with_query(agg_req, &index, None)?;
assert_eq!(res["range"]["buckets"][0]["key"], "*-0");
assert_eq!(res["range"]["buckets"][0]["doc_count"], 0);
assert_eq!(res["range"]["buckets"][1]["key"], "0-0.1");
assert_eq!(res["range"]["buckets"][1]["doc_count"], 10);
assert_eq!(res["range"]["buckets"][2]["key"], "0.1-0.2");
assert_eq!(res["range"]["buckets"][2]["doc_count"], 10);
assert_eq!(res["range"]["buckets"][3]["key"], "0.2-*");
assert_eq!(res["range"]["buckets"][3]["doc_count"], 80);
Ok(())
}
#[test]
fn range_keyed_buckets_test() -> crate::Result<()> {
let index = get_test_index_with_num_docs(false, 100)?;

View File

@@ -1,5 +1,6 @@
use std::fmt::Debug;
use columnar::Cardinality;
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
@@ -8,13 +9,15 @@ use crate::aggregation::agg_req_with_accessor::{
AggregationsWithAccessor, BucketAggregationWithAccessor,
};
use crate::aggregation::intermediate_agg_result::{
- IntermediateBucketResult, IntermediateTermBucketEntry, IntermediateTermBucketResult,
+ IntermediateAggregationResults, IntermediateBucketResult, IntermediateTermBucketEntry,
+ IntermediateTermBucketResult,
};
use crate::aggregation::segment_agg_result::{
build_segment_agg_collector, SegmentAggregationCollector,
};
use crate::aggregation::VecWithNames;
use crate::error::DataCorruption;
- use crate::{DocId, TantivyError};
+ use crate::TantivyError;
/// Creates a bucket for every unique term and counts the number of occurrences.
/// Note that doc_count in the response buckets equals term count here.
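A request sketch (field names are placeholders; the construction pattern mirrors the range test elsewhere in this PR, and assumes TermsAggregation implements Default the way RangeAggregation does):

// Hypothetical request: one bucket per unique term of "text_field",
// with an average over "score" computed per bucket.
let sub_agg: Aggregations = vec![(
    "avg_score".to_string(),
    Aggregation::Metric(MetricAggregation::Average(
        AverageAggregation::from_field_name("score".to_string()),
    )),
)]
.into_iter()
.collect();
let agg_req: Aggregations = vec![(
    "my_texts".to_string(),
    Aggregation::Bucket(BucketAggregation {
        bucket_agg: BucketAggregationType::Terms(TermsAggregation {
            field: "text_field".to_string(),
            ..Default::default()
        }),
        sub_aggregation: sub_agg,
    }),
)]
.into_iter()
.collect();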
@@ -259,6 +262,7 @@ pub struct SegmentTermCollector {
term_buckets: TermBuckets,
req: TermsAggregationInternal,
blueprint: Option<Box<dyn SegmentAggregationCollector>>,
accessor_idx: usize,
}
pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) {
@@ -266,10 +270,85 @@ pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) {
(agg_name, agg_property)
}
impl SegmentAggregationCollector for SegmentTermCollector {
fn into_intermediate_aggregations_result(
self: Box<Self>,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<IntermediateAggregationResults> {
let name = agg_with_accessor.buckets.keys[self.accessor_idx].to_string();
let agg_with_accessor = &agg_with_accessor.buckets.values[self.accessor_idx];
let bucket = self.into_intermediate_bucket_result(agg_with_accessor)?;
let buckets = Some(VecWithNames::from_entries(vec![(name, bucket)]));
Ok(IntermediateAggregationResults {
metrics: None,
buckets,
})
}
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
self.collect_block(&[doc], agg_with_accessor)
}
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor;
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
if accessor.get_cardinality() == Cardinality::Full {
for doc in docs {
let term_id = accessor.values.get_val(*doc);
let entry = self
.term_buckets
.entries
.entry(term_id as u32)
.or_insert_with(|| TermBucketEntry::from_blueprint(&self.blueprint));
entry.doc_count += 1;
if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
sub_aggregations.collect(*doc, sub_aggregation_accessor)?;
}
}
} else {
for doc in docs {
for term_id in accessor.values(*doc) {
let entry = self
.term_buckets
.entries
.entry(term_id as u32)
.or_insert_with(|| TermBucketEntry::from_blueprint(&self.blueprint));
entry.doc_count += 1;
if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
sub_aggregations.collect(*doc, sub_aggregation_accessor)?;
}
}
}
}
Ok(())
}
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
self.term_buckets.force_flush(sub_aggregation_accessor)?;
Ok(())
}
}
impl SegmentTermCollector {
pub(crate) fn from_req_and_validate(
req: &TermsAggregation,
sub_aggregations: &AggregationsWithAccessor,
accessor_idx: usize,
) -> crate::Result<Self> {
let term_buckets = TermBuckets::default();
@@ -299,6 +378,7 @@ impl SegmentTermCollector {
req: TermsAggregationInternal::from_req(req),
term_buckets,
blueprint,
accessor_idx,
})
}
@@ -387,40 +467,6 @@ impl SegmentTermCollector {
},
))
}
- #[inline]
- pub(crate) fn collect_block(
- &mut self,
- docs: &[DocId],
- bucket_with_accessor: &BucketAggregationWithAccessor,
- ) -> crate::Result<()> {
- let accessor = &bucket_with_accessor.accessor;
- for doc in docs {
- for term_id in accessor.values(*doc) {
- let entry = self
- .term_buckets
- .entries
- .entry(term_id as u32)
- .or_insert_with(|| TermBucketEntry::from_blueprint(&self.blueprint));
- entry.doc_count += 1;
- if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
- sub_aggregations.collect(*doc, &bucket_with_accessor.sub_aggregation)?;
- }
- }
- }
- Ok(())
- }
- pub(crate) fn flush(
- &mut self,
- bucket_with_accessor: &BucketAggregationWithAccessor,
- ) -> crate::Result<()> {
- self.term_buckets
- .force_flush(&bucket_with_accessor.sub_aggregation)?;
- Ok(())
- }
}
pub(crate) trait GetDocCount {
@@ -631,12 +677,15 @@ mod tests {
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "termb");
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 2);
assert_eq!(res["my_texts"]["buckets"][0]["avg_score"]["value"], 6.0);
assert_eq!(res["my_texts"]["buckets"][1]["key"], "termc");
assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 3);
assert_eq!(res["my_texts"]["buckets"][1]["avg_score"]["value"], 1.0);
assert_eq!(res["my_texts"]["buckets"][2]["key"], "terma");
assert_eq!(res["my_texts"]["buckets"][2]["doc_count"], 5);
assert_eq!(res["my_texts"]["buckets"][2]["avg_score"]["value"], 5.0);
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);

View File

@@ -21,7 +21,6 @@ use super::metric::{
IntermediateAverage, IntermediateCount, IntermediateMax, IntermediateMin, IntermediateStats,
IntermediateSum,
};
- use super::segment_agg_result::SegmentMetricResultCollector;
use super::{format_date, Key, SerializedKey, VecWithNames};
use crate::aggregation::agg_result::{AggregationResults, BucketEntries, BucketEntry};
use crate::aggregation::bucket::TermsAggregationInternal;
@@ -220,32 +219,6 @@ pub enum IntermediateMetricResult {
Sum(IntermediateSum),
}
- impl From<SegmentMetricResultCollector> for IntermediateMetricResult {
- fn from(tree: SegmentMetricResultCollector) -> Self {
- use super::metric::SegmentStatsType;
- match tree {
- SegmentMetricResultCollector::Stats(collector) => match collector.collecting_for {
- SegmentStatsType::Average => IntermediateMetricResult::Average(
- IntermediateAverage::from_collector(collector),
- ),
- SegmentStatsType::Count => {
- IntermediateMetricResult::Count(IntermediateCount::from_collector(collector))
- }
- SegmentStatsType::Max => {
- IntermediateMetricResult::Max(IntermediateMax::from_collector(collector))
- }
- SegmentStatsType::Min => {
- IntermediateMetricResult::Min(IntermediateMin::from_collector(collector))
- }
- SegmentStatsType::Stats => IntermediateMetricResult::Stats(collector.stats),
- SegmentStatsType::Sum => {
- IntermediateMetricResult::Sum(IntermediateSum::from_collector(collector))
- }
- },
- }
- }
- }
impl IntermediateMetricResult {
pub(crate) fn empty_from_req(req: &MetricAggregation) -> Self {
match req {

View File

@@ -172,6 +172,7 @@ impl SegmentStatsCollector {
accessor_idx,
}
}
#[inline]
pub(crate) fn collect_block_with_field(&mut self, docs: &[DocId], field: &Column<u64>) {
if field.get_cardinality() == Cardinality::Full {
for doc in docs {
@@ -195,7 +196,7 @@ impl SegmentAggregationCollector for SegmentStatsCollector {
self: Box<Self>,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<IntermediateAggregationResults> {
- let name = agg_with_accessor.metrics.keys[0].to_string();
+ let name = agg_with_accessor.metrics.keys[self.accessor_idx].to_string();
let intermediate_metric_result = match self.collecting_for {
SegmentStatsType::Average => {
@@ -234,20 +235,15 @@ impl SegmentAggregationCollector for SegmentStatsCollector {
) -> crate::Result<()> {
let field = &agg_with_accessor.metrics.values[self.accessor_idx].accessor;
- if field.get_cardinality() == Cardinality::Full {
- let val = field.values.get_val(doc);
+ for val in field.values(doc) {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
- } else {
- for val in field.values(doc) {
- let val1 = f64_from_fastfield_u64(val, &self.field_type);
- self.stats.collect(val1);
- }
}
Ok(())
}
#[inline]
fn collect_block(
&mut self,
docs: &[crate::DocId],

View File

@@ -209,12 +209,9 @@ impl<T: Clone> From<HashMap<String, T>> for VecWithNames<T> {
}
impl<T: Clone> VecWithNames<T> {
- fn from_other<K: Clone + Into<T>>(entries: VecWithNames<K>) -> Self {
- let values = entries.values.into_iter().map(Into::into).collect();
- Self {
- keys: entries.keys,
- values,
- }
- }
+ fn extend(&mut self, entries: VecWithNames<T>) {
+ self.keys.extend(entries.keys);
+ self.values.extend(entries.values);
}
fn from_entries(mut entries: Vec<(String, T)>) -> Self {
@@ -1495,6 +1492,49 @@ mod tests {
});
}
#[bench]
fn bench_aggregation_range_with_avg(b: &mut Bencher) {
let index = get_test_index_bench(false).unwrap();
let reader = index.reader().unwrap();
b.iter(|| {
let sub_agg_req: Aggregations = vec![(
"average_f64".to_string(),
Aggregation::Metric(MetricAggregation::Average(
AverageAggregation::from_field_name("score_f64".to_string()),
)),
)]
.into_iter()
.collect();
let agg_req_1: Aggregations = vec![(
"rangef64".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Range(RangeAggregation {
field: "score_f64".to_string(),
ranges: vec![
(3f64..7000f64).into(),
(7000f64..20000f64).into(),
(20000f64..30000f64).into(),
(30000f64..40000f64).into(),
(40000f64..50000f64).into(),
(50000f64..60000f64).into(),
],
..Default::default()
}),
sub_aggregation: sub_agg_req,
}),
)]
.into_iter()
.collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let searcher = reader.searcher();
searcher.search(&AllQuery, &collector).unwrap()
});
}
// hard bounds uses a different algorithm, because it actually limits the collection range
#[bench]
fn bench_aggregation_histogram_only_hard_bounds(b: &mut Bencher) {

View File

@@ -14,14 +14,14 @@ use super::agg_req_with_accessor::{
use super::bucket::{SegmentHistogramCollector, SegmentRangeCollector, SegmentTermCollector};
use super::buf_collector::BufAggregationCollector;
use super::collector::MAX_BUCKET_COUNT;
- use super::intermediate_agg_result::{IntermediateAggregationResults, IntermediateBucketResult};
+ use super::intermediate_agg_result::IntermediateAggregationResults;
use super::metric::{
AverageAggregation, CountAggregation, MaxAggregation, MinAggregation, SegmentStatsCollector,
SegmentStatsType, StatsAggregation, SumAggregation,
};
use super::VecWithNames;
use crate::aggregation::agg_req::BucketAggregationType;
- use crate::{DocId, TantivyError};
+ use crate::TantivyError;
pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug {
fn into_intermediate_aggregations_result(
@@ -74,41 +74,14 @@ pub(crate) fn build_segment_agg_collector(
if req.buckets.is_empty() && req.metrics.len() == 1 {
let req = &req.metrics.values[0];
let accessor_idx = 0;
- let stats_collector = match &req.metric {
- MetricAggregation::Average(AverageAggregation { .. }) => {
- SegmentStatsCollector::from_req(
- req.field_type,
- SegmentStatsType::Average,
- accessor_idx,
- )
- }
- MetricAggregation::Count(CountAggregation { .. }) => SegmentStatsCollector::from_req(
- req.field_type,
- SegmentStatsType::Count,
- accessor_idx,
- ),
- MetricAggregation::Max(MaxAggregation { .. }) => {
- SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max, accessor_idx)
- }
- MetricAggregation::Min(MinAggregation { .. }) => {
- SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min, accessor_idx)
- }
- MetricAggregation::Stats(StatsAggregation { .. }) => SegmentStatsCollector::from_req(
- req.field_type,
- SegmentStatsType::Stats,
- accessor_idx,
- ),
- MetricAggregation::Sum(SumAggregation { .. }) => {
- SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum, accessor_idx)
- }
- };
+ return build_metric_segment_agg_collector(req, accessor_idx, add_buffer_layer);
}
- if add_buffer_layer {
- let stats_collector = BufAggregationCollector::new(stats_collector);
- return Ok(Box::new(stats_collector));
- } else {
- return Ok(Box::new(stats_collector));
- }
+ // Single bucket special case
+ if req.metrics.is_empty() && req.buckets.len() == 1 {
+ let req = &req.buckets.values[0];
+ let accessor_idx = 0;
+ return build_bucket_segment_agg_collector(req, accessor_idx, add_buffer_layer);
+ }
let agg = GenericSegmentAggregationResultsCollector::from_req_and_validate(req)?;
@@ -120,14 +93,96 @@ pub(crate) fn build_segment_agg_collector(
}
}
#[derive(Clone)]
pub(crate) fn build_metric_segment_agg_collector(
req: &MetricAggregationWithAccessor,
accessor_idx: usize,
add_buffer_layer: bool,
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
let stats_collector = match &req.metric {
MetricAggregation::Average(AverageAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Average, accessor_idx)
}
MetricAggregation::Count(CountAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Count, accessor_idx)
}
MetricAggregation::Max(MaxAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max, accessor_idx)
}
MetricAggregation::Min(MinAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min, accessor_idx)
}
MetricAggregation::Stats(StatsAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Stats, accessor_idx)
}
MetricAggregation::Sum(SumAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum, accessor_idx)
}
};
if add_buffer_layer {
let stats_collector = BufAggregationCollector::new(stats_collector);
Ok(Box::new(stats_collector))
} else {
Ok(Box::new(stats_collector))
}
}
fn box_with_opt_buffer<T: SegmentAggregationCollector + Clone + 'static>(
add_buffer_layer: bool,
collector: T,
) -> Box<dyn SegmentAggregationCollector> {
if add_buffer_layer {
let collector = BufAggregationCollector::new(collector);
Box::new(collector)
} else {
Box::new(collector)
}
}
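A note on box_with_opt_buffer above: it centralizes the add_buffer_layer branching that the old build_segment_agg_collector repeated for every metric variant. A hypothetical call site, mirroring the builders below (the `0` accessor index is illustrative):

// Hypothetical: wrap a concrete collector, optionally adding the buffering
// layer; mirrors the call sites in build_bucket_segment_agg_collector below.
let collector: Box<dyn SegmentAggregationCollector> = box_with_opt_buffer(
    add_buffer_layer,
    SegmentRangeCollector::from_req_and_validate(
        range_req,
        &req.sub_aggregation,
        &req.bucket_count,
        req.field_type,
        0, // accessor_idx, illustrative
    )?,
);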
pub(crate) fn build_bucket_segment_agg_collector(
req: &BucketAggregationWithAccessor,
accessor_idx: usize,
add_buffer_layer: bool,
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
match &req.bucket_agg {
BucketAggregationType::Terms(terms_req) => Ok(box_with_opt_buffer(
add_buffer_layer,
SegmentTermCollector::from_req_and_validate(
terms_req,
&req.sub_aggregation,
accessor_idx,
)?,
)),
BucketAggregationType::Range(range_req) => Ok(box_with_opt_buffer(
add_buffer_layer,
SegmentRangeCollector::from_req_and_validate(
range_req,
&req.sub_aggregation,
&req.bucket_count,
req.field_type,
accessor_idx,
)?,
)),
BucketAggregationType::Histogram(histogram) => Ok(box_with_opt_buffer(
add_buffer_layer,
SegmentHistogramCollector::from_req_and_validate(
histogram,
&req.sub_aggregation,
req.field_type,
&req.accessor,
accessor_idx,
)?,
)),
}
}
- #[derive(Clone, Default)]
+ /// The GenericSegmentAggregationResultsCollector is the generic version of the collector, which
+ /// can handle arbitrary complexity of sub-aggregations. Ideally we never have to pick this one
+ /// and can instead provide specialized versions that remove some of its overhead.
+ #[derive(Default)]
pub(crate) struct GenericSegmentAggregationResultsCollector {
- pub(crate) metrics: Option<VecWithNames<SegmentMetricResultCollector>>,
- pub(crate) buckets: Option<VecWithNames<SegmentBucketResultCollector>>,
+ pub(crate) metrics: Option<Vec<Box<dyn SegmentAggregationCollector>>>,
+ pub(crate) buckets: Option<Vec<Box<dyn SegmentAggregationCollector>>>,
}
impl Debug for GenericSegmentAggregationResultsCollector {
@@ -145,16 +200,29 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<IntermediateAggregationResults> {
let buckets = if let Some(buckets) = self.buckets {
- let entries = buckets
- .into_iter()
- .zip(agg_with_accessor.buckets.values())
- .map(|((key, bucket), acc)| Ok((key, bucket.into_intermediate_bucket_result(acc)?)))
- .collect::<crate::Result<Vec<(String, _)>>>()?;
- Some(VecWithNames::from_entries(entries))
+ let mut intermediate_buckets = VecWithNames::default();
+ for bucket in buckets {
+ // TODO too many allocations?
+ let res = bucket.into_intermediate_aggregations_result(agg_with_accessor)?;
+ // unwrap is fine since we only have buckets here
+ intermediate_buckets.extend(res.buckets.unwrap());
+ }
+ Some(intermediate_buckets)
} else {
None
};
+ let metrics = if let Some(metrics) = self.metrics {
+ let mut intermediate_metrics = VecWithNames::default();
+ for metric in metrics {
+ // TODO too many allocations?
+ let res = metric.into_intermediate_aggregations_result(agg_with_accessor)?;
+ // unwrap is fine since we only have metrics here
+ intermediate_metrics.extend(res.metrics.unwrap());
+ }
+ Some(intermediate_metrics)
+ } else {
+ None
+ };
- let metrics = self.metrics.map(VecWithNames::from_other);
Ok(IntermediateAggregationResults { metrics, buckets })
}
@@ -175,17 +243,13 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
if let Some(metrics) = self.metrics.as_mut() {
- for (collector, agg_with_accessor) in
- metrics.values_mut().zip(agg_with_accessor.metrics.values())
- {
- collector.collect_block(docs, agg_with_accessor);
+ for collector in metrics {
+ collector.collect_block(docs, agg_with_accessor)?;
}
}
if let Some(buckets) = self.buckets.as_mut() {
- for (collector, agg_with_accessor) in
- buckets.values_mut().zip(agg_with_accessor.buckets.values())
- {
+ for collector in buckets {
collector.collect_block(docs, agg_with_accessor)?;
}
}
@@ -194,10 +258,13 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
}
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
+ if let Some(metrics) = &mut self.metrics {
+ for collector in metrics {
+ collector.flush(agg_with_accessor)?;
+ }
+ }
if let Some(buckets) = &mut self.buckets {
- for (collector, agg_with_accessor) in
- buckets.values_mut().zip(agg_with_accessor.buckets.values())
- {
+ for collector in buckets {
collector.flush(agg_with_accessor)?;
}
}
@@ -206,218 +273,46 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
}
impl GenericSegmentAggregationResultsCollector {
- pub fn into_intermediate_aggregations_result(
- self,
- agg_with_accessor: &AggregationsWithAccessor,
- ) -> crate::Result<IntermediateAggregationResults> {
- let buckets = if let Some(buckets) = self.buckets {
- let entries = buckets
- .into_iter()
- .zip(agg_with_accessor.buckets.values())
- .map(|((key, bucket), acc)| Ok((key, bucket.into_intermediate_bucket_result(acc)?)))
- .collect::<crate::Result<Vec<(String, _)>>>()?;
- Some(VecWithNames::from_entries(entries))
- } else {
- None
- };
- let metrics = self.metrics.map(VecWithNames::from_other);
- Ok(IntermediateAggregationResults { metrics, buckets })
- }
pub(crate) fn from_req_and_validate(req: &AggregationsWithAccessor) -> crate::Result<Self> {
let buckets = req
.buckets
.iter()
- .map(|(key, req)| {
- Ok((
- key.to_string(),
- SegmentBucketResultCollector::from_req_and_validate(req)?,
- ))
+ .enumerate()
+ .map(|(accessor_idx, (_key, req))| {
+ Ok(build_bucket_segment_agg_collector(
+ req,
+ accessor_idx,
+ false,
+ )?)
})
- .collect::<crate::Result<Vec<(String, _)>>>()?;
+ .collect::<crate::Result<Vec<Box<dyn SegmentAggregationCollector>>>>()?;
let metrics = req
.metrics
.iter()
.enumerate()
- .map(|(accesor_idx, (key, req))| {
- Ok((
- key.to_string(),
- SegmentMetricResultCollector::from_req_and_validate(req, accesor_idx)?,
- ))
+ .map(|(accessor_idx, (_key, req))| {
+ Ok(build_metric_segment_agg_collector(
+ req,
+ accessor_idx,
+ false,
+ )?)
})
- .collect::<crate::Result<Vec<(String, _)>>>()?;
+ .collect::<crate::Result<Vec<Box<dyn SegmentAggregationCollector>>>>()?;
let metrics = if metrics.is_empty() {
None
} else {
- Some(VecWithNames::from_entries(metrics))
+ Some(metrics)
};
let buckets = if buckets.is_empty() {
None
} else {
- Some(VecWithNames::from_entries(buckets))
+ Some(buckets)
};
Ok(GenericSegmentAggregationResultsCollector { metrics, buckets })
}
}
- #[derive(Clone, Debug, PartialEq)]
- pub(crate) enum SegmentMetricResultCollector {
- Stats(SegmentStatsCollector),
- }
- impl SegmentMetricResultCollector {
- pub fn from_req_and_validate(
- req: &MetricAggregationWithAccessor,
- accessor_idx: usize,
- ) -> crate::Result<Self> {
- match &req.metric {
- MetricAggregation::Average(AverageAggregation { .. }) => Ok(
- SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req(
- req.field_type,
- SegmentStatsType::Average,
- accessor_idx,
- )),
- ),
- MetricAggregation::Count(CountAggregation { .. }) => Ok(
- SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req(
- req.field_type,
- SegmentStatsType::Count,
- accessor_idx,
- )),
- ),
- MetricAggregation::Max(MaxAggregation { .. }) => Ok(
- SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req(
- req.field_type,
- SegmentStatsType::Max,
- accessor_idx,
- )),
- ),
- MetricAggregation::Min(MinAggregation { .. }) => Ok(
- SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req(
- req.field_type,
- SegmentStatsType::Min,
- accessor_idx,
- )),
- ),
- MetricAggregation::Stats(StatsAggregation { .. }) => Ok(
- SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req(
- req.field_type,
- SegmentStatsType::Stats,
- accessor_idx,
- )),
- ),
- MetricAggregation::Sum(SumAggregation { .. }) => Ok(
- SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req(
- req.field_type,
- SegmentStatsType::Sum,
- accessor_idx,
- )),
- ),
- }
- }
- pub(crate) fn collect_block(&mut self, doc: &[DocId], metric: &MetricAggregationWithAccessor) {
- match self {
- SegmentMetricResultCollector::Stats(stats_collector) => {
- stats_collector.collect_block_with_field(doc, &metric.accessor);
- }
- }
- }
- }
- /// SegmentBucketAggregationResultCollectors will have specialized buckets for collection inside
- /// segments.
- /// The typical structure of Map<Key, Bucket> is not suitable during collection for performance
- /// reasons.
- #[derive(Clone, Debug)]
- pub(crate) enum SegmentBucketResultCollector {
- Range(SegmentRangeCollector),
- Histogram(Box<SegmentHistogramCollector>),
- Terms(Box<SegmentTermCollector>),
- }
- impl SegmentBucketResultCollector {
- pub fn into_intermediate_bucket_result(
- self,
- agg_with_accessor: &BucketAggregationWithAccessor,
- ) -> crate::Result<IntermediateBucketResult> {
- match self {
- SegmentBucketResultCollector::Terms(terms) => {
- terms.into_intermediate_bucket_result(agg_with_accessor)
- }
- SegmentBucketResultCollector::Range(range) => {
- range.into_intermediate_bucket_result(agg_with_accessor)
- }
- SegmentBucketResultCollector::Histogram(histogram) => {
- histogram.into_intermediate_bucket_result(agg_with_accessor)
- }
- }
- }
- pub fn from_req_and_validate(req: &BucketAggregationWithAccessor) -> crate::Result<Self> {
- match &req.bucket_agg {
- BucketAggregationType::Terms(terms_req) => Ok(Self::Terms(Box::new(
- SegmentTermCollector::from_req_and_validate(terms_req, &req.sub_aggregation)?,
- ))),
- BucketAggregationType::Range(range_req) => {
- Ok(Self::Range(SegmentRangeCollector::from_req_and_validate(
- range_req,
- &req.sub_aggregation,
- &req.bucket_count,
- req.field_type,
- )?))
- }
- BucketAggregationType::Histogram(histogram) => Ok(Self::Histogram(Box::new(
- SegmentHistogramCollector::from_req_and_validate(
- histogram,
- &req.sub_aggregation,
- req.field_type,
- &req.accessor,
- )?,
- ))),
- }
- }
- #[inline]
- pub(crate) fn collect_block(
- &mut self,
- docs: &[DocId],
- bucket_with_accessor: &BucketAggregationWithAccessor,
- ) -> crate::Result<()> {
- match self {
- SegmentBucketResultCollector::Range(range) => {
- range.collect_block(docs, bucket_with_accessor)?;
- }
- SegmentBucketResultCollector::Histogram(histogram) => {
- histogram.collect_block(docs, bucket_with_accessor)?;
- }
- SegmentBucketResultCollector::Terms(terms) => {
- terms.collect_block(docs, bucket_with_accessor)?;
- }
- }
- Ok(())
- }
- #[inline]
- pub(crate) fn flush(
- &mut self,
- bucket_with_accessor: &BucketAggregationWithAccessor,
- ) -> crate::Result<()> {
- match self {
- SegmentBucketResultCollector::Range(range) => {
- range.flush(bucket_with_accessor)?;
- }
- SegmentBucketResultCollector::Histogram(histogram) => {
- histogram.flush(bucket_with_accessor)?;
- }
- SegmentBucketResultCollector::Terms(terms) => {
- terms.flush(bucket_with_accessor)?;
- }
- }
- Ok(())
- }
- }
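The two enums above, with their match-based fan-out in every method, are exactly what the trait-object approach in this PR replaces. A self-contained toy illustration of the shape change (toy names, not tantivy's):

use std::fmt::Debug;

// Toy stand-in for SegmentAggregationCollector.
trait Collector: Debug {
    fn flush(&mut self);
}

#[derive(Debug)]
struct RangeToy;
#[derive(Debug)]
struct TermsToy;

impl Collector for RangeToy {
    fn flush(&mut self) { /* ... */ }
}
impl Collector for TermsToy {
    fn flush(&mut self) { /* ... */ }
}

fn main() {
    // Before: an enum with one variant per collector forces a `match`
    // inside every method (collect, collect_block, flush, ...).
    // After: heterogeneous collectors live behind one trait object and
    // each method is a single virtual call.
    let mut collectors: Vec<Box<dyn Collector>> = vec![Box::new(RangeToy), Box::new(TermsToy)];
    for collector in &mut collectors {
        collector.flush();
    }
}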
#[derive(Clone)]
pub(crate) struct BucketCount {
/// The counter which is shared between the aggregations for one request.