Test demonstrating that nested aggregations are insanely slow

This commit is contained in:
Paul Masurel
2026-04-20 18:23:44 +02:00
parent 13d74c3c20
commit d5e2709b1b

View File

@@ -809,7 +809,7 @@ impl<TermMap: TermAggregationMap, B: SubAggBuffer> SegmentAggregationCollector
docs: &[crate::DocId],
agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
let mem_pre = self.get_memory_consumption();
// let mem_pre = self.get_memory_consumption();
let req_data = &mut self.terms_req_data;
@@ -853,13 +853,16 @@ impl<TermMap: TermAggregationMap, B: SubAggBuffer> SegmentAggregationCollector
}
}
let mem_delta = self.get_memory_consumption() - mem_pre;
if mem_delta > 0 {
agg_data
.context
.limits
.add_memory_consumed(mem_delta as u64)?;
}
// let mem_delta = self.get_memory_consumption() - mem_pre;
// if mem_delta > 0 {
// agg_data
// .context
// .limits
// .add_memory_consumed(mem_delta as u64)?;
// }
// After commenting out -> 6000ms -> 36ms
if let Some(sub_agg) = &mut self.sub_agg {
sub_agg.check_flush_local(agg_data)?;
}
@@ -1225,6 +1228,7 @@ pub(crate) fn cut_off_buckets<T: GetDocCount + Debug>(
mod tests {
use std::net::IpAddr;
use std::str::FromStr;
use std::time::Instant;
use common::DateTime;
use time::{Date, Month};
@@ -1238,8 +1242,9 @@ mod tests {
get_test_index_from_terms, get_test_index_from_values_and_terms,
};
use crate::aggregation::{AggregationLimitsGuard, DistributedAggregationCollector};
use crate::collector::{Collector, default_collect_segment_impl};
use crate::indexer::NoMergePolicy;
use crate::query::AllQuery;
use crate::query::{AllQuery, EnableScoring, Query};
use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING};
use crate::{Index, IndexWriter};
@@ -2934,4 +2939,103 @@ mod tests {
Ok(())
}
/// Times segment-level collection of a terms aggregation on `outer_term`
/// that carries a nested terms sub-aggregation on `inner_term`.
///
/// The index holds 1,000,000 documents in a single segment. Document `i`
/// gets the outer term `outer_{i % 10_000}` and the inner term at position
/// `i % 4` of `["INFO", "ERROR", "WARN", "DEBUG"]`.
///
/// The test makes no assertion on the aggregation result; it only prints the
/// elapsed collection time via `dbg!`.
#[test]
fn test_terms_double_nesting() {
    // Aggregation request: top-10 outer terms, each with an inner terms sub-aggregation.
    let agg_req: Aggregations = serde_json::from_value(json!({
        "outer": {
            "terms": { "field": "outer_term", "size": 10 },
            "aggs": {
                "inner": {
                    "terms": { "field": "inner_term" }
                }
            }
        }
    }))
    .unwrap();

    // Schema with two fast string fields.
    let mut builder = Schema::builder();
    let outer_field = builder.add_text_field("outer_term", STRING | FAST);
    let inner_field = builder.add_text_field("inner_term", STRING | FAST);
    let schema = builder.build();
    let index = Index::create_in_ram(schema.clone());

    // Term pools that the documents cycle through.
    let outer_pool: Vec<String> = (0..10_000).map(|i| format!("outer_{i}")).collect();
    let inner_pool = ["INFO", "ERROR", "WARN", "DEBUG"];

    {
        let mut writer: IndexWriter = index.writer_with_num_threads(1, 200_000_000).unwrap();
        // Cycling both pools in lockstep yields the same (outer, inner) pair
        // for document `i` as indexing each pool with `i % len`.
        let term_pairs = outer_pool
            .iter()
            .cycle()
            .zip(inner_pool.iter().cycle())
            .take(1_000_000);
        for (outer, inner) in term_pairs {
            writer
                .add_document(doc!(
                    outer_field => outer.as_str(),
                    inner_field => *inner,
                ))
                .unwrap();
        }
        writer.commit().unwrap();
    }

    let searcher = index.reader().unwrap().searcher();
    // A single writer thread and a single commit are expected to yield one segment.
    assert_eq!(searcher.segment_readers().len(), 1);
    let segment_reader = searcher.segment_reader(0u32);

    let collector =
        crate::aggregation::AggregationCollector::from_aggs(agg_req, Default::default());
    let mut segment_collector = collector.for_segment(0u32, segment_reader).unwrap();
    let all_weight = AllQuery
        .weight(EnableScoring::disabled_from_schema(&schema))
        .unwrap();

    // Only the collection pass itself is timed.
    let start = Instant::now();
    default_collect_segment_impl(&mut segment_collector, &*all_weight, segment_reader, false)
        .unwrap();
    dbg!(start.elapsed());
}
/// Times segment-level collection of a single terms aggregation on
/// `outer_term`, with no sub-aggregation.
///
/// The index holds 1,000,000 documents in a single segment. Document `i`
/// gets the outer term `outer_{i % 10_000}` and the inner term at position
/// `i % 4` of `["INFO", "ERROR", "WARN", "DEBUG"]`.
///
/// The test makes no assertion on the aggregation result; it only prints the
/// elapsed collection time via `dbg!`.
#[test]
fn test_terms_simple_nesting() {
    // Aggregation request: top-10 outer terms only.
    let agg_req: Aggregations = serde_json::from_value(json!({
        "outer": {
            "terms": { "field": "outer_term", "size": 10 },
        }
    }))
    .unwrap();

    // Schema with two fast string fields.
    let mut builder = Schema::builder();
    let outer_field = builder.add_text_field("outer_term", STRING | FAST);
    let inner_field = builder.add_text_field("inner_term", STRING | FAST);
    let schema = builder.build();
    let index = Index::create_in_ram(schema.clone());

    // Term pools that the documents cycle through.
    let outer_pool: Vec<String> = (0..10_000).map(|i| format!("outer_{i}")).collect();
    let inner_pool = ["INFO", "ERROR", "WARN", "DEBUG"];

    {
        let mut writer: IndexWriter = index.writer_with_num_threads(1, 200_000_000).unwrap();
        // Cycling both pools in lockstep yields the same (outer, inner) pair
        // for document `i` as indexing each pool with `i % len`.
        let term_pairs = outer_pool
            .iter()
            .cycle()
            .zip(inner_pool.iter().cycle())
            .take(1_000_000);
        for (outer, inner) in term_pairs {
            writer
                .add_document(doc!(
                    outer_field => outer.as_str(),
                    inner_field => *inner,
                ))
                .unwrap();
        }
        writer.commit().unwrap();
    }

    let searcher = index.reader().unwrap().searcher();
    // A single writer thread and a single commit are expected to yield one segment.
    assert_eq!(searcher.segment_readers().len(), 1);
    let segment_reader = searcher.segment_reader(0u32);

    let collector =
        crate::aggregation::AggregationCollector::from_aggs(agg_req, Default::default());
    let mut segment_collector = collector.for_segment(0u32, segment_reader).unwrap();
    let all_weight = AllQuery
        .weight(EnableScoring::disabled_from_schema(&schema))
        .unwrap();

    // Only the collection pass itself is timed.
    let start = Instant::now();
    default_collect_segment_impl(&mut segment_collector, &*all_weight, segment_reader, false)
        .unwrap();
    dbg!(start.elapsed());
}
}