mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-08 01:52:54 +00:00
This introduces an optimization for top-level term aggregations on fields with low cardinality: a Vec is used as the underlying map. In addition, sub-aggregations are buffered. --------- Co-authored-by: Pascal Seitz <pascal.seitz@datadoghq.com> Co-authored-by: Paul Masurel <paul@quickwit.io>
88 lines
2.6 KiB
Rust
88 lines
2.6 KiB
Rust
use super::intermediate_agg_result::IntermediateAggregationResults;
|
|
use super::segment_agg_result::SegmentAggregationCollector;
|
|
use crate::aggregation::agg_data::AggregationsSegmentCtx;
|
|
use crate::DocId;
|
|
|
|
#[cfg(test)]
|
|
pub(crate) const DOC_BLOCK_SIZE: usize = 64;
|
|
|
|
#[cfg(not(test))]
|
|
pub(crate) const DOC_BLOCK_SIZE: usize = 256;
|
|
|
|
pub(crate) type DocBlock = [DocId; DOC_BLOCK_SIZE];
|
|
|
|
/// BufAggregationCollector buffers documents before calling collect_block().
|
|
#[derive(Clone)]
|
|
pub(crate) struct BufAggregationCollector {
|
|
pub(crate) collector: Box<dyn SegmentAggregationCollector>,
|
|
staged_docs: DocBlock,
|
|
num_staged_docs: usize,
|
|
}
|
|
|
|
impl std::fmt::Debug for BufAggregationCollector {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
|
f.debug_struct("SegmentAggregationResultsCollector")
|
|
.field("staged_docs", &&self.staged_docs[..self.num_staged_docs])
|
|
.field("num_staged_docs", &self.num_staged_docs)
|
|
.finish()
|
|
}
|
|
}
|
|
|
|
impl BufAggregationCollector {
|
|
pub fn new(collector: Box<dyn SegmentAggregationCollector>) -> Self {
|
|
Self {
|
|
collector,
|
|
num_staged_docs: 0,
|
|
staged_docs: [0; DOC_BLOCK_SIZE],
|
|
}
|
|
}
|
|
}
|
|
|
|
impl SegmentAggregationCollector for BufAggregationCollector {
|
|
#[inline]
|
|
fn add_intermediate_aggregation_result(
|
|
self: Box<Self>,
|
|
agg_data: &AggregationsSegmentCtx,
|
|
results: &mut IntermediateAggregationResults,
|
|
) -> crate::Result<()> {
|
|
Box::new(self.collector).add_intermediate_aggregation_result(agg_data, results)
|
|
}
|
|
|
|
#[inline]
|
|
fn collect(
|
|
&mut self,
|
|
doc: crate::DocId,
|
|
agg_data: &mut AggregationsSegmentCtx,
|
|
) -> crate::Result<()> {
|
|
self.staged_docs[self.num_staged_docs] = doc;
|
|
self.num_staged_docs += 1;
|
|
if self.num_staged_docs == self.staged_docs.len() {
|
|
self.collector
|
|
.collect_block(&self.staged_docs[..self.num_staged_docs], agg_data)?;
|
|
self.num_staged_docs = 0;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
#[inline]
|
|
fn collect_block(
|
|
&mut self,
|
|
docs: &[crate::DocId],
|
|
agg_data: &mut AggregationsSegmentCtx,
|
|
) -> crate::Result<()> {
|
|
self.collector.collect_block(docs, agg_data)?;
|
|
Ok(())
|
|
}
|
|
|
|
#[inline]
|
|
fn flush(&mut self, agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
|
|
self.collector
|
|
.collect_block(&self.staged_docs[..self.num_staged_docs], agg_data)?;
|
|
self.num_staged_docs = 0;
|
|
|
|
self.collector.flush(agg_data)?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|