change bucket count type (#2013)

* change bucket count type

closes #2012

* Update src/aggregation/agg_limits.rs

Co-authored-by: Paul Masurel <paul@quickwit.io>

* Update src/directory/managed_directory.rs

Co-authored-by: Paul Masurel <paul@quickwit.io>

* fix test

---------

Co-authored-by: Paul Masurel <paul@quickwit.io>
This commit is contained in:
PSeitz
2023-04-27 15:47:31 +08:00
committed by GitHub
parent 1f06997d04
commit cbf2bdc75b
5 changed files with 11 additions and 13 deletions

View File

@@ -15,8 +15,8 @@ pub trait MemoryConsumption {
impl<K, V, S> MemoryConsumption for HashMap<K, V, S> {
fn memory_consumption(&self) -> usize {
let num_items = self.capacity();
(std::mem::size_of::<K>() + std::mem::size_of::<V>()) * num_items
let capacity = self.capacity();
(std::mem::size_of::<K>() + std::mem::size_of::<V>() + 1) * capacity
}
}

View File

@@ -696,7 +696,7 @@ mod tests {
assert_eq!(
res.to_string(),
"Aborting aggregation because memory limit was exceeded. Limit: 5.00 KB, Current: \
59.71 KB"
59.82 KB"
);
Ok(())

View File

@@ -202,7 +202,7 @@ impl TermsAggregationInternal {
#[derive(Clone, Debug, Default)]
/// Container to store term_ids/or u64 values and their buckets.
struct TermBuckets {
pub(crate) entries: FxHashMap<u64, u64>,
pub(crate) entries: FxHashMap<u64, u32>,
pub(crate) sub_aggs: FxHashMap<u64, Box<dyn SegmentAggregationCollector>>,
}
@@ -362,7 +362,7 @@ impl SegmentTermCollector {
mut self,
agg_with_accessor: &AggregationWithAccessor,
) -> crate::Result<IntermediateBucketResult> {
let mut entries: Vec<(u64, u64)> = self.term_buckets.entries.into_iter().collect();
let mut entries: Vec<(u64, u32)> = self.term_buckets.entries.into_iter().collect();
let order_by_sub_aggregation =
matches!(self.req.order.target, OrderTarget::SubAggregation(_));
@@ -488,14 +488,14 @@ impl SegmentTermCollector {
pub(crate) trait GetDocCount {
fn doc_count(&self) -> u64;
}
impl GetDocCount for (u64, u64) {
impl GetDocCount for (u64, u32) {
fn doc_count(&self) -> u64 {
self.1
self.1 as u64
}
}
impl GetDocCount for (String, IntermediateTermBucketEntry) {
fn doc_count(&self) -> u64 {
self.1.doc_count
self.1.doc_count as u64
}
}

View File

@@ -508,12 +508,12 @@ impl IntermediateTermBucketResult {
let mut buckets: Vec<BucketEntry> = self
.entries
.into_iter()
.filter(|bucket| bucket.1.doc_count >= req.min_doc_count)
.filter(|bucket| bucket.1.doc_count as u64 >= req.min_doc_count)
.map(|(key, entry)| {
Ok(BucketEntry {
key_as_string: None,
key: key.into(),
doc_count: entry.doc_count,
doc_count: entry.doc_count as u64,
sub_aggregation: entry
.sub_aggregation
.into_final_result_internal(sub_aggregation_req, limits)?,
@@ -703,7 +703,7 @@ impl IntermediateRangeBucketEntry {
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
pub struct IntermediateTermBucketEntry {
/// The number of documents in the bucket.
pub doc_count: u64,
pub doc_count: u32,
/// The sub_aggregation in this bucket.
pub sub_aggregation: IntermediateAggregationResults,
}

View File

@@ -155,8 +155,6 @@ pub use error::AggregationError;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use self::intermediate_agg_result::IntermediateKey;
/// Represents an associative array `(key => values)` in a very efficient manner.
#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub(crate) struct VecWithNames<T: Clone> {