mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-05 01:50:42 +00:00
alternative mixed field aggregation collection (#2135)
* alternative mixed field aggregation collection instead of having multiple accessor in one AggregationWithAccessor split it into multiple independent AggregationWithAccessor * Update src/aggregation/agg_req_with_accessor.rs Co-authored-by: Paul Masurel <paul@quickwit.io> --------- Co-authored-by: Paul Masurel <paul@quickwit.io>
This commit is contained in:
@@ -37,9 +37,6 @@ pub struct AggregationWithAccessor {
|
||||
pub(crate) accessor: Column<u64>,
|
||||
pub(crate) str_dict_column: Option<StrColumn>,
|
||||
pub(crate) field_type: ColumnType,
|
||||
/// In case there are multiple types of fast fields, e.g. string and numeric.
|
||||
/// Only used for term aggregations currently.
|
||||
pub(crate) accessor2: Option<(Column<u64>, ColumnType)>,
|
||||
pub(crate) sub_aggregation: AggregationsWithAccessor,
|
||||
pub(crate) limits: ResourceLimitGuard,
|
||||
pub(crate) column_block_accessor: ColumnBlockAccessor<u64>,
|
||||
@@ -52,20 +49,31 @@ impl AggregationWithAccessor {
|
||||
sub_aggregation: &Aggregations,
|
||||
reader: &SegmentReader,
|
||||
limits: AggregationLimits,
|
||||
) -> crate::Result<AggregationWithAccessor> {
|
||||
) -> crate::Result<Vec<AggregationWithAccessor>> {
|
||||
let mut str_dict_column = None;
|
||||
let mut accessor2 = None;
|
||||
use AggregationVariants::*;
|
||||
let (accessor, field_type) = match &agg.agg {
|
||||
let acc_field_types: Vec<(Column, ColumnType)> = match &agg.agg {
|
||||
Range(RangeAggregation {
|
||||
field: field_name, ..
|
||||
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
|
||||
}) => vec![get_ff_reader(
|
||||
reader,
|
||||
field_name,
|
||||
Some(get_numeric_or_date_column_types()),
|
||||
)?],
|
||||
Histogram(HistogramAggregation {
|
||||
field: field_name, ..
|
||||
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
|
||||
}) => vec![get_ff_reader(
|
||||
reader,
|
||||
field_name,
|
||||
Some(get_numeric_or_date_column_types()),
|
||||
)?],
|
||||
DateHistogram(DateHistogramAggregationReq {
|
||||
field: field_name, ..
|
||||
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
|
||||
}) => vec![get_ff_reader(
|
||||
reader,
|
||||
field_name,
|
||||
Some(get_numeric_or_date_column_types()),
|
||||
)?],
|
||||
Terms(TermsAggregation {
|
||||
field: field_name, ..
|
||||
}) => {
|
||||
@@ -80,11 +88,7 @@ impl AggregationWithAccessor {
|
||||
// ColumnType::IpAddr Unsupported
|
||||
// ColumnType::DateTime Unsupported
|
||||
];
|
||||
let mut columns =
|
||||
get_all_ff_reader_or_empty(reader, field_name, Some(&allowed_column_types))?;
|
||||
let first = columns.pop().unwrap();
|
||||
accessor2 = columns.pop();
|
||||
first
|
||||
get_all_ff_reader_or_empty(reader, field_name, Some(&allowed_column_types))?
|
||||
}
|
||||
Average(AverageAggregation { field: field_name })
|
||||
| Count(CountAggregation { field: field_name })
|
||||
@@ -95,7 +99,7 @@ impl AggregationWithAccessor {
|
||||
let (accessor, field_type) =
|
||||
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
|
||||
|
||||
(accessor, field_type)
|
||||
vec![(accessor, field_type)]
|
||||
}
|
||||
Percentiles(percentiles) => {
|
||||
let (accessor, field_type) = get_ff_reader(
|
||||
@@ -103,25 +107,29 @@ impl AggregationWithAccessor {
|
||||
percentiles.field_name(),
|
||||
Some(get_numeric_or_date_column_types()),
|
||||
)?;
|
||||
(accessor, field_type)
|
||||
vec![(accessor, field_type)]
|
||||
}
|
||||
};
|
||||
|
||||
let sub_aggregation = sub_aggregation.clone();
|
||||
Ok(AggregationWithAccessor {
|
||||
accessor,
|
||||
accessor2,
|
||||
field_type,
|
||||
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
|
||||
&sub_aggregation,
|
||||
reader,
|
||||
&limits,
|
||||
)?,
|
||||
agg: agg.clone(),
|
||||
str_dict_column,
|
||||
limits: limits.new_guard(),
|
||||
column_block_accessor: Default::default(),
|
||||
})
|
||||
let aggs: Vec<AggregationWithAccessor> = acc_field_types
|
||||
.into_iter()
|
||||
.map(|(accessor, field_type)| {
|
||||
Ok(AggregationWithAccessor {
|
||||
accessor,
|
||||
field_type,
|
||||
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
|
||||
sub_aggregation,
|
||||
reader,
|
||||
&limits,
|
||||
)?,
|
||||
agg: agg.clone(),
|
||||
str_dict_column: str_dict_column.clone(),
|
||||
limits: limits.new_guard(),
|
||||
column_block_accessor: Default::default(),
|
||||
})
|
||||
})
|
||||
.collect::<crate::Result<_>>()?;
|
||||
Ok(aggs)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,15 +149,15 @@ pub(crate) fn get_aggs_with_segment_accessor_and_validate(
|
||||
) -> crate::Result<AggregationsWithAccessor> {
|
||||
let mut aggss = Vec::new();
|
||||
for (key, agg) in aggs.iter() {
|
||||
aggss.push((
|
||||
key.to_string(),
|
||||
AggregationWithAccessor::try_from_agg(
|
||||
agg,
|
||||
agg.sub_aggregation(),
|
||||
reader,
|
||||
limits.clone(),
|
||||
)?,
|
||||
));
|
||||
let aggs = AggregationWithAccessor::try_from_agg(
|
||||
agg,
|
||||
agg.sub_aggregation(),
|
||||
reader,
|
||||
limits.clone(),
|
||||
)?;
|
||||
for agg in aggs {
|
||||
aggss.push((key.to_string(), agg));
|
||||
}
|
||||
}
|
||||
Ok(AggregationsWithAccessor::from_data(
|
||||
VecWithNames::from_entries(aggss),
|
||||
|
||||
@@ -465,7 +465,7 @@ mod tests {
|
||||
SegmentRangeCollector::from_req_and_validate(
|
||||
&req,
|
||||
&mut Default::default(),
|
||||
&mut AggregationLimits::default().new_guard(),
|
||||
&AggregationLimits::default().new_guard(),
|
||||
field_type,
|
||||
0,
|
||||
)
|
||||
|
||||
@@ -224,110 +224,6 @@ impl TermBuckets {
|
||||
}
|
||||
}
|
||||
|
||||
/// The composite collector is used, when we have different types under one field, to support a term
|
||||
/// aggregation on both.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SegmentTermCollectorComposite {
|
||||
term_agg1: SegmentTermCollector, // field type 1, e.g. strings
|
||||
term_agg2: SegmentTermCollector, // field type 2, e.g. u64
|
||||
accessor_idx: usize,
|
||||
}
|
||||
impl SegmentAggregationCollector for SegmentTermCollectorComposite {
|
||||
fn add_intermediate_aggregation_result(
|
||||
self: Box<Self>,
|
||||
agg_with_accessor: &AggregationsWithAccessor,
|
||||
results: &mut IntermediateAggregationResults,
|
||||
) -> crate::Result<()> {
|
||||
let name = agg_with_accessor.aggs.keys[self.accessor_idx].to_string();
|
||||
let agg_with_accessor = &agg_with_accessor.aggs.values[self.accessor_idx];
|
||||
|
||||
let bucket = self
|
||||
.term_agg1
|
||||
.into_intermediate_bucket_result(agg_with_accessor)?;
|
||||
results.push(
|
||||
name.to_string(),
|
||||
IntermediateAggregationResult::Bucket(bucket),
|
||||
)?;
|
||||
let bucket = self
|
||||
.term_agg2
|
||||
.into_intermediate_bucket_result(agg_with_accessor)?;
|
||||
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn collect(
|
||||
&mut self,
|
||||
doc: crate::DocId,
|
||||
agg_with_accessor: &mut AggregationsWithAccessor,
|
||||
) -> crate::Result<()> {
|
||||
self.term_agg1.collect_block(&[doc], agg_with_accessor)?;
|
||||
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
|
||||
self.term_agg2.collect_block(&[doc], agg_with_accessor)?;
|
||||
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn collect_block(
|
||||
&mut self,
|
||||
docs: &[crate::DocId],
|
||||
agg_with_accessor: &mut AggregationsWithAccessor,
|
||||
) -> crate::Result<()> {
|
||||
self.term_agg1.collect_block(docs, agg_with_accessor)?;
|
||||
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
|
||||
self.term_agg2.collect_block(docs, agg_with_accessor)?;
|
||||
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
|
||||
self.term_agg1.flush(agg_with_accessor)?;
|
||||
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
|
||||
self.term_agg2.flush(agg_with_accessor)?;
|
||||
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentTermCollectorComposite {
|
||||
/// Swaps the accessor and field type with the second accessor and field type.
|
||||
/// This way we can use the same code for both aggregations.
|
||||
fn swap_accessor(&self, aggregations: &mut AggregationWithAccessor) {
|
||||
if let Some(accessor) = aggregations.accessor2.as_mut() {
|
||||
std::mem::swap(&mut accessor.0, &mut aggregations.accessor);
|
||||
std::mem::swap(&mut accessor.1, &mut aggregations.field_type);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn from_req_and_validate(
|
||||
req: &TermsAggregation,
|
||||
sub_aggregations: &mut AggregationsWithAccessor,
|
||||
field_type: ColumnType,
|
||||
field_type2: ColumnType,
|
||||
accessor_idx: usize,
|
||||
) -> crate::Result<Self> {
|
||||
Ok(Self {
|
||||
term_agg1: SegmentTermCollector::from_req_and_validate(
|
||||
req,
|
||||
sub_aggregations,
|
||||
field_type,
|
||||
accessor_idx,
|
||||
)?,
|
||||
term_agg2: SegmentTermCollector::from_req_and_validate(
|
||||
req,
|
||||
sub_aggregations,
|
||||
field_type2,
|
||||
accessor_idx,
|
||||
)?,
|
||||
accessor_idx,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// The collector puts values from the fast field into the correct buckets and does a conversion to
|
||||
/// the correct datatype.
|
||||
#[derive(Clone, Debug)]
|
||||
|
||||
@@ -15,7 +15,6 @@ use super::metric::{
|
||||
SegmentPercentilesCollector, SegmentStatsCollector, SegmentStatsType, StatsAggregation,
|
||||
SumAggregation,
|
||||
};
|
||||
use crate::aggregation::bucket::SegmentTermCollectorComposite;
|
||||
|
||||
pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug {
|
||||
fn add_intermediate_aggregation_result(
|
||||
@@ -81,26 +80,12 @@ pub(crate) fn build_single_agg_segment_collector(
|
||||
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
|
||||
use AggregationVariants::*;
|
||||
match &req.agg.agg {
|
||||
Terms(terms_req) => {
|
||||
if let Some(acc2) = req.accessor2.as_ref() {
|
||||
Ok(Box::new(
|
||||
SegmentTermCollectorComposite::from_req_and_validate(
|
||||
terms_req,
|
||||
&mut req.sub_aggregation,
|
||||
req.field_type,
|
||||
acc2.1,
|
||||
accessor_idx,
|
||||
)?,
|
||||
))
|
||||
} else {
|
||||
Ok(Box::new(SegmentTermCollector::from_req_and_validate(
|
||||
terms_req,
|
||||
&mut req.sub_aggregation,
|
||||
req.field_type,
|
||||
accessor_idx,
|
||||
)?))
|
||||
}
|
||||
}
|
||||
Terms(terms_req) => Ok(Box::new(SegmentTermCollector::from_req_and_validate(
|
||||
terms_req,
|
||||
&mut req.sub_aggregation,
|
||||
req.field_type,
|
||||
accessor_idx,
|
||||
)?)),
|
||||
Range(range_req) => Ok(Box::new(SegmentRangeCollector::from_req_and_validate(
|
||||
range_req,
|
||||
&mut req.sub_aggregation,
|
||||
|
||||
Reference in New Issue
Block a user