mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-06 09:12:55 +00:00
reduce top hits aggregation memory consumption (#2426)
move request structure out of top hits aggregation collector and use from the passed structure instead full terms_many_with_top_hits Memory: 58.2 MB (-43.64%) Avg: 425.9680ms (-21.38%) Median: 415.1097ms (-23.56%) [395.5303ms .. 484.6325ms] dense terms_many_with_top_hits Memory: 58.2 MB (-43.64%) Avg: 440.0817ms (-19.68%) Median: 432.2286ms (-21.10%) [403.5632ms .. 497.7541ms] sparse terms_many_with_top_hits Memory: 13.1 MB (-49.31%) Avg: 33.3568ms (-32.19%) Median: 33.0834ms (-31.86%) [32.5126ms .. 35.7397ms] multivalue terms_many_with_top_hits Memory: 58.2 MB (-43.64%) Avg: 414.2340ms (-25.44%) Median: 413.4144ms (-25.64%) [403.9919ms .. 430.3170ms]
This commit is contained in:
@@ -203,6 +203,12 @@ impl AggregationVariants {
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
pub(crate) fn as_top_hits(&self) -> Option<&TopHitsAggregation> {
|
||||
match &self {
|
||||
AggregationVariants::TopHits(top_hits) => Some(top_hits),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn as_percentile(&self) -> Option<&PercentilesAggregationReq> {
|
||||
match &self {
|
||||
|
||||
@@ -225,7 +225,7 @@ pub(crate) fn empty_from_req(req: &Aggregation) -> IntermediateAggregationResult
|
||||
IntermediateMetricResult::Percentiles(PercentilesCollector::default()),
|
||||
),
|
||||
TopHits(ref req) => IntermediateAggregationResult::Metric(
|
||||
IntermediateMetricResult::TopHits(TopHitsTopNComputer::new(req.clone())),
|
||||
IntermediateMetricResult::TopHits(TopHitsTopNComputer::new(req)),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -229,8 +229,7 @@ impl IntermediateExtendedStats {
|
||||
* self.intermediate_stats.count as f64
|
||||
* other.intermediate_stats.count as f64
|
||||
/ new_count as f64;
|
||||
self.mean = (self.intermediate_stats.sum as f64 + other.intermediate_stats.sum as f64)
|
||||
/ new_count as f64;
|
||||
self.mean = (self.intermediate_stats.sum + other.intermediate_stats.sum) / new_count as f64;
|
||||
self.sum_of_squares_elastic += other.sum_of_squares_elastic;
|
||||
self.delta_sum_for_squares_elastic += other.delta_sum_for_squares_elastic;
|
||||
self.intermediate_stats
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::collections::HashMap;
|
||||
use std::net::Ipv6Addr;
|
||||
|
||||
use columnar::{ColumnarReader, DynamicColumn};
|
||||
use columnar::{Column, ColumnType, ColumnarReader, DynamicColumn};
|
||||
use common::json_path_writer::JSON_PATH_SEGMENT_SEP_STR;
|
||||
use common::DateTime;
|
||||
use regex::Regex;
|
||||
@@ -443,10 +443,10 @@ impl std::cmp::PartialEq for TopHitsTopNComputer {
|
||||
|
||||
impl TopHitsTopNComputer {
|
||||
/// Create a new TopHitsCollector
|
||||
pub fn new(req: TopHitsAggregation) -> Self {
|
||||
pub fn new(req: &TopHitsAggregation) -> Self {
|
||||
Self {
|
||||
top_n: TopNComputer::new(req.size + req.from.unwrap_or(0)),
|
||||
req,
|
||||
req: req.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -491,7 +491,6 @@ impl TopHitsTopNComputer {
|
||||
pub(crate) struct TopHitsSegmentCollector {
|
||||
segment_ordinal: SegmentOrdinal,
|
||||
accessor_idx: usize,
|
||||
req: TopHitsAggregation,
|
||||
top_n: TopNComputer<Vec<DocValueAndOrder>, DocAddress, false>,
|
||||
}
|
||||
|
||||
@@ -502,7 +501,6 @@ impl TopHitsSegmentCollector {
|
||||
segment_ordinal: SegmentOrdinal,
|
||||
) -> Self {
|
||||
Self {
|
||||
req: req.clone(),
|
||||
top_n: TopNComputer::new(req.size + req.from.unwrap_or(0)),
|
||||
segment_ordinal,
|
||||
accessor_idx,
|
||||
@@ -511,14 +509,13 @@ impl TopHitsSegmentCollector {
|
||||
fn into_top_hits_collector(
|
||||
self,
|
||||
value_accessors: &HashMap<String, Vec<DynamicColumn>>,
|
||||
req: &TopHitsAggregation,
|
||||
) -> TopHitsTopNComputer {
|
||||
let mut top_hits_computer = TopHitsTopNComputer::new(self.req.clone());
|
||||
let mut top_hits_computer = TopHitsTopNComputer::new(req);
|
||||
let top_results = self.top_n.into_vec();
|
||||
|
||||
for res in top_results {
|
||||
let doc_value_fields = self
|
||||
.req
|
||||
.get_document_field_data(value_accessors, res.doc.doc_id);
|
||||
let doc_value_fields = req.get_document_field_data(value_accessors, res.doc.doc_id);
|
||||
top_hits_computer.collect(
|
||||
DocSortValuesAndFields {
|
||||
sorts: res.feature,
|
||||
@@ -530,34 +527,15 @@ impl TopHitsSegmentCollector {
|
||||
|
||||
top_hits_computer
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentAggregationCollector for TopHitsSegmentCollector {
|
||||
fn add_intermediate_aggregation_result(
|
||||
self: Box<Self>,
|
||||
agg_with_accessor: &crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor,
|
||||
results: &mut crate::aggregation::intermediate_agg_result::IntermediateAggregationResults,
|
||||
) -> crate::Result<()> {
|
||||
let name = agg_with_accessor.aggs.keys[self.accessor_idx].to_string();
|
||||
|
||||
let value_accessors = &agg_with_accessor.aggs.values[self.accessor_idx].value_accessors;
|
||||
|
||||
let intermediate_result =
|
||||
IntermediateMetricResult::TopHits(self.into_top_hits_collector(value_accessors));
|
||||
results.push(
|
||||
name,
|
||||
IntermediateAggregationResult::Metric(intermediate_result),
|
||||
)
|
||||
}
|
||||
|
||||
fn collect(
|
||||
/// TODO add a specialized variant for a single sort field
|
||||
fn collect_with(
|
||||
&mut self,
|
||||
doc_id: crate::DocId,
|
||||
agg_with_accessor: &mut crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor,
|
||||
req: &TopHitsAggregation,
|
||||
accessors: &[(Column<u64>, ColumnType)],
|
||||
) -> crate::Result<()> {
|
||||
let accessors = &agg_with_accessor.aggs.values[self.accessor_idx].accessors;
|
||||
let sorts: Vec<DocValueAndOrder> = self
|
||||
.req
|
||||
let sorts: Vec<DocValueAndOrder> = req
|
||||
.sort
|
||||
.iter()
|
||||
.enumerate()
|
||||
@@ -582,15 +560,62 @@ impl SegmentAggregationCollector for TopHitsSegmentCollector {
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentAggregationCollector for TopHitsSegmentCollector {
|
||||
fn add_intermediate_aggregation_result(
|
||||
self: Box<Self>,
|
||||
agg_with_accessor: &crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor,
|
||||
results: &mut crate::aggregation::intermediate_agg_result::IntermediateAggregationResults,
|
||||
) -> crate::Result<()> {
|
||||
let name = agg_with_accessor.aggs.keys[self.accessor_idx].to_string();
|
||||
|
||||
let value_accessors = &agg_with_accessor.aggs.values[self.accessor_idx].value_accessors;
|
||||
let tophits_req = &agg_with_accessor.aggs.values[self.accessor_idx]
|
||||
.agg
|
||||
.agg
|
||||
.as_top_hits()
|
||||
.expect("aggregation request must be of type top hits");
|
||||
|
||||
let intermediate_result = IntermediateMetricResult::TopHits(
|
||||
self.into_top_hits_collector(value_accessors, tophits_req),
|
||||
);
|
||||
results.push(
|
||||
name,
|
||||
IntermediateAggregationResult::Metric(intermediate_result),
|
||||
)
|
||||
}
|
||||
|
||||
/// TODO: Consider a caching layer to reduce the call overhead
|
||||
fn collect(
|
||||
&mut self,
|
||||
doc_id: crate::DocId,
|
||||
agg_with_accessor: &mut crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor,
|
||||
) -> crate::Result<()> {
|
||||
let tophits_req = &agg_with_accessor.aggs.values[self.accessor_idx]
|
||||
.agg
|
||||
.agg
|
||||
.as_top_hits()
|
||||
.expect("aggregation request must be of type top hits");
|
||||
let accessors = &agg_with_accessor.aggs.values[self.accessor_idx].accessors;
|
||||
self.collect_with(doc_id, tophits_req, accessors)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn collect_block(
|
||||
&mut self,
|
||||
docs: &[crate::DocId],
|
||||
agg_with_accessor: &mut crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor,
|
||||
) -> crate::Result<()> {
|
||||
let tophits_req = &agg_with_accessor.aggs.values[self.accessor_idx]
|
||||
.agg
|
||||
.agg
|
||||
.as_top_hits()
|
||||
.expect("aggregation request must be of type top hits");
|
||||
let accessors = &agg_with_accessor.aggs.values[self.accessor_idx].accessors;
|
||||
// TODO: Consider getting fields with the column block accessor.
|
||||
for doc in docs {
|
||||
self.collect(*doc, agg_with_accessor)?;
|
||||
self.collect_with(*doc, tophits_req, accessors)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user