From bc36458334d83dae5ce5cf541a9d7d92c0c1ae22 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Tue, 28 Feb 2023 13:07:50 +0800 Subject: [PATCH] move buffer in front of dynamic dispatch (#1915) Dynamic dispatch seems to be really expensive, so move the buffer in front of the dynamic dispatch to reduce the number of calls into the dynamically dispatched collector. --- columnar/src/column_values/mod.rs | 14 ++++ src/aggregation/bucket/histogram/histogram.rs | 2 +- src/aggregation/bucket/range.rs | 2 +- src/aggregation/bucket/term_agg.rs | 2 +- src/aggregation/buf_collector.rs | 14 ++-- src/aggregation/collector.rs | 9 +-- src/aggregation/segment_agg_result.rs | 65 +++++-------------- 7 files changed, 46 insertions(+), 62 deletions(-) diff --git a/columnar/src/column_values/mod.rs b/columnar/src/column_values/mod.rs index d761235c0..57d6bac51 100644 --- a/columnar/src/column_values/mod.rs +++ b/columnar/src/column_values/mod.rs @@ -51,6 +51,20 @@ pub trait ColumnValues: Send + Sync { /// May panic if `idx` is greater than the column length. fn get_val(&self, idx: u32) -> T; + /// Allows to push down multiple fetch calls, to avoid dynamic dispatch overhead. + /// + /// idx and output should have the same length + /// + /// # Panics + /// + /// May panic if `idx` is greater than the column length. + fn get_vals(&self, idx: &[u32], output: &mut [T]) { + assert!(idx.len() == output.len()); + for (out, idx) in output.iter_mut().zip(idx.iter()) { + *out = self.get_val(*idx as u32); + } + } + /// Fills an output buffer with the fast field values /// associated with the `DocId` going from /// `start` to `start + output.len()`. 
diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index 76ef3d41b..3af15248a 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -322,7 +322,7 @@ impl SegmentHistogramCollector { let sub_aggregation_blueprint = if sub_aggregation.is_empty() { None } else { - let sub_aggregation = build_segment_agg_collector(sub_aggregation, false)?; + let sub_aggregation = build_segment_agg_collector(sub_aggregation)?; Some(sub_aggregation) }; diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index 85c36dcee..674e3f22b 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -286,7 +286,7 @@ impl SegmentRangeCollector { let sub_aggregation = if sub_aggregation.is_empty() { None } else { - Some(build_segment_agg_collector(sub_aggregation, false)?) + Some(build_segment_agg_collector(sub_aggregation)?) }; Ok(SegmentRangeAndBucketEntry { diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index f7f991a4c..6abb75c4b 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -374,7 +374,7 @@ impl SegmentTermCollector { let has_sub_aggregations = !sub_aggregations.is_empty(); let blueprint = if has_sub_aggregations { - let sub_aggregation = build_segment_agg_collector(sub_aggregations, false)?; + let sub_aggregation = build_segment_agg_collector(sub_aggregations)?; Some(sub_aggregation) } else { None diff --git a/src/aggregation/buf_collector.rs b/src/aggregation/buf_collector.rs index 149707f82..ead6183ae 100644 --- a/src/aggregation/buf_collector.rs +++ b/src/aggregation/buf_collector.rs @@ -8,13 +8,13 @@ pub(crate) type DocBlock = [DocId; DOC_BLOCK_SIZE]; /// BufAggregationCollector buffers documents before calling collect_block(). 
#[derive(Clone)] -pub(crate) struct BufAggregationCollector { - pub(crate) collector: T, +pub(crate) struct BufAggregationCollector { + pub(crate) collector: Box, staged_docs: DocBlock, num_staged_docs: usize, } -impl std::fmt::Debug for BufAggregationCollector { +impl std::fmt::Debug for BufAggregationCollector { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SegmentAggregationResultsCollector") .field("staged_docs", &&self.staged_docs[..self.num_staged_docs]) @@ -23,8 +23,8 @@ impl std::fmt::Debug for BufAggregationCollector { } } -impl BufAggregationCollector { - pub fn new(collector: T) -> Self { +impl BufAggregationCollector { + pub fn new(collector: Box) -> Self { Self { collector, num_staged_docs: 0, @@ -33,9 +33,7 @@ impl BufAggregationCollector { } } -impl SegmentAggregationCollector - for BufAggregationCollector -{ +impl SegmentAggregationCollector for BufAggregationCollector { fn into_intermediate_aggregations_result( self: Box, agg_with_accessor: &AggregationsWithAccessor, diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs index 852b2053b..b00c03690 100644 --- a/src/aggregation/collector.rs +++ b/src/aggregation/collector.rs @@ -3,6 +3,7 @@ use std::rc::Rc; use super::agg_req::Aggregations; use super::agg_req_with_accessor::AggregationsWithAccessor; use super::agg_result::AggregationResults; +use super::buf_collector::BufAggregationCollector; use super::intermediate_agg_result::IntermediateAggregationResults; use super::segment_agg_result::{build_segment_agg_collector, SegmentAggregationCollector}; use crate::aggregation::agg_req_with_accessor::get_aggs_with_accessor_and_validate; @@ -134,7 +135,7 @@ fn merge_fruits( /// `AggregationSegmentCollector` does the aggregation collection on a segment. 
pub struct AggregationSegmentCollector { aggs_with_accessor: AggregationsWithAccessor, - result: Box, + result: BufAggregationCollector, error: Option, } @@ -148,7 +149,8 @@ impl AggregationSegmentCollector { ) -> crate::Result { let aggs_with_accessor = get_aggs_with_accessor_and_validate(agg, reader, Rc::default(), max_bucket_count)?; - let result = build_segment_agg_collector(&aggs_with_accessor, true)?; + let result = + BufAggregationCollector::new(build_segment_agg_collector(&aggs_with_accessor)?); Ok(AggregationSegmentCollector { aggs_with_accessor, result, @@ -175,7 +177,6 @@ impl SegmentCollector for AggregationSegmentCollector { return Err(err); } self.result.flush(&self.aggs_with_accessor)?; - self.result - .into_intermediate_aggregations_result(&self.aggs_with_accessor) + Box::new(self.result).into_intermediate_aggregations_result(&self.aggs_with_accessor) } } diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index d88c6f8a7..b6e7fe2f3 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -12,7 +12,6 @@ use super::agg_req_with_accessor::{ AggregationsWithAccessor, BucketAggregationWithAccessor, MetricAggregationWithAccessor, }; use super::bucket::{SegmentHistogramCollector, SegmentRangeCollector, SegmentTermCollector}; -use super::buf_collector::BufAggregationCollector; use super::collector::MAX_BUCKET_COUNT; use super::intermediate_agg_result::IntermediateAggregationResults; use super::metric::{ @@ -68,35 +67,28 @@ impl Clone for Box { pub(crate) fn build_segment_agg_collector( req: &AggregationsWithAccessor, - add_buffer_layer: bool, ) -> crate::Result> { // Single metric special case if req.buckets.is_empty() && req.metrics.len() == 1 { let req = &req.metrics.values[0]; let accessor_idx = 0; - return build_metric_segment_agg_collector(req, accessor_idx, add_buffer_layer); + return build_metric_segment_agg_collector(req, accessor_idx); } // Single bucket special case 
if req.metrics.is_empty() && req.buckets.len() == 1 { let req = &req.buckets.values[0]; let accessor_idx = 0; - return build_bucket_segment_agg_collector(req, accessor_idx, add_buffer_layer); + return build_bucket_segment_agg_collector(req, accessor_idx); } let agg = GenericSegmentAggregationResultsCollector::from_req_and_validate(req)?; - if add_buffer_layer { - let agg = BufAggregationCollector::new(agg); - Ok(Box::new(agg)) - } else { - Ok(Box::new(agg)) - } + Ok(Box::new(agg)) } pub(crate) fn build_metric_segment_agg_collector( req: &MetricAggregationWithAccessor, accessor_idx: usize, - add_buffer_layer: bool, ) -> crate::Result> { let stats_collector = match &req.metric { MetricAggregation::Average(AverageAggregation { .. }) => { @@ -119,60 +111,39 @@ pub(crate) fn build_metric_segment_agg_collector( } }; - if add_buffer_layer { - let stats_collector = BufAggregationCollector::new(stats_collector); - Ok(Box::new(stats_collector)) - } else { - Ok(Box::new(stats_collector)) - } -} - -fn box_with_opt_buffer( - add_buffer_layer: bool, - collector: T, -) -> Box { - if add_buffer_layer { - let collector = BufAggregationCollector::new(collector); - Box::new(collector) - } else { - Box::new(collector) - } + Ok(Box::new(stats_collector)) } pub(crate) fn build_bucket_segment_agg_collector( req: &BucketAggregationWithAccessor, accessor_idx: usize, - add_buffer_layer: bool, ) -> crate::Result> { match &req.bucket_agg { - BucketAggregationType::Terms(terms_req) => Ok(box_with_opt_buffer( - add_buffer_layer, - SegmentTermCollector::from_req_and_validate( + BucketAggregationType::Terms(terms_req) => { + Ok(Box::new(SegmentTermCollector::from_req_and_validate( terms_req, &req.sub_aggregation, req.field_type, accessor_idx, - )?, - )), - BucketAggregationType::Range(range_req) => Ok(box_with_opt_buffer( - add_buffer_layer, - SegmentRangeCollector::from_req_and_validate( + )?)) + } + BucketAggregationType::Range(range_req) => { + 
Ok(Box::new(SegmentRangeCollector::from_req_and_validate( range_req, &req.sub_aggregation, &req.bucket_count, req.field_type, accessor_idx, - )?, - )), - BucketAggregationType::Histogram(histogram) => Ok(box_with_opt_buffer( - add_buffer_layer, - SegmentHistogramCollector::from_req_and_validate( + )?)) + } + BucketAggregationType::Histogram(histogram) => { + Ok(Box::new(SegmentHistogramCollector::from_req_and_validate( histogram, &req.sub_aggregation, req.field_type, accessor_idx, - )?, - )), + )?)) + } } } @@ -279,7 +250,7 @@ impl GenericSegmentAggregationResultsCollector { .iter() .enumerate() .map(|(accessor_idx, (_key, req))| { - build_bucket_segment_agg_collector(req, accessor_idx, false) + build_bucket_segment_agg_collector(req, accessor_idx) }) .collect::>>>()?; let metrics = req @@ -287,7 +258,7 @@ impl GenericSegmentAggregationResultsCollector { .iter() .enumerate() .map(|(accessor_idx, (_key, req))| { - build_metric_segment_agg_collector(req, accessor_idx, false) + build_metric_segment_agg_collector(req, accessor_idx) }) .collect::>>>()?;