mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-23 02:29:57 +00:00
* add nested histogram-termagg benchmark * Replace AggregationsWithAccessor with AggData With AggregationsWithAccessor pre-computation and caching was done on the collector level. If you have 10000 sub collectors (e.g. a term aggregation with sub aggregations) this is very inefficient. `AggData` instead moves the data from the collector to a node which reflects the cardinality of the request tree instead of the cardinality of the segment collector. It also moves the global struct shared with all aggregations in to aggregation specific structs. So each aggregation has its own space to store cached data and aggregation specific information. This also breaks up the dependency to the elastic search aggregation structure somewhat. Due to lifetime issues, we move the agg request specific object out of `AggData` during the collection and move it back at the end (for now). That's some unnecessary work, which costs CPU. This allows better caching and will also pave the way for another potential optimization, by separating the collector and its storage. Currently we allocate a new collector for each sub aggregation bucket (for nested aggregations), but ideally we would have just one collector instance. * renames * move request data to agg request files --------- Co-authored-by: Pascal Seitz <pascal.seitz@datadoghq.com>
116 lines
3.6 KiB
Rust
116 lines
3.6 KiB
Rust
//! Contains aggregation trees which are used during collection in a segment.
//! This tree contains data structures optimized for fast collection.
//! The tree can be converted to an intermediate tree, which contains data structures optimized
//! for merging.
|
|
|
|
use std::fmt::Debug;
|
|
|
|
pub(crate) use super::agg_limits::AggregationLimitsGuard;
|
|
use super::intermediate_agg_result::IntermediateAggregationResults;
|
|
use crate::aggregation::agg_data::AggregationsSegmentCtx;
|
|
|
|
/// A SegmentAggregationCollector is used to collect aggregation results.
|
|
pub trait SegmentAggregationCollector: CollectorClone + Debug {
|
|
fn add_intermediate_aggregation_result(
|
|
self: Box<Self>,
|
|
agg_data: &AggregationsSegmentCtx,
|
|
results: &mut IntermediateAggregationResults,
|
|
) -> crate::Result<()>;
|
|
|
|
fn collect(
|
|
&mut self,
|
|
doc: crate::DocId,
|
|
agg_data: &mut AggregationsSegmentCtx,
|
|
) -> crate::Result<()>;
|
|
|
|
fn collect_block(
|
|
&mut self,
|
|
docs: &[crate::DocId],
|
|
agg_data: &mut AggregationsSegmentCtx,
|
|
) -> crate::Result<()>;
|
|
|
|
/// Finalize method. Some Aggregator collect blocks of docs before calling `collect_block`.
|
|
/// This method ensures those staged docs will be collected.
|
|
fn flush(&mut self, _agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// A helper trait to enable cloning of Box<dyn SegmentAggregationCollector>
|
|
pub trait CollectorClone {
|
|
fn clone_box(&self) -> Box<dyn SegmentAggregationCollector>;
|
|
}
|
|
|
|
impl<T> CollectorClone for T
|
|
where T: 'static + SegmentAggregationCollector + Clone
|
|
{
|
|
fn clone_box(&self) -> Box<dyn SegmentAggregationCollector> {
|
|
Box::new(self.clone())
|
|
}
|
|
}
|
|
|
|
impl Clone for Box<dyn SegmentAggregationCollector> {
|
|
fn clone(&self) -> Box<dyn SegmentAggregationCollector> {
|
|
self.clone_box()
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Default)]
|
|
/// The GenericSegmentAggregationResultsCollector is the generic version of the collector, which
|
|
/// can handle arbitrary complexity of sub-aggregations. Ideally we never have to pick this one
|
|
/// and can provide specialized versions instead, that remove some of its overhead.
|
|
pub(crate) struct GenericSegmentAggregationResultsCollector {
|
|
pub(crate) aggs: Vec<Box<dyn SegmentAggregationCollector>>,
|
|
}
|
|
|
|
impl Debug for GenericSegmentAggregationResultsCollector {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
f.debug_struct("SegmentAggregationResultsCollector")
|
|
.field("aggs", &self.aggs)
|
|
.finish()
|
|
}
|
|
}
|
|
|
|
impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
|
|
fn add_intermediate_aggregation_result(
|
|
self: Box<Self>,
|
|
agg_data: &AggregationsSegmentCtx,
|
|
results: &mut IntermediateAggregationResults,
|
|
) -> crate::Result<()> {
|
|
for agg in self.aggs {
|
|
agg.add_intermediate_aggregation_result(agg_data, results)?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn collect(
|
|
&mut self,
|
|
doc: crate::DocId,
|
|
agg_data: &mut AggregationsSegmentCtx,
|
|
) -> crate::Result<()> {
|
|
self.collect_block(&[doc], agg_data)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn collect_block(
|
|
&mut self,
|
|
docs: &[crate::DocId],
|
|
agg_data: &mut AggregationsSegmentCtx,
|
|
) -> crate::Result<()> {
|
|
for collector in &mut self.aggs {
|
|
collector.collect_block(docs, agg_data)?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn flush(&mut self, agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
|
|
for collector in &mut self.aggs {
|
|
collector.flush(agg_data)?;
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|