From cbf2bdc75be46ee11aecb3cd9fa1ae3d1818f175 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Thu, 27 Apr 2023 15:47:31 +0800 Subject: [PATCH] change bucket count type (#2013) * change bucket count type closes #2012 * Update src/aggregation/agg_limits.rs Co-authored-by: Paul Masurel * Update src/directory/managed_directory.rs Co-authored-by: Paul Masurel * fix test --------- Co-authored-by: Paul Masurel --- src/aggregation/agg_limits.rs | 4 ++-- src/aggregation/bucket/histogram/histogram.rs | 2 +- src/aggregation/bucket/term_agg.rs | 10 +++++----- src/aggregation/intermediate_agg_result.rs | 6 +++--- src/aggregation/mod.rs | 2 -- 5 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/aggregation/agg_limits.rs b/src/aggregation/agg_limits.rs index 35c415221..0cd1df8bd 100644 --- a/src/aggregation/agg_limits.rs +++ b/src/aggregation/agg_limits.rs @@ -15,8 +15,8 @@ pub trait MemoryConsumption { impl MemoryConsumption for HashMap { fn memory_consumption(&self) -> usize { - let num_items = self.capacity(); - (std::mem::size_of::() + std::mem::size_of::()) * num_items + let capacity = self.capacity(); + (std::mem::size_of::() + std::mem::size_of::() + 1) * capacity } } diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index 7d6112f59..db7f45dbf 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -696,7 +696,7 @@ mod tests { assert_eq!( res.to_string(), "Aborting aggregation because memory limit was exceeded. Limit: 5.00 KB, Current: \ - 59.71 KB" + 59.82 KB" ); Ok(()) diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index d4005fff8..19b0c948c 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -202,7 +202,7 @@ impl TermsAggregationInternal { #[derive(Clone, Debug, Default)] /// Container to store term_ids/or u64 values and their buckets. struct TermBuckets { - pub(crate) entries: FxHashMap, + pub(crate) entries: FxHashMap, pub(crate) sub_aggs: FxHashMap>, } @@ -362,7 +362,7 @@ impl SegmentTermCollector { mut self, agg_with_accessor: &AggregationWithAccessor, ) -> crate::Result { - let mut entries: Vec<(u64, u64)> = self.term_buckets.entries.into_iter().collect(); + let mut entries: Vec<(u64, u32)> = self.term_buckets.entries.into_iter().collect(); let order_by_sub_aggregation = matches!(self.req.order.target, OrderTarget::SubAggregation(_)); @@ -488,14 +488,14 @@ impl SegmentTermCollector { pub(crate) trait GetDocCount { fn doc_count(&self) -> u64; } -impl GetDocCount for (u64, u64) { +impl GetDocCount for (u64, u32) { fn doc_count(&self) -> u64 { - self.1 + self.1 as u64 } } impl GetDocCount for (String, IntermediateTermBucketEntry) { fn doc_count(&self) -> u64 { - self.1.doc_count + self.1.doc_count as u64 } } diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index c0833eb03..2ff1dd394 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -508,12 +508,12 @@ impl IntermediateTermBucketResult { let mut buckets: Vec = self .entries .into_iter() - .filter(|bucket| bucket.1.doc_count >= req.min_doc_count) + .filter(|bucket| bucket.1.doc_count as u64 >= req.min_doc_count) .map(|(key, entry)| { Ok(BucketEntry { key_as_string: None, key: key.into(), - doc_count: entry.doc_count, + doc_count: entry.doc_count as u64, sub_aggregation: entry .sub_aggregation .into_final_result_internal(sub_aggregation_req, limits)?, @@ -703,7 +703,7 @@ impl IntermediateRangeBucketEntry { #[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateTermBucketEntry { /// The number of documents in the bucket. - pub doc_count: u64, + pub doc_count: u32, /// The sub_aggregation in this bucket. pub sub_aggregation: IntermediateAggregationResults, } diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index 4b9905aef..d878b76fe 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -155,8 +155,6 @@ pub use error::AggregationError; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use self::intermediate_agg_result::IntermediateKey; - /// Represents an associative array `(key => values)` in a very efficient manner. #[derive(Clone, PartialEq, Serialize, Deserialize)] pub(crate) struct VecWithNames {