diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 9c2bc0153..a3ed4c1fc 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -809,7 +809,7 @@ impl SegmentAggregationCollector docs: &[crate::DocId], agg_data: &mut AggregationsSegmentCtx, ) -> crate::Result<()> { - let mem_pre = self.get_memory_consumption(); + // let mem_pre = self.get_memory_consumption(); let req_data = &mut self.terms_req_data; @@ -853,13 +853,16 @@ impl SegmentAggregationCollector } } - let mem_delta = self.get_memory_consumption() - mem_pre; - if mem_delta > 0 { - agg_data - .context - .limits - .add_memory_consumed(mem_delta as u64)?; - } + // let mem_delta = self.get_memory_consumption() - mem_pre; + // if mem_delta > 0 { + // agg_data + // .context + // .limits + // .add_memory_consumed(mem_delta as u64)?; + // } + + // After commenting out -> 6000ms -> 36ms + if let Some(sub_agg) = &mut self.sub_agg { sub_agg.check_flush_local(agg_data)?; } @@ -1225,6 +1228,7 @@ pub(crate) fn cut_off_buckets( mod tests { use std::net::IpAddr; use std::str::FromStr; + use std::time::Instant; use common::DateTime; use time::{Date, Month}; @@ -1238,8 +1242,9 @@ mod tests { get_test_index_from_terms, get_test_index_from_values_and_terms, }; use crate::aggregation::{AggregationLimitsGuard, DistributedAggregationCollector}; + use crate::collector::{Collector, default_collect_segment_impl}; use crate::indexer::NoMergePolicy; - use crate::query::AllQuery; + use crate::query::{AllQuery, EnableScoring, Query}; use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING}; use crate::{Index, IndexWriter}; @@ -2934,4 +2939,103 @@ mod tests { Ok(()) } + + #[test] + fn test_terms_double_nesting() { + let mut schema_builder = Schema::builder(); + let outer_field = schema_builder.add_text_field("outer_term", STRING | FAST); + let inner_field = schema_builder.add_text_field("inner_term", STRING | FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema.clone()); + + let outer_values = (0..10_000) + .map(|i| format!("outer_{i}")) + .collect::>(); + let inner_values = ["INFO", "ERROR", "WARN", "DEBUG"]; + + { + let mut index_writer: IndexWriter = index.writer_with_num_threads(1, 200_000_000).unwrap(); + for doc_id in 0..1_000_000u64 { + let outer_val = &outer_values[doc_id as usize % outer_values.len()]; + let inner_val = inner_values[doc_id as usize % inner_values.len()]; + index_writer.add_document(doc!( + outer_field => outer_val.as_str(), + inner_field => inner_val, + )).unwrap(); + } + index_writer.commit().unwrap(); + } + let agg_req: Aggregations = serde_json::from_value(json!({ + "outer": { + "terms": { "field": "outer_term", "size": 10 }, + "aggs": { + "inner": { + "terms": { "field": "inner_term" } + } + } + } + })) + .unwrap(); + + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); + + let collector = + crate::aggregation::AggregationCollector::from_aggs(agg_req, Default::default()); + + assert_eq!(searcher.segment_readers().len(), 1); + let segment_reader = searcher.segment_reader(0u32); + let all_weight = AllQuery.weight(EnableScoring::disabled_from_schema(&schema)).unwrap(); + let mut segment_collector = collector.for_segment(0u32, segment_reader).unwrap(); + let start = Instant::now(); + default_collect_segment_impl(&mut segment_collector, &*all_weight, segment_reader, false).unwrap(); + dbg!(start.elapsed()); + } + + #[test] + fn test_terms_simple_nesting() { + let mut schema_builder = Schema::builder(); + let outer_field = schema_builder.add_text_field("outer_term", STRING | FAST); + let inner_field = schema_builder.add_text_field("inner_term", STRING | FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema.clone()); + + let outer_values = (0..10_000) + .map(|i| format!("outer_{i}")) + .collect::>(); + let inner_values = ["INFO", "ERROR", "WARN", "DEBUG"]; + + { + let mut index_writer: IndexWriter = index.writer_with_num_threads(1, 200_000_000).unwrap(); + for doc_id in 0..1_000_000u64 { + let outer_val = &outer_values[doc_id as usize % outer_values.len()]; + let inner_val = inner_values[doc_id as usize % inner_values.len()]; + index_writer.add_document(doc!( + outer_field => outer_val.as_str(), + inner_field => inner_val, + )).unwrap(); + } + index_writer.commit().unwrap(); + } + let agg_req: Aggregations = serde_json::from_value(json!({ + "outer": { + "terms": { "field": "outer_term", "size": 10 }, + } + })) + .unwrap(); + + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); + + let collector = + crate::aggregation::AggregationCollector::from_aggs(agg_req, Default::default()); + + assert_eq!(searcher.segment_readers().len(), 1); + let segment_reader = searcher.segment_reader(0u32); + let all_weight = AllQuery.weight(EnableScoring::disabled_from_schema(&schema)).unwrap(); + let mut segment_collector = collector.for_segment(0u32, segment_reader).unwrap(); + let start = Instant::now(); + default_collect_segment_impl(&mut segment_collector, &*all_weight, segment_reader, false).unwrap(); + dbg!(start.elapsed()); + } }