move buffer in front of dynamic dispatch (#1915)

Dynamic dispatch seems to be really expensive. Move the buffer in front of the dynamic dispatch to reduce the number of calls into the dynamically dispatched collector.
This commit is contained in:
PSeitz
2023-02-28 13:07:50 +08:00
committed by GitHub
parent 8a71e00da3
commit bc36458334
7 changed files with 46 additions and 62 deletions

View File

@@ -51,6 +51,20 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
/// May panic if `idx` is greater than the column length.
fn get_val(&self, idx: u32) -> T;
/// Fetches several values in one call, so that the per-call dynamic dispatch
/// overhead is paid once per batch instead of once per value.
///
/// `idx` and `output` must have the same length: `output[i]` receives the
/// value stored at position `idx[i]`.
///
/// # Panics
///
/// Panics if `idx` and `output` differ in length.
/// May panic if any entry of `idx` is greater than the column length.
fn get_vals(&self, idx: &[u32], output: &mut [T]) {
    assert_eq!(
        idx.len(),
        output.len(),
        "`idx` and `output` must have the same length"
    );
    // Destructure the `&u32` in the pattern: no cast is needed and the
    // `idx` parameter is no longer shadowed by the loop variable.
    for (out, &val_idx) in output.iter_mut().zip(idx) {
        *out = self.get_val(val_idx);
    }
}
/// Fills an output buffer with the fast field values
/// associated with the `DocId` going from
/// `start` to `start + output.len()`.

View File

@@ -322,7 +322,7 @@ impl SegmentHistogramCollector {
let sub_aggregation_blueprint = if sub_aggregation.is_empty() {
None
} else {
let sub_aggregation = build_segment_agg_collector(sub_aggregation, false)?;
let sub_aggregation = build_segment_agg_collector(sub_aggregation)?;
Some(sub_aggregation)
};

View File

@@ -286,7 +286,7 @@ impl SegmentRangeCollector {
let sub_aggregation = if sub_aggregation.is_empty() {
None
} else {
Some(build_segment_agg_collector(sub_aggregation, false)?)
Some(build_segment_agg_collector(sub_aggregation)?)
};
Ok(SegmentRangeAndBucketEntry {

View File

@@ -374,7 +374,7 @@ impl SegmentTermCollector {
let has_sub_aggregations = !sub_aggregations.is_empty();
let blueprint = if has_sub_aggregations {
let sub_aggregation = build_segment_agg_collector(sub_aggregations, false)?;
let sub_aggregation = build_segment_agg_collector(sub_aggregations)?;
Some(sub_aggregation)
} else {
None

View File

@@ -8,13 +8,13 @@ pub(crate) type DocBlock = [DocId; DOC_BLOCK_SIZE];
/// BufAggregationCollector buffers documents before calling collect_block().
#[derive(Clone)]
pub(crate) struct BufAggregationCollector<T> {
pub(crate) collector: T,
pub(crate) struct BufAggregationCollector {
pub(crate) collector: Box<dyn SegmentAggregationCollector>,
staged_docs: DocBlock,
num_staged_docs: usize,
}
impl<T> std::fmt::Debug for BufAggregationCollector<T> {
impl std::fmt::Debug for BufAggregationCollector {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SegmentAggregationResultsCollector")
.field("staged_docs", &&self.staged_docs[..self.num_staged_docs])
@@ -23,8 +23,8 @@ impl<T> std::fmt::Debug for BufAggregationCollector<T> {
}
}
impl<T: SegmentAggregationCollector> BufAggregationCollector<T> {
pub fn new(collector: T) -> Self {
impl BufAggregationCollector {
pub fn new(collector: Box<dyn SegmentAggregationCollector>) -> Self {
Self {
collector,
num_staged_docs: 0,
@@ -33,9 +33,7 @@ impl<T: SegmentAggregationCollector> BufAggregationCollector<T> {
}
}
impl<T: SegmentAggregationCollector + Clone + 'static> SegmentAggregationCollector
for BufAggregationCollector<T>
{
impl SegmentAggregationCollector for BufAggregationCollector {
fn into_intermediate_aggregations_result(
self: Box<Self>,
agg_with_accessor: &AggregationsWithAccessor,

View File

@@ -3,6 +3,7 @@ use std::rc::Rc;
use super::agg_req::Aggregations;
use super::agg_req_with_accessor::AggregationsWithAccessor;
use super::agg_result::AggregationResults;
use super::buf_collector::BufAggregationCollector;
use super::intermediate_agg_result::IntermediateAggregationResults;
use super::segment_agg_result::{build_segment_agg_collector, SegmentAggregationCollector};
use crate::aggregation::agg_req_with_accessor::get_aggs_with_accessor_and_validate;
@@ -134,7 +135,7 @@ fn merge_fruits(
/// `AggregationSegmentCollector` does the aggregation collection on a segment.
pub struct AggregationSegmentCollector {
aggs_with_accessor: AggregationsWithAccessor,
result: Box<dyn SegmentAggregationCollector>,
result: BufAggregationCollector,
error: Option<TantivyError>,
}
@@ -148,7 +149,8 @@ impl AggregationSegmentCollector {
) -> crate::Result<Self> {
let aggs_with_accessor =
get_aggs_with_accessor_and_validate(agg, reader, Rc::default(), max_bucket_count)?;
let result = build_segment_agg_collector(&aggs_with_accessor, true)?;
let result =
BufAggregationCollector::new(build_segment_agg_collector(&aggs_with_accessor)?);
Ok(AggregationSegmentCollector {
aggs_with_accessor,
result,
@@ -175,7 +177,6 @@ impl SegmentCollector for AggregationSegmentCollector {
return Err(err);
}
self.result.flush(&self.aggs_with_accessor)?;
self.result
.into_intermediate_aggregations_result(&self.aggs_with_accessor)
Box::new(self.result).into_intermediate_aggregations_result(&self.aggs_with_accessor)
}
}

View File

@@ -12,7 +12,6 @@ use super::agg_req_with_accessor::{
AggregationsWithAccessor, BucketAggregationWithAccessor, MetricAggregationWithAccessor,
};
use super::bucket::{SegmentHistogramCollector, SegmentRangeCollector, SegmentTermCollector};
use super::buf_collector::BufAggregationCollector;
use super::collector::MAX_BUCKET_COUNT;
use super::intermediate_agg_result::IntermediateAggregationResults;
use super::metric::{
@@ -68,35 +67,28 @@ impl Clone for Box<dyn SegmentAggregationCollector> {
pub(crate) fn build_segment_agg_collector(
req: &AggregationsWithAccessor,
add_buffer_layer: bool,
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
// Single metric special case
if req.buckets.is_empty() && req.metrics.len() == 1 {
let req = &req.metrics.values[0];
let accessor_idx = 0;
return build_metric_segment_agg_collector(req, accessor_idx, add_buffer_layer);
return build_metric_segment_agg_collector(req, accessor_idx);
}
// Single bucket special case
if req.metrics.is_empty() && req.buckets.len() == 1 {
let req = &req.buckets.values[0];
let accessor_idx = 0;
return build_bucket_segment_agg_collector(req, accessor_idx, add_buffer_layer);
return build_bucket_segment_agg_collector(req, accessor_idx);
}
let agg = GenericSegmentAggregationResultsCollector::from_req_and_validate(req)?;
if add_buffer_layer {
let agg = BufAggregationCollector::new(agg);
Ok(Box::new(agg))
} else {
Ok(Box::new(agg))
}
Ok(Box::new(agg))
}
pub(crate) fn build_metric_segment_agg_collector(
req: &MetricAggregationWithAccessor,
accessor_idx: usize,
add_buffer_layer: bool,
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
let stats_collector = match &req.metric {
MetricAggregation::Average(AverageAggregation { .. }) => {
@@ -119,60 +111,39 @@ pub(crate) fn build_metric_segment_agg_collector(
}
};
if add_buffer_layer {
let stats_collector = BufAggregationCollector::new(stats_collector);
Ok(Box::new(stats_collector))
} else {
Ok(Box::new(stats_collector))
}
}
fn box_with_opt_buffer<T: SegmentAggregationCollector + Clone + 'static>(
add_buffer_layer: bool,
collector: T,
) -> Box<dyn SegmentAggregationCollector> {
if add_buffer_layer {
let collector = BufAggregationCollector::new(collector);
Box::new(collector)
} else {
Box::new(collector)
}
Ok(Box::new(stats_collector))
}
pub(crate) fn build_bucket_segment_agg_collector(
req: &BucketAggregationWithAccessor,
accessor_idx: usize,
add_buffer_layer: bool,
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
match &req.bucket_agg {
BucketAggregationType::Terms(terms_req) => Ok(box_with_opt_buffer(
add_buffer_layer,
SegmentTermCollector::from_req_and_validate(
BucketAggregationType::Terms(terms_req) => {
Ok(Box::new(SegmentTermCollector::from_req_and_validate(
terms_req,
&req.sub_aggregation,
req.field_type,
accessor_idx,
)?,
)),
BucketAggregationType::Range(range_req) => Ok(box_with_opt_buffer(
add_buffer_layer,
SegmentRangeCollector::from_req_and_validate(
)?))
}
BucketAggregationType::Range(range_req) => {
Ok(Box::new(SegmentRangeCollector::from_req_and_validate(
range_req,
&req.sub_aggregation,
&req.bucket_count,
req.field_type,
accessor_idx,
)?,
)),
BucketAggregationType::Histogram(histogram) => Ok(box_with_opt_buffer(
add_buffer_layer,
SegmentHistogramCollector::from_req_and_validate(
)?))
}
BucketAggregationType::Histogram(histogram) => {
Ok(Box::new(SegmentHistogramCollector::from_req_and_validate(
histogram,
&req.sub_aggregation,
req.field_type,
accessor_idx,
)?,
)),
)?))
}
}
}
@@ -279,7 +250,7 @@ impl GenericSegmentAggregationResultsCollector {
.iter()
.enumerate()
.map(|(accessor_idx, (_key, req))| {
build_bucket_segment_agg_collector(req, accessor_idx, false)
build_bucket_segment_agg_collector(req, accessor_idx)
})
.collect::<crate::Result<Vec<Box<dyn SegmentAggregationCollector>>>>()?;
let metrics = req
@@ -287,7 +258,7 @@ impl GenericSegmentAggregationResultsCollector {
.iter()
.enumerate()
.map(|(accessor_idx, (_key, req))| {
build_metric_segment_agg_collector(req, accessor_idx, false)
build_metric_segment_agg_collector(req, accessor_idx)
})
.collect::<crate::Result<Vec<Box<dyn SegmentAggregationCollector>>>>()?;