Fix min doc_count empty merge bug (#2057)

This fixes an issue when min_doc==0 loads terms from the dictionary from
one segment and merges the same term with a subaggregation from another
segment.
Previously the empty structure was not correctly initialized to contain
the subaggregation so the merge was incorrect.
This commit is contained in:
PSeitz
2023-05-29 14:20:50 +08:00
committed by GitHub
parent e56addc63e
commit 3af456972e

View File

@@ -554,6 +554,9 @@ impl SegmentTermCollector {
if self.req.min_doc_count == 0 {
// TODO: Handle rev streaming for descending sorting by keys
let mut stream = term_dict.dictionary().stream()?;
let empty_sub_aggregation = IntermediateAggregationResults::empty_from_req(
agg_with_accessor.agg.sub_aggregation(),
);
while let Some((key, _ord)) = stream.next() {
if dict.len() >= self.req.segment_size as usize {
break;
@@ -564,7 +567,12 @@ impl SegmentTermCollector {
.map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?
.to_string(),
);
dict.entry(key).or_default();
dict.entry(key.clone())
.or_insert_with(|| IntermediateTermBucketEntry {
doc_count: 0,
sub_aggregation: empty_sub_aggregation.clone(),
});
}
}
} else {
@@ -625,6 +633,9 @@ mod tests {
get_test_index_from_terms, get_test_index_from_values_and_terms,
};
use crate::aggregation::AggregationLimits;
use crate::indexer::NoMergePolicy;
use crate::schema::{Schema, FAST, STRING};
use crate::Index;
#[test]
fn terms_aggregation_test_single_segment() -> crate::Result<()> {
@@ -1215,6 +1226,85 @@ mod tests {
Ok(())
}
#[test]
fn terms_aggregation_min_doc_count_special_case_with_sub_agg_empty_merge() -> crate::Result<()>
{
let mut schema_builder = Schema::builder();
let string_field_1 = schema_builder.add_text_field("string1", STRING | FAST);
let string_field_2 = schema_builder.add_text_field("string2", STRING | FAST);
let score_fieldtype = crate::schema::NumericOptions::default().set_fast();
let score_field = schema_builder.add_u64_field("score", score_fieldtype);
let index = Index::create_in_ram(schema_builder.build());
{
let mut index_writer = index.writer_with_num_threads(1, 20_000_000)?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
// writing the segment
index_writer.add_document(doc!(
string_field_1 => "A".to_string(),
string_field_2 => "hit".to_string(),
score_field => 1u64,
))?;
index_writer.add_document(doc!(
string_field_1 => "B".to_string(),
string_field_2 => "nohit".to_string(), // this doc gets filtered in this segment,
// but the term will still be loaded because
// min_doc_count == 0
score_field => 2u64,
))?;
index_writer.commit()?;
index_writer.add_document(doc!(
string_field_1 => "A".to_string(),
string_field_2 => "hit".to_string(),
score_field => 2u64,
))?;
index_writer.add_document(doc!(
string_field_1 => "B".to_string(),
string_field_2 => "hit".to_string(),
score_field => 4u64,
))?;
index_writer.commit()?;
}
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string1",
"min_doc_count": 0,
},
"aggs":{
"elhistogram": {
"histogram": {
"field": "score",
"interval": 1
}
}
}
}
}))
.unwrap();
// searching for terma, but min_doc_count will return all terms
let res = exec_request_with_query(agg_req, &index, Some(("string2", "hit")))?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 2);
assert_eq!(
res["my_texts"]["buckets"][0]["elhistogram"]["buckets"],
json!([{ "doc_count": 1, "key": 1.0 }, { "doc_count": 1, "key": 2.0 } ])
);
assert_eq!(res["my_texts"]["buckets"][1]["key"], "B");
assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 1);
assert_eq!(
res["my_texts"]["buckets"][1]["elhistogram"]["buckets"],
json!([ { "doc_count": 1, "key": 4.0 } ])
);
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
Ok(())
}
#[test]
fn terms_aggregation_error_count_test() -> crate::Result<()> {
let terms_per_segment = vec![