diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs
index ae2e0d87c..92b9a5886 100644
--- a/src/aggregation/bucket/term_agg.rs
+++ b/src/aggregation/bucket/term_agg.rs
@@ -781,13 +781,20 @@ mod tests {
     use time::{Date, Month};
 
     use crate::aggregation::agg_req::Aggregations;
+    use crate::aggregation::intermediate_agg_result::{
+        IntermediateAggregationResult, IntermediateAggregationResults,
+    };
     use crate::aggregation::tests::{
         exec_request, exec_request_with_query, exec_request_with_query_and_memory_limit,
         get_test_index_from_terms, get_test_index_from_values_and_terms,
     };
-    use crate::aggregation::AggregationLimitsGuard;
+    use crate::aggregation::{AggregationLimitsGuard, DistributedAggregationCollector};
     use crate::indexer::NoMergePolicy;
-    use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING};
+    use crate::query::AllQuery;
+    use crate::schema::{
+        IndexRecordOption, IntoIpv6Addr, Schema, TextFieldIndexing, TextOptions, FAST, STORED,
+        STRING, TEXT,
+    };
     use crate::{Index, IndexWriter};
 
     #[test]
@@ -2274,4 +2281,127 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn terms_aggs_hosts_and_tags_merge_on_mixed_order_request() -> crate::Result<()> {
+        // This test ensures that merging of aggregation results works correctly
+        // even if the aggregation requests list their entries in a different order
+        // and the results come from different indexes holding the same data.
+        let build_index = || -> crate::Result<Index> {
+            let mut schema_builder = Schema::builder();
+            let host = schema_builder.add_text_field("host", FAST);
+            let tags = schema_builder.add_text_field("tags", FAST);
+            let schema = schema_builder.build();
+
+            let index = Index::create_in_ram(schema.clone());
+            let mut writer = index.writer(50_000_000).unwrap();
+
+            // --- Ingest documents (batch #1) ---
+            writer.add_document(doc!(
+                host => "192.168.0.10",
+                tags => "nice",
+            ))?;
+            writer.add_document(doc!(
+                host => "192.168.0.1",
+                tags => "nice",
+            ))?;
+            writer.add_document(doc!(
+                host => "192.168.0.11",
+                tags => "nice",
+            ))?;
+            writer.add_document(doc!(
+                host => "192.168.0.10",
+                tags => "nice",
+                tags => "cool",
+            ))?;
+            writer.add_document(doc!(
+                host => "192.168.0.1",
+                tags => "nice",
+                tags => "cool",
+            ))?;
+
+            writer.commit()?;
+
+            // --- Ingest documents (batch #2) ---
+            writer.add_document(doc!())?;
+            writer.add_document(doc!())?;
+            writer.add_document(doc!(
+                host => "192.168.0.10",
+            ))?;
+            writer.add_document(doc!(
+                host => "192.168.0.10",
+            ))?;
+            writer.add_document(doc!())?;
+
+            writer.commit()?;
+            Ok(index)
+        };
+        let index = build_index()?;
+        let index2 = build_index()?;
+
+        let search = |idx: &Index,
+                      agg_req: &Aggregations|
+         -> crate::Result<IntermediateAggregationResults> {
+            let collector = DistributedAggregationCollector::from_aggs(
+                agg_req.clone(),
+                AggregationLimitsGuard::default(),
+            );
+            let reader = idx.reader()?;
+            let searcher = reader.searcher();
+            let agg_res = searcher.search(&AllQuery, &collector)?;
+            Ok(agg_res)
+        };
+
+        // --- Aggregations: terms on host and tags ---
+        let agg_req: Aggregations = serde_json::from_value(json!({
+            "hosts": { "terms": { "field": "host" } },
+            "tags": { "terms": { "field": "tags" } }
+        }))
+        .unwrap();
+
+        let mut agg_res = search(&index, &agg_req)?;
+
+        // --- Same aggregations, but inserted in the opposite order ---
+        let mut agg_req2: Aggregations = Aggregations::with_capacity(20);
+        agg_req2.insert(
+            "tags".to_string(),
+            serde_json::from_value(json!({ "terms": { "field": "tags" } }))?,
+        );
+        agg_req2.insert(
+            "hosts".to_string(),
+            serde_json::from_value(json!({ "terms": { "field": "host" } }))?,
+        );
+        // make sure the order of the aggregation requests is different
+        assert_ne!(agg_req.keys().next(), agg_req2.keys().next());
+
+        let agg_res2 = search(&index2, &agg_req2)?;
+
+        agg_res.merge_fruits(agg_res2).unwrap();
+        let agg_json = serde_json::to_value(
+            &agg_res.into_final_result(agg_req2, AggregationLimitsGuard::default())?,
+        )?;
+
+        // hosts:
+        let hosts = &agg_json["hosts"]["buckets"];
+        assert_eq!(hosts[0]["key"], "192.168.0.10");
+        assert_eq!(hosts[0]["doc_count"], 8);
+        assert_eq!(hosts[1]["key"], "192.168.0.1");
+        assert_eq!(hosts[1]["doc_count"], 4);
+        assert_eq!(hosts[2]["key"], "192.168.0.11");
+        assert_eq!(hosts[2]["doc_count"], 2);
+        // All terms fit into the returned buckets, so error bound and other-doc count must be zero.
+        assert_eq!(agg_json["hosts"]["doc_count_error_upper_bound"], 0);
+        assert_eq!(agg_json["hosts"]["sum_other_doc_count"], 0);
+
+        // tags:
+        let tags_buckets = &agg_json["tags"]["buckets"];
+        assert_eq!(tags_buckets[0]["key"], "nice");
+        assert_eq!(tags_buckets[0]["doc_count"], 10);
+        assert_eq!(tags_buckets[1]["key"], "cool");
+        assert_eq!(tags_buckets[1]["doc_count"], 4);
+        assert_eq!(agg_json["tags"]["doc_count_error_upper_bound"], 0);
+        assert_eq!(agg_json["tags"]["sum_other_doc_count"], 0);
+
+        Ok(())
+    }
 }
diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs
index f0309eafe..f5f373bb0 100644
--- a/src/aggregation/intermediate_agg_result.rs
+++ b/src/aggregation/intermediate_agg_result.rs
@@ -179,12 +179,18 @@ impl IntermediateAggregationResults {
     }
 
     /// Merge another intermediate aggregation result into this result.
-    ///
-    /// The order of the values need to be the same on both results. This is ensured when the same
-    /// (key values) are present on the underlying `VecWithNames` struct.
-    pub fn merge_fruits(&mut self, other: IntermediateAggregationResults) -> crate::Result<()> {
-        for (left, right) in self.aggs_res.values_mut().zip(other.aggs_res.into_values()) {
-            left.merge_fruits(right)?;
+    pub fn merge_fruits(&mut self, mut other: IntermediateAggregationResults) -> crate::Result<()> {
+        // Merge by key rather than by position, so the two sides may list their aggregations
+        // in different orders.
+        for (key, left) in self.aggs_res.iter_mut() {
+            if let Some(right) = other.aggs_res.remove(key) {
+                left.merge_fruits(right)?;
+            }
+        }
+        // Move the remainder of `other.aggs_res` into self.
+        // Note: Currently we don't expect this to happen, as we create empty intermediate results
+        // via [IntermediateAggregationResults::empty_from_req].
+        for (key, value) in other.aggs_res {
+            self.aggs_res.insert(key, value);
         }
         Ok(())
     }
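Editor's note: a minimal standalone sketch of the key-based merge that the new `merge_fruits` performs. It uses a plain `HashMap<String, u64>` as a stand-in for tantivy's internal per-aggregation intermediate results; the function name `merge_by_key` and the counter values are illustrative only and are not part of the crate's API.

```rust
use std::collections::HashMap;

/// Merge `other` into `acc` by key instead of by position:
/// shared keys are combined, keys only present in `other` are moved over.
fn merge_by_key(acc: &mut HashMap<String, u64>, mut other: HashMap<String, u64>) {
    for (key, left) in acc.iter_mut() {
        if let Some(right) = other.remove(key) {
            // Combine results for aggregations present on both sides.
            *left += right;
        }
    }
    // Keys only the right side saw are carried over unchanged.
    for (key, value) in other {
        acc.insert(key, value);
    }
}

fn main() {
    let mut a = HashMap::from([("hosts".to_string(), 4u64), ("tags".to_string(), 7)]);
    let b = HashMap::from([("tags".to_string(), 3u64), ("hosts".to_string(), 4)]);
    merge_by_key(&mut a, b);
    // The result is independent of the order in which either side listed its entries.
    assert_eq!(a["hosts"], 8);
    assert_eq!(a["tags"], 10);
}
```

Because matching happens by key, the previous requirement that both results carry their aggregations in identical order disappears, which is exactly what the mixed-order test above exercises.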