refactor aggregations (#1875)

* add specialized version for full cardinality

Pre Columnar
test aggregation::tests::bench::bench_aggregation_average_u64                                                            ... bench:   6,681,850 ns/iter (+/- 1,217,385)
test aggregation::tests::bench::bench_aggregation_average_u64_and_f64                                                    ... bench:  10,576,327 ns/iter (+/- 494,380)

Current
test aggregation::tests::bench::bench_aggregation_average_u64                                                            ... bench:  11,562,084 ns/iter (+/- 3,678,682)
test aggregation::tests::bench::bench_aggregation_average_u64_and_f64                                                    ... bench:  18,925,790 ns/iter (+/- 17,616,771)

Post Change
test aggregation::tests::bench::bench_aggregation_average_u64                                                            ... bench:   9,123,811 ns/iter (+/- 399,720)
test aggregation::tests::bench::bench_aggregation_average_u64_and_f64                                                    ... bench:  13,111,825 ns/iter (+/- 273,547)

* refactor aggregation collection

* add buffering collector
This commit is contained in:
PSeitz
2023-02-16 20:15:16 +08:00
committed by GitHub
parent 7423f99719
commit 019db10e8e
8 changed files with 329 additions and 143 deletions

View File

@@ -338,7 +338,6 @@ impl SegmentHistogramCollector {
&mut self,
docs: &[DocId],
bucket_with_accessor: &BucketAggregationWithAccessor,
force_flush: bool,
) -> crate::Result<()> {
let bounds = self.bounds;
let interval = self.interval;
@@ -362,14 +361,6 @@ impl SegmentHistogramCollector {
)?;
}
}
if force_flush {
if let Some(sub_aggregations) = self.sub_aggregations.as_mut() {
for sub_aggregation in sub_aggregations {
sub_aggregation
.flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush)?;
}
}
}
Ok(())
}
@@ -408,6 +399,18 @@ impl SegmentHistogramCollector {
Ok(())
}
/// Finalizes collection for this histogram: propagates the flush to every
/// sub-aggregation so any docs they have staged are collected.
pub(crate) fn flush(
    &mut self,
    bucket_with_accessor: &BucketAggregationWithAccessor,
) -> crate::Result<()> {
    // `sub_aggregations` is optional; iterating the Option and flattening
    // visits each sub-aggregation only when one is present.
    for sub_aggregation in self.sub_aggregations.iter_mut().flatten() {
        sub_aggregation.flush(&bucket_with_accessor.sub_aggregation)?;
    }
    Ok(())
}
fn f64_from_fastfield_u64(&self, val: u64) -> f64 {
f64_from_fastfield_u64(val, &self.field_type)
}

View File

@@ -263,40 +263,21 @@ impl SegmentRangeCollector {
&mut self,
docs: &[DocId],
bucket_with_accessor: &BucketAggregationWithAccessor,
force_flush: bool,
) -> crate::Result<()> {
let accessor = &bucket_with_accessor.accessor;
for doc in docs {
for val in accessor.values(*doc) {
let bucket_pos = self.get_bucket_pos(val);
self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation)?;
}
}
if force_flush {
for bucket in &mut self.buckets {
let bucket = &mut self.buckets[bucket_pos];
bucket.bucket.doc_count += 1;
if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
sub_aggregation
.flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush)?;
sub_aggregation.collect(*doc, &bucket_with_accessor.sub_aggregation)?;
}
}
}
Ok(())
}
#[inline]
fn increment_bucket(
&mut self,
bucket_pos: usize,
doc: DocId,
bucket_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
let bucket = &mut self.buckets[bucket_pos];
bucket.bucket.doc_count += 1;
if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
sub_aggregation.collect(doc, bucket_with_accessor)?;
}
Ok(())
}
@@ -309,6 +290,18 @@ impl SegmentRangeCollector {
debug_assert!(self.buckets[pos].range.contains(&val));
pos
}
/// Finalizes collection for this range aggregation by flushing the
/// sub-aggregation of every bucket that has one.
pub(crate) fn flush(
    &mut self,
    bucket_with_accessor: &BucketAggregationWithAccessor,
) -> crate::Result<()> {
    // Buckets without a sub-aggregation need no work; filter them out and
    // flush the rest, short-circuiting on the first error.
    self.buckets
        .iter_mut()
        .filter_map(|bucket| bucket.bucket.sub_aggregation.as_mut())
        .try_for_each(|sub_aggregation| sub_aggregation.flush(&bucket_with_accessor.sub_aggregation))
}
}
/// Converts the user provided f64 range value to fast field value space.

View File

@@ -244,7 +244,7 @@ impl TermBuckets {
fn force_flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
for entry in &mut self.entries.values_mut() {
if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
sub_aggregations.flush_staged_docs(agg_with_accessor, false)?;
sub_aggregations.flush(agg_with_accessor)?;
}
}
Ok(())
@@ -289,7 +289,7 @@ impl SegmentTermCollector {
let has_sub_aggregations = !sub_aggregations.is_empty();
let blueprint = if has_sub_aggregations {
let sub_aggregation = build_segment_agg_collector(sub_aggregations)?;
let sub_aggregation = build_segment_agg_collector(sub_aggregations, false)?;
Some(sub_aggregation)
} else {
None
@@ -393,7 +393,6 @@ impl SegmentTermCollector {
&mut self,
docs: &[DocId],
bucket_with_accessor: &BucketAggregationWithAccessor,
force_flush: bool,
) -> crate::Result<()> {
let accessor = &bucket_with_accessor.accessor;
@@ -411,10 +410,15 @@ impl SegmentTermCollector {
}
}
if force_flush {
self.term_buckets
.force_flush(&bucket_with_accessor.sub_aggregation)?;
}
Ok(())
}
/// Finalizes collection for this terms aggregation. Delegates to the term
/// buckets, which flush each entry's sub-aggregations.
pub(crate) fn flush(
    &mut self,
    bucket_with_accessor: &BucketAggregationWithAccessor,
) -> crate::Result<()> {
    // Return the delegate's Result directly instead of `?` + `Ok(())`.
    self.term_buckets
        .force_flush(&bucket_with_accessor.sub_aggregation)
}
}

View File

@@ -0,0 +1,81 @@
use super::agg_req_with_accessor::AggregationsWithAccessor;
use super::intermediate_agg_result::IntermediateAggregationResults;
use super::segment_agg_result::SegmentAggregationCollector;
use crate::DocId;
// Number of docs staged before the buffer is handed to the wrapped collector
// as a single `collect_block` call.
pub(crate) const DOC_BLOCK_SIZE: usize = 64;
// Fixed-size staging buffer of document ids.
pub(crate) type DocBlock = [DocId; DOC_BLOCK_SIZE];
/// BufAggregationCollector buffers documents before calling collect_block().
#[derive(Clone)]
pub(crate) struct BufAggregationCollector<T> {
// The wrapped collector that receives buffered doc blocks.
pub(crate) collector: T,
// Staged doc ids; only the first `num_staged_docs` entries are meaningful.
staged_docs: DocBlock,
// Count of valid entries in `staged_docs`; the buffer is drained into the
// wrapped collector when this reaches `DOC_BLOCK_SIZE` (or on `flush`).
num_staged_docs: usize,
}
impl<T> std::fmt::Debug for BufAggregationCollector<T> {
    /// Debug output shows only the staging state; the wrapped collector is
    /// omitted because `T` carries no `Debug` bound here.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Fix: report the actual type name. The previous string
        // "SegmentAggregationResultsCollector" was copy-pasted from the
        // generic collector's Debug impl and misidentified this type in logs.
        f.debug_struct("BufAggregationCollector")
            .field("staged_docs", &&self.staged_docs[..self.num_staged_docs])
            .field("num_staged_docs", &self.num_staged_docs)
            .finish()
    }
}
impl<T: SegmentAggregationCollector> BufAggregationCollector<T> {
pub fn new(collector: T) -> Self {
Self {
collector,
num_staged_docs: 0,
staged_docs: [0; DOC_BLOCK_SIZE],
}
}
}
impl<T: SegmentAggregationCollector + Clone + 'static> SegmentAggregationCollector
for BufAggregationCollector<T>
{
// Converts the wrapped collector into its intermediate result.
// NOTE(review): this does NOT drain `staged_docs`; callers are expected to
// call `flush` first (as `AggregationSegmentCollector::harvest` does) or the
// staged docs are silently dropped.
fn into_intermediate_aggregations_result(
self: Box<Self>,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<IntermediateAggregationResults> {
Box::new(self.collector).into_intermediate_aggregations_result(agg_with_accessor)
}
// Stages a single doc; when the buffer fills (DOC_BLOCK_SIZE docs) the whole
// block is handed to the wrapped collector in one `collect_block` call.
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
self.staged_docs[self.num_staged_docs] = doc;
self.num_staged_docs += 1;
// Drain exactly when full, so the write above can never go out of bounds.
if self.num_staged_docs == self.staged_docs.len() {
self.collector
.collect_block(&self.staged_docs[..self.num_staged_docs], agg_with_accessor)?;
self.num_staged_docs = 0;
}
Ok(())
}
// Feeds an externally supplied block through the staging buffer one doc at a
// time, so downstream blocks always arrive in DOC_BLOCK_SIZE-sized chunks.
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
for doc in docs {
self.collect(*doc, agg_with_accessor)?;
}
Ok(())
}
// Drains any partially filled buffer into the wrapped collector (possibly an
// empty slice, which is harmless), then forwards the flush so nested
// collectors can finalize their own staged state.
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
self.collector
.collect_block(&self.staged_docs[..self.num_staged_docs], agg_with_accessor)?;
self.num_staged_docs = 0;
self.collector.flush(agg_with_accessor)?;
Ok(())
}
}

View File

@@ -151,7 +151,7 @@ impl AggregationSegmentCollector {
) -> crate::Result<Self> {
let aggs_with_accessor =
get_aggs_with_accessor_and_validate(agg, reader, Rc::default(), max_bucket_count)?;
let result = build_segment_agg_collector(&aggs_with_accessor)?;
let result = build_segment_agg_collector(&aggs_with_accessor, true)?;
Ok(AggregationSegmentCollector {
aggs_with_accessor,
result,
@@ -177,8 +177,7 @@ impl SegmentCollector for AggregationSegmentCollector {
if let Some(err) = self.error {
return Err(err);
}
self.result
.flush_staged_docs(&self.aggs_with_accessor, true)?;
self.result.flush(&self.aggs_with_accessor)?;
self.result
.into_intermediate_aggregations_result(&self.aggs_with_accessor)
}

View File

@@ -1,4 +1,4 @@
use columnar::Column;
use columnar::{Cardinality, Column};
use serde::{Deserialize, Serialize};
use super::*;
@@ -156,23 +156,36 @@ pub(crate) struct SegmentStatsCollector {
field_type: Type,
pub(crate) collecting_for: SegmentStatsType,
pub(crate) stats: IntermediateStats,
pub(crate) accessor_idx: usize,
}
impl SegmentStatsCollector {
pub fn from_req(field_type: Type, collecting_for: SegmentStatsType) -> Self {
pub fn from_req(
field_type: Type,
collecting_for: SegmentStatsType,
accessor_idx: usize,
) -> Self {
Self {
field_type,
collecting_for,
stats: IntermediateStats::default(),
accessor_idx,
}
}
pub(crate) fn collect_block(&mut self, docs: &[DocId], field: &Column<u64>) {
// TODO special case for Required, Optional column type
for doc in docs {
for val in field.values(*doc) {
pub(crate) fn collect_block_with_field(&mut self, docs: &[DocId], field: &Column<u64>) {
if field.get_cardinality() == Cardinality::Full {
for doc in docs {
let val = field.values.get_val(*doc);
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
}
} else {
for doc in docs {
for val in field.values(*doc) {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
}
}
}
}
}
@@ -219,20 +232,29 @@ impl SegmentAggregationCollector for SegmentStatsCollector {
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
let accessor = &agg_with_accessor.metrics.values[0].accessor;
for val in accessor.values(doc) {
let field = &agg_with_accessor.metrics.values[self.accessor_idx].accessor;
if field.get_cardinality() == Cardinality::Full {
let val = field.values.get_val(doc);
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
} else {
for val in field.values(doc) {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
}
}
Ok(())
}
fn flush_staged_docs(
fn collect_block(
&mut self,
_agg_with_accessor: &AggregationsWithAccessor,
_force_flush: bool,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
let field = &agg_with_accessor.metrics.values[self.accessor_idx].accessor;
self.collect_block_with_field(docs, field);
Ok(())
}
}
@@ -293,6 +315,43 @@ mod tests {
Ok(())
}
#[test]
// Stats aggregation over an index holding a single value: avg, min, max and
// sum must all equal that value and count must be 1.
fn test_aggregation_stats_simple() -> crate::Result<()> {
// Minimal index built from one value (the `false` flag presumably controls
// segmentation in `get_test_index_from_values` — TODO confirm).
let values = vec![10.0];
let index = get_test_index_from_values(false, &values)?;
let agg_req_1: Aggregations = vec![(
"stats".to_string(),
Aggregation::Metric(MetricAggregation::Stats(StatsAggregation::from_field_name(
"score".to_string(),
))),
)]
.into_iter()
.collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let reader = index.reader()?;
let searcher = reader.searcher();
let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
// Round-trip through JSON to compare against the user-facing serialization.
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
assert_eq!(
res["stats"],
json!({
"avg": 10.0,
"count": 1,
"max": 10.0,
"min": 10.0,
"sum": 10.0
})
);
Ok(())
}
#[test]
fn test_aggregation_stats() -> crate::Result<()> {
let index = get_test_index_2_segments(false)?;

View File

@@ -160,6 +160,7 @@ pub mod agg_req;
mod agg_req_with_accessor;
pub mod agg_result;
pub mod bucket;
mod buf_collector;
mod collector;
mod date;
pub mod intermediate_agg_result;
@@ -332,8 +333,8 @@ mod tests {
};
use crate::aggregation::agg_result::AggregationResults;
use crate::aggregation::bucket::TermsAggregation;
use crate::aggregation::buf_collector::DOC_BLOCK_SIZE;
use crate::aggregation::intermediate_agg_result::IntermediateAggregationResults;
use crate::aggregation::segment_agg_result::DOC_BLOCK_SIZE;
use crate::aggregation::DistributedAggregationCollector;
use crate::indexer::NoMergePolicy;
use crate::query::{AllQuery, TermQuery};
@@ -1590,7 +1591,7 @@ mod tests {
}
#[bench]
fn bench_aggregation_sub_tree(b: &mut Bencher) {
fn bench_aggregation_avg_and_range_with_avg(b: &mut Bencher) {
let index = get_test_index_bench(false).unwrap();
let reader = index.reader().unwrap();
let text_field = reader.searcher().schema().get_field("text").unwrap();

View File

@@ -12,6 +12,7 @@ use super::agg_req_with_accessor::{
AggregationsWithAccessor, BucketAggregationWithAccessor, MetricAggregationWithAccessor,
};
use super::bucket::{SegmentHistogramCollector, SegmentRangeCollector, SegmentTermCollector};
use super::buf_collector::BufAggregationCollector;
use super::collector::MAX_BUCKET_COUNT;
use super::intermediate_agg_result::{IntermediateAggregationResults, IntermediateBucketResult};
use super::metric::{
@@ -22,9 +23,6 @@ use super::VecWithNames;
use crate::aggregation::agg_req::BucketAggregationType;
use crate::{DocId, TantivyError};
pub(crate) const DOC_BLOCK_SIZE: usize = 64;
pub(crate) type DocBlock = [DocId; DOC_BLOCK_SIZE];
pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug {
fn into_intermediate_aggregations_result(
self: Box<Self>,
@@ -37,11 +35,17 @@ pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug {
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()>;
fn flush_staged_docs(
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
force_flush: bool,
) -> crate::Result<()>;
/// Finalize method. Some Aggregator collect blocks of docs before calling `collect_block`.
/// This method ensures those staged docs will be collected.
fn flush(&mut self, _agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
Ok(())
}
}
pub(crate) trait CollectorClone {
@@ -64,36 +68,56 @@ impl Clone for Box<dyn SegmentAggregationCollector> {
pub(crate) fn build_segment_agg_collector(
req: &AggregationsWithAccessor,
add_buffer_layer: bool,
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
// Single metric special case
if req.buckets.is_empty() && req.metrics.len() == 1 {
let req = &req.metrics.values[0];
let accessor_idx = 0;
let stats_collector = match &req.metric {
MetricAggregation::Average(AverageAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Average)
}
MetricAggregation::Count(CountAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Count)
SegmentStatsCollector::from_req(
req.field_type,
SegmentStatsType::Average,
accessor_idx,
)
}
MetricAggregation::Count(CountAggregation { .. }) => SegmentStatsCollector::from_req(
req.field_type,
SegmentStatsType::Count,
accessor_idx,
),
MetricAggregation::Max(MaxAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max)
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max, accessor_idx)
}
MetricAggregation::Min(MinAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min)
}
MetricAggregation::Stats(StatsAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Stats)
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min, accessor_idx)
}
MetricAggregation::Stats(StatsAggregation { .. }) => SegmentStatsCollector::from_req(
req.field_type,
SegmentStatsType::Stats,
accessor_idx,
),
MetricAggregation::Sum(SumAggregation { .. }) => {
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum)
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum, accessor_idx)
}
};
return Ok(Box::new(stats_collector));
if add_buffer_layer {
let stats_collector = BufAggregationCollector::new(stats_collector);
return Ok(Box::new(stats_collector));
} else {
return Ok(Box::new(stats_collector));
}
}
let agg = GenericSegmentAggregationResultsCollector::from_req_and_validate(req)?;
Ok(Box::new(agg))
if add_buffer_layer {
let agg = BufAggregationCollector::new(agg);
Ok(Box::new(agg))
} else {
Ok(Box::new(agg))
}
}
#[derive(Clone)]
@@ -103,8 +127,6 @@ pub(crate) fn build_segment_agg_collector(
pub(crate) struct GenericSegmentAggregationResultsCollector {
pub(crate) metrics: Option<VecWithNames<SegmentMetricResultCollector>>,
pub(crate) buckets: Option<VecWithNames<SegmentBucketResultCollector>>,
staged_docs: DocBlock,
num_staged_docs: usize,
}
impl Default for GenericSegmentAggregationResultsCollector {
@@ -112,8 +134,6 @@ impl Default for GenericSegmentAggregationResultsCollector {
Self {
metrics: Default::default(),
buckets: Default::default(),
staged_docs: [0; DOC_BLOCK_SIZE],
num_staged_docs: Default::default(),
}
}
}
@@ -123,8 +143,6 @@ impl Debug for GenericSegmentAggregationResultsCollector {
f.debug_struct("SegmentAggregationResultsCollector")
.field("metrics", &self.metrics)
.field("buckets", &self.buckets)
.field("staged_docs", &&self.staged_docs[..self.num_staged_docs])
.field("num_staged_docs", &self.num_staged_docs)
.finish()
}
}
@@ -154,44 +172,43 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
self.staged_docs[self.num_staged_docs] = doc;
self.num_staged_docs += 1;
if self.num_staged_docs == self.staged_docs.len() {
self.flush_staged_docs(agg_with_accessor, false)?;
}
self.collect_block(&[doc], agg_with_accessor)?;
Ok(())
}
fn flush_staged_docs(
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
force_flush: bool,
) -> crate::Result<()> {
if self.num_staged_docs == 0 {
return Ok(());
}
if let Some(metrics) = &mut self.metrics {
if let Some(metrics) = self.metrics.as_mut() {
for (collector, agg_with_accessor) in
metrics.values_mut().zip(agg_with_accessor.metrics.values())
{
collector
.collect_block(&self.staged_docs[..self.num_staged_docs], agg_with_accessor);
collector.collect_block(&docs, agg_with_accessor);
}
}
if let Some(buckets) = self.buckets.as_mut() {
for (collector, agg_with_accessor) in
buckets.values_mut().zip(agg_with_accessor.buckets.values())
{
collector.collect_block(&docs, agg_with_accessor)?;
}
}
Ok(())
}
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
if let Some(buckets) = &mut self.buckets {
for (collector, agg_with_accessor) in
buckets.values_mut().zip(agg_with_accessor.buckets.values())
{
collector.collect_block(
&self.staged_docs[..self.num_staged_docs],
agg_with_accessor,
force_flush,
)?;
collector.flush(agg_with_accessor)?;
}
}
self.num_staged_docs = 0;
Ok(())
}
}
@@ -230,10 +247,11 @@ impl GenericSegmentAggregationResultsCollector {
let metrics = req
.metrics
.iter()
.map(|(key, req)| {
.enumerate()
.map(|(accesor_idx, (key, req))| {
Ok((
key.to_string(),
SegmentMetricResultCollector::from_req_and_validate(req)?,
SegmentMetricResultCollector::from_req_and_validate(req, accesor_idx)?,
))
})
.collect::<crate::Result<Vec<(String, _)>>>()?;
@@ -247,12 +265,7 @@ impl GenericSegmentAggregationResultsCollector {
} else {
Some(VecWithNames::from_entries(buckets))
};
Ok(GenericSegmentAggregationResultsCollector {
metrics,
buckets,
staged_docs: [0; DOC_BLOCK_SIZE],
num_staged_docs: 0,
})
Ok(GenericSegmentAggregationResultsCollector { metrics, buckets })
}
}
@@ -262,44 +275,59 @@ pub(crate) enum SegmentMetricResultCollector {
}
impl SegmentMetricResultCollector {
pub fn from_req_and_validate(req: &MetricAggregationWithAccessor) -> crate::Result<Self> {
pub fn from_req_and_validate(
req: &MetricAggregationWithAccessor,
accessor_idx: usize,
) -> crate::Result<Self> {
match &req.metric {
MetricAggregation::Average(AverageAggregation { .. }) => {
Ok(SegmentMetricResultCollector::Stats(
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Average),
))
}
MetricAggregation::Count(CountAggregation { .. }) => {
Ok(SegmentMetricResultCollector::Stats(
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Count),
))
}
MetricAggregation::Max(MaxAggregation { .. }) => {
Ok(SegmentMetricResultCollector::Stats(
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max),
))
}
MetricAggregation::Min(MinAggregation { .. }) => {
Ok(SegmentMetricResultCollector::Stats(
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min),
))
}
MetricAggregation::Stats(StatsAggregation { .. }) => {
Ok(SegmentMetricResultCollector::Stats(
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Stats),
))
}
MetricAggregation::Sum(SumAggregation { .. }) => {
Ok(SegmentMetricResultCollector::Stats(
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum),
))
}
MetricAggregation::Average(AverageAggregation { .. }) => Ok(
SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req(
req.field_type,
SegmentStatsType::Average,
accessor_idx,
)),
),
MetricAggregation::Count(CountAggregation { .. }) => Ok(
SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req(
req.field_type,
SegmentStatsType::Count,
accessor_idx,
)),
),
MetricAggregation::Max(MaxAggregation { .. }) => Ok(
SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req(
req.field_type,
SegmentStatsType::Max,
accessor_idx,
)),
),
MetricAggregation::Min(MinAggregation { .. }) => Ok(
SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req(
req.field_type,
SegmentStatsType::Min,
accessor_idx,
)),
),
MetricAggregation::Stats(StatsAggregation { .. }) => Ok(
SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req(
req.field_type,
SegmentStatsType::Stats,
accessor_idx,
)),
),
MetricAggregation::Sum(SumAggregation { .. }) => Ok(
SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req(
req.field_type,
SegmentStatsType::Sum,
accessor_idx,
)),
),
}
}
pub(crate) fn collect_block(&mut self, doc: &[DocId], metric: &MetricAggregationWithAccessor) {
match self {
SegmentMetricResultCollector::Stats(stats_collector) => {
stats_collector.collect_block(doc, &metric.accessor);
stats_collector.collect_block_with_field(doc, &metric.accessor);
}
}
}
@@ -361,19 +389,37 @@ impl SegmentBucketResultCollector {
#[inline]
pub(crate) fn collect_block(
&mut self,
doc: &[DocId],
docs: &[DocId],
bucket_with_accessor: &BucketAggregationWithAccessor,
force_flush: bool,
) -> crate::Result<()> {
match self {
SegmentBucketResultCollector::Range(range) => {
range.collect_block(doc, bucket_with_accessor, force_flush)?;
range.collect_block(docs, bucket_with_accessor)?;
}
SegmentBucketResultCollector::Histogram(histogram) => {
histogram.collect_block(doc, bucket_with_accessor, force_flush)?;
histogram.collect_block(docs, bucket_with_accessor)?;
}
SegmentBucketResultCollector::Terms(terms) => {
terms.collect_block(doc, bucket_with_accessor, force_flush)?;
terms.collect_block(docs, bucket_with_accessor)?;
}
}
Ok(())
}
#[inline]
pub(crate) fn flush(
&mut self,
bucket_with_accessor: &BucketAggregationWithAccessor,
) -> crate::Result<()> {
match self {
SegmentBucketResultCollector::Range(range) => {
range.flush(bucket_with_accessor)?;
}
SegmentBucketResultCollector::Histogram(histogram) => {
histogram.flush(bucket_with_accessor)?;
}
SegmentBucketResultCollector::Terms(terms) => {
terms.flush(bucket_with_accessor)?;
}
}
Ok(())