diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index eca2d411f..ed849deea 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -362,13 +362,19 @@ impl SegmentTermCollector { let mut entries: Vec<(u32, TermBucketEntry)> = self.term_buckets.entries.into_iter().collect(); - let order_by_key = self.req.order.target == OrderTarget::Key; let order_by_sub_aggregation = matches!(self.req.order.target, OrderTarget::SubAggregation(_)); match self.req.order.target { OrderTarget::Key => { - // defer order and cut_off after loading the texts from the dictionary + // We rely on the fact, that term ordinals match the order of the strings + // TODO: We could have a special collector, that keeps only TOP n results at any + // time. + if self.req.order.order == Order::Desc { + entries.sort_unstable_by_key(|bucket| std::cmp::Reverse(bucket.0)); + } else { + entries.sort_unstable_by_key(|bucket| bucket.0); + } } OrderTarget::SubAggregation(_name) => { // don't sort and cut off since it's hard to make assumptions on the quality of the @@ -384,12 +390,11 @@ impl SegmentTermCollector { } } - let (term_doc_count_before_cutoff, mut sum_other_doc_count) = - if order_by_key || order_by_sub_aggregation { - (0, 0) - } else { - cut_off_buckets(&mut entries, self.req.segment_size as usize) - }; + let (term_doc_count_before_cutoff, sum_other_doc_count) = if order_by_sub_aggregation { + (0, 0) + } else { + cut_off_buckets(&mut entries, self.req.segment_size as usize) + }; let inverted_index = agg_with_accessor .inverted_index @@ -412,6 +417,10 @@ impl SegmentTermCollector { if self.req.min_doc_count == 0 { let mut stream = term_dict.stream()?; while let Some((key, _ord)) = stream.next() { + if dict.len() >= self.req.segment_size as usize { + break; + } + let key = std::str::from_utf8(key) .map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?; if !dict.contains_key(key) { @@ -420,20 +429,6 @@ impl SegmentTermCollector { } } - if order_by_key { - let mut dict_entries = dict.into_iter().collect_vec(); - if self.req.order.order == Order::Desc { - dict_entries.sort_unstable_by(|(key1, _), (key2, _)| key1.cmp(key2)); - } else { - dict_entries.sort_unstable_by(|(key1, _), (key2, _)| key2.cmp(key1)); - } - let (_, sum_other_docs) = - cut_off_buckets(&mut dict_entries, self.req.segment_size as usize); - - sum_other_doc_count += sum_other_docs; - dict = dict_entries.into_iter().collect(); - } - Ok(IntermediateBucketResult::Terms( IntermediateTermBucketResult { entries: dict, @@ -923,14 +918,14 @@ mod tests { ]; let index = get_test_index_from_values_and_terms(merge_segments, &segment_and_terms)?; - // key desc + // key asc let agg_req: Aggregations = vec![( "my_texts".to_string(), Aggregation::Bucket(BucketAggregation { bucket_agg: BucketAggregationType::Terms(TermsAggregation { field: "string_id".to_string(), order: Some(CustomOrder { - order: Order::Desc, + order: Order::Asc, target: OrderTarget::Key, }), ..Default::default() @@ -957,7 +952,7 @@ mod tests { bucket_agg: BucketAggregationType::Terms(TermsAggregation { field: "string_id".to_string(), order: Some(CustomOrder { - order: Order::Desc, + order: Order::Asc, target: OrderTarget::Key, }), size: Some(2), @@ -981,14 +976,14 @@ mod tests { assert_eq!(res["my_texts"]["sum_other_doc_count"], 3); - // key desc and segment_size cut_off + // key asc and segment_size cut_off let agg_req: Aggregations = vec![( "my_texts".to_string(), Aggregation::Bucket(BucketAggregation { bucket_agg: BucketAggregationType::Terms(TermsAggregation { field: "string_id".to_string(), order: Some(CustomOrder { - order: Order::Desc, + order: Order::Asc, target: OrderTarget::Key, }), size: Some(2), @@ -1011,14 +1006,14 @@ mod tests { serde_json::Value::Null ); - // key asc + // key desc let agg_req: Aggregations = vec![( "my_texts".to_string(), Aggregation::Bucket(BucketAggregation { bucket_agg: BucketAggregationType::Terms(TermsAggregation { field: "string_id".to_string(), order: Some(CustomOrder { - order: Order::Asc, + order: Order::Desc, target: OrderTarget::Key, }), ..Default::default() @@ -1038,14 +1033,14 @@ mod tests { assert_eq!(res["my_texts"]["buckets"][2]["doc_count"], 5); assert_eq!(res["my_texts"]["sum_other_doc_count"], 0); - // key asc, size cut_off + // key desc, size cut_off let agg_req: Aggregations = vec![( "my_texts".to_string(), Aggregation::Bucket(BucketAggregation { bucket_agg: BucketAggregationType::Terms(TermsAggregation { field: "string_id".to_string(), order: Some(CustomOrder { - order: Order::Asc, + order: Order::Desc, target: OrderTarget::Key, }), size: Some(2), @@ -1068,14 +1063,14 @@ mod tests { ); assert_eq!(res["my_texts"]["sum_other_doc_count"], 5); - // key asc, segment_size cut_off + // key desc, segment_size cut_off let agg_req: Aggregations = vec![( "my_texts".to_string(), Aggregation::Bucket(BucketAggregation { bucket_agg: BucketAggregationType::Terms(TermsAggregation { field: "string_id".to_string(), order: Some(CustomOrder { - order: Order::Asc, + order: Order::Desc, target: OrderTarget::Key, }), size: Some(2), @@ -1352,68 +1347,3 @@ mod tests { Ok(()) } } - -#[cfg(all(test, feature = "unstable"))] -mod bench { - - use itertools::Itertools; - use rand::seq::SliceRandom; - use rand::thread_rng; - - use super::*; - - fn get_collector_with_buckets(num_docs: u64) -> TermBuckets { - TermBuckets::from_req_and_validate(&Default::default(), num_docs as usize).unwrap() - } - - fn get_rand_terms(total_terms: u64, num_terms_returned: u64) -> Vec { - let mut rng = thread_rng(); - - let all_terms = (0..total_terms - 1).collect_vec(); - - let mut vals = vec![]; - for _ in 0..num_terms_returned { - let val = all_terms.as_slice().choose(&mut rng).unwrap(); - vals.push(*val); - } - - vals - } - - fn bench_term_buckets(b: &mut test::Bencher, num_terms: u64, total_terms: u64) { - let mut collector = get_collector_with_buckets(total_terms); - let vals = get_rand_terms(total_terms, num_terms); - let aggregations_with_accessor: AggregationsWithAccessor = Default::default(); - let bucket_count: BucketCount = BucketCount { - bucket_count: Default::default(), - max_bucket_count: 1_000_001u32, - }; - b.iter(|| { - for &val in &vals { - collector - .increment_bucket(&[val], 0, &aggregations_with_accessor, &bucket_count, &None) - .unwrap(); - } - }) - } - - #[bench] - fn bench_term_buckets_500_of_1_000_000(b: &mut test::Bencher) { - bench_term_buckets(b, 500u64, 1_000_000u64) - } - - #[bench] - fn bench_term_buckets_1_000_000_of_50_000(b: &mut test::Bencher) { - bench_term_buckets(b, 1_000_000u64, 50_000u64) - } - - #[bench] - fn bench_term_buckets_1_000_000_of_50(b: &mut test::Bencher) { - bench_term_buckets(b, 1_000_000u64, 50u64) - } - - #[bench] - fn bench_term_buckets_1_000_000_of_1_000_000(b: &mut test::Bencher) { - bench_term_buckets(b, 1_000_000u64, 1_000_000u64) - } -} diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index 390cee13f..94249bc55 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -499,7 +499,7 @@ impl IntermediateTermBucketResult { match req.order.target { OrderTarget::Key => { buckets.sort_by(|left, right| { - if req.order.order == Order::Desc { + if req.order.order == Order::Asc { left.key.partial_cmp(&right.key) } else { right.key.partial_cmp(&left.key) diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index c952a3bda..d28c3cd05 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -1156,12 +1156,6 @@ mod tests { r#"FieldNotFound("not_exist_field")"# ); - let agg_res = avg_on_field("scores_i64"); - assert_eq!( - format!("{:?}", agg_res), - r#"InvalidArgument("Invalid field cardinality on field scores_i64 expected SingleValue, but got MultiValues")"# - ); - Ok(()) }