Compare commits (1 commit)

Commit: 1b09dda479
Author: Pascal Seitz
Date:   2023-01-24 00:01:01 +08:00

improve bucket size aggregation limit
postpone the bucket size aggregation limit check for the terms
aggregation: instead of validating on every inserted bucket, validate
once when the segment result is computed

fix the min_doc_count == 0 special case, which loaded more terms from
the dictionary than the segment_size limit allows

Note that we now effectively check that segment_size does not exceed the
bucket limit. Since segment_size is 10 * size, a request with size 6500
would fail against a bucket limit of 65000.

closes #1822
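To make the size arithmetic concrete, here is a minimal standalone Rust
sketch. `check_segment_size` and `DEFAULT_BUCKET_LIMIT` are hypothetical
names, not tantivy's API, and the `>=` comparison is an assumption chosen
to match the size-6500 example in the commit message:

const DEFAULT_BUCKET_LIMIT: u32 = 65_000;

// Hypothetical up-front check: segment_size defaults to 10 * size, so a
// request with size 6500 yields segment_size 65_000 and trips the limit.
fn check_segment_size(size: u32, bucket_limit: u32) -> Result<u32, String> {
    let segment_size = size.saturating_mul(10);
    if segment_size >= bucket_limit {
        return Err(format!(
            "segment_size {segment_size} exceeds bucket limit {bucket_limit}"
        ));
    }
    Ok(segment_size)
}

fn main() {
    // size 6499 -> segment_size 64_990: accepted.
    assert!(check_segment_size(6_499, DEFAULT_BUCKET_LIMIT).is_ok());
    // size 6500 -> segment_size 65_000: rejected, as described above.
    assert!(check_segment_size(6_500, DEFAULT_BUCKET_LIMIT).is_err());
}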


@@ -11,7 +11,7 @@ use crate::aggregation::agg_req_with_accessor::{
 use crate::aggregation::intermediate_agg_result::{
     IntermediateBucketResult, IntermediateTermBucketEntry, IntermediateTermBucketResult,
 };
-use crate::aggregation::segment_agg_result::{BucketCount, SegmentAggregationResultsCollector};
+use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector;
 use crate::error::DataCorruption;
 use crate::fastfield::MultiValuedFastFieldReader;
 use crate::schema::Type;
@@ -268,21 +268,18 @@ impl TermBuckets {
         term_ids: &[u64],
         doc: DocId,
         sub_aggregation: &AggregationsWithAccessor,
-        bucket_count: &BucketCount,
         blueprint: &Option<SegmentAggregationResultsCollector>,
     ) -> crate::Result<()> {
         for &term_id in term_ids {
-            let entry = self.entries.entry(term_id as u32).or_insert_with(|| {
-                bucket_count.add_count(1);
-                TermBucketEntry::from_blueprint(blueprint)
-            });
+            let entry = self
+                .entries
+                .entry(term_id as u32)
+                .or_insert_with(|| TermBucketEntry::from_blueprint(blueprint));
             entry.doc_count += 1;
             if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
                 sub_aggregations.collect(doc, sub_aggregation)?;
             }
         }
-        bucket_count.validate_bucket_count()?;
         Ok(())
     }
@@ -372,7 +369,7 @@ impl SegmentTermCollector {
             }
             OrderTarget::SubAggregation(_name) => {
                 // don't sort and cut off since it's hard to make assumptions on the quality of the
-                // results when cutting off du to unknown nature of the sub_aggregation (possible
+                // results when cutting off due to unknown nature of the sub_aggregation (possible
                 // to check).
             }
             OrderTarget::Count => {
@@ -412,6 +409,10 @@ impl SegmentTermCollector {
         if self.req.min_doc_count == 0 {
             let mut stream = term_dict.stream()?;
             while let Some((key, _ord)) = stream.next() {
+                if dict.len() >= self.req.segment_size as usize {
+                    break;
+                }
                 let key = std::str::from_utf8(key)
                     .map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
                 if !dict.contains_key(key) {
@@ -433,6 +434,8 @@ impl SegmentTermCollector {
             sum_other_doc_count += sum_other_docs;
             dict = dict_entries.into_iter().collect();
         }
+        agg_with_accessor.bucket_count.add_count(dict.len() as u32);
+        agg_with_accessor.bucket_count.validate_bucket_count()?;
         Ok(IntermediateBucketResult::Terms(
             IntermediateTermBucketResult {
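This hunk, together with the earlier removal of bucket_count from
increment_bucket, moves the limit check out of the per-document hot path:
buckets are filled first, then counted and validated once when the
intermediate result is built. A minimal standalone sketch of that
deferred-validation pattern, with `BucketCounter` and `into_result` as
illustrative names rather than tantivy's API:

use std::sync::atomic::{AtomicU32, Ordering};

// Illustrative stand-in for a shared bucket counter with an upper limit.
struct BucketCounter {
    count: AtomicU32,
    limit: u32,
}

impl BucketCounter {
    fn add(&self, n: u32) {
        self.count.fetch_add(n, Ordering::Relaxed);
    }

    fn validate(&self) -> Result<(), String> {
        let count = self.count.load(Ordering::Relaxed);
        if count > self.limit {
            return Err(format!("too many buckets: {count} > {}", self.limit));
        }
        Ok(())
    }
}

// Deferred validation: count and check once per segment result instead of
// paying the add/validate cost on every inserted bucket.
fn into_result(buckets: Vec<u64>, counter: &BucketCounter) -> Result<Vec<u64>, String> {
    counter.add(buckets.len() as u32);
    counter.validate()?;
    Ok(buckets)
}

fn main() {
    let counter = BucketCounter {
        count: AtomicU32::new(0),
        limit: 65_000,
    };
    let buckets: Vec<u64> = (0..100).collect();
    assert!(into_result(buckets, &counter).is_ok());
}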
@@ -469,28 +472,24 @@ impl SegmentTermCollector {
                     &vals1,
                     docs[0],
                     &bucket_with_accessor.sub_aggregation,
-                    &bucket_with_accessor.bucket_count,
                     &self.blueprint,
                 )?;
                 self.term_buckets.increment_bucket(
                     &vals2,
                     docs[1],
                     &bucket_with_accessor.sub_aggregation,
-                    &bucket_with_accessor.bucket_count,
                     &self.blueprint,
                 )?;
                 self.term_buckets.increment_bucket(
                     &vals3,
                     docs[2],
                     &bucket_with_accessor.sub_aggregation,
-                    &bucket_with_accessor.bucket_count,
                     &self.blueprint,
                 )?;
                 self.term_buckets.increment_bucket(
                     &vals4,
                     docs[3],
                     &bucket_with_accessor.sub_aggregation,
-                    &bucket_with_accessor.bucket_count,
                     &self.blueprint,
                 )?;
             }
@@ -501,7 +500,6 @@ impl SegmentTermCollector {
                 &vals1,
                 doc,
                 &bucket_with_accessor.sub_aggregation,
-                &bucket_with_accessor.bucket_count,
                 &self.blueprint,
             )?;
         }
@@ -1136,6 +1134,33 @@ mod tests {
         assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
         assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
+
+        let agg_req: Aggregations = vec![(
+            "my_texts".to_string(),
+            Aggregation::Bucket(BucketAggregation {
+                bucket_agg: BucketAggregationType::Terms(TermsAggregation {
+                    field: "string_id".to_string(),
+                    min_doc_count: Some(0),
+                    size: Some(1),
+                    ..Default::default()
+                }),
+                sub_aggregation: Default::default(),
+            }),
+        )]
+        .into_iter()
+        .collect();
+        // searching for terma, but min_doc_count == 0 would return all terms
+        let res = exec_request_with_query(agg_req, &index, Some(("string_id", "terma")))?;
+        assert_eq!(res["my_texts"]["buckets"][0]["key"], "terma");
+        assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4);
+        assert_eq!(
+            res["my_texts"]["buckets"][1]["key"],
+            serde_json::Value::Null
+        );
+        assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
+        assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
+
         Ok(())
     }
@@ -1214,6 +1239,27 @@ mod tests {
         let index = get_test_index_from_terms(true, &terms_per_segment)?;
+
+        let agg_req: Aggregations = vec![(
+            "my_texts".to_string(),
+            Aggregation::Bucket(BucketAggregation {
+                bucket_agg: BucketAggregationType::Terms(TermsAggregation {
+                    field: "string_id".to_string(),
+                    // min_doc_count: Some(0),
+                    ..Default::default()
+                }),
+                sub_aggregation: Default::default(),
+            }),
+        )]
+        .into_iter()
+        .collect();
+        let res = exec_request_with_query(agg_req, &index, None);
+        assert!(res.is_ok());
+
+        // This request has min_doc_count set to 0.
+        // That means we potentially load the whole dict.
+        // Make sure the bucket count is still fine.
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
@@ -1228,6 +1274,24 @@ mod tests {
         .into_iter()
         .collect();
         let res = exec_request_with_query(agg_req, &index, None);
         assert!(res.is_ok());
+
+        let agg_req: Aggregations = vec![(
+            "my_texts".to_string(),
+            Aggregation::Bucket(BucketAggregation {
+                bucket_agg: BucketAggregationType::Terms(TermsAggregation {
+                    field: "string_id".to_string(),
+                    // min_doc_count: Some(0),
+                    size: Some(70_000),
+                    ..Default::default()
+                }),
+                sub_aggregation: Default::default(),
+            }),
+        )]
+        .into_iter()
+        .collect();
+        let res = exec_request_with_query(agg_req, &index, None);
+        assert!(res.is_err());
@@ -1384,14 +1448,10 @@ mod bench {
         let mut collector = get_collector_with_buckets(total_terms);
         let vals = get_rand_terms(total_terms, num_terms);
         let aggregations_with_accessor: AggregationsWithAccessor = Default::default();
-        let bucket_count: BucketCount = BucketCount {
-            bucket_count: Default::default(),
-            max_bucket_count: 1_000_001u32,
-        };
         b.iter(|| {
             for &val in &vals {
                 collector
-                    .increment_bucket(&[val], 0, &aggregations_with_accessor, &bucket_count, &None)
+                    .increment_bucket(&[val], 0, &aggregations_with_accessor, &None)
                     .unwrap();
             }
         })