diff --git a/benches/agg_bench.rs b/benches/agg_bench.rs index c3b0a6d26..642532597 100644 --- a/benches/agg_bench.rs +++ b/benches/agg_bench.rs @@ -478,6 +478,13 @@ fn get_collector(agg_req: Aggregations) -> AggregationCollector { } fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result { + // Flag to use existing index + let reuse_index = std::env::var("REUSE_AGG_BENCH_INDEX").is_ok(); + if reuse_index && std::path::Path::new("agg_bench").exists() { + return Index::open_in_dir("agg_bench"); + } + // crreate dir + std::fs::create_dir_all("agg_bench")?; let mut schema_builder = Schema::builder(); let text_fieldtype = tantivy::schema::TextOptions::default() .set_indexing_options( @@ -497,7 +504,12 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result { let score_field = schema_builder.add_u64_field("score", score_fieldtype.clone()); let score_field_f64 = schema_builder.add_f64_field("score_f64", score_fieldtype.clone()); let score_field_i64 = schema_builder.add_i64_field("score_i64", score_fieldtype); - let index = Index::create_from_tempdir(schema_builder.build())?; + // use tmp dir + let index = if reuse_index { + Index::create_in_dir("agg_bench", schema_builder.build())? + } else { + Index::create_from_tempdir(schema_builder.build())? + }; // Approximate log proportions let status_field_data = [ ("INFO", 8000), diff --git a/src/aggregation/bucket/filter.rs b/src/aggregation/bucket/filter.rs index 3d9e3ae82..b1fe7ad29 100644 --- a/src/aggregation/bucket/filter.rs +++ b/src/aggregation/bucket/filter.rs @@ -501,8 +501,8 @@ struct DocCount { /// Segment collector for filter aggregation pub struct SegmentFilterCollector { - /// Document counts per bucket - buckets: Vec, + /// Document counts per parent bucket + parent_buckets: Vec, /// Sub-aggregation collectors sub_aggregations: Option>, bucket_id_provider: BucketIdProvider, @@ -525,7 +525,7 @@ impl SegmentFilterCollector { let sub_agg_collector = sub_agg_collector.map(CachedSubAggs::new); Ok(SegmentFilterCollector { - buckets: Vec::new(), + parent_buckets: Vec::new(), sub_aggregations: sub_agg_collector, accessor_idx: node.idx_in_req_data, bucket_id_provider: BucketIdProvider::default(), @@ -536,7 +536,7 @@ impl SegmentFilterCollector { impl Debug for SegmentFilterCollector { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SegmentFilterCollector") - .field("buckets", &self.buckets) + .field("buckets", &self.parent_buckets) .field("has_sub_aggs", &self.sub_aggregations.is_some()) .field("accessor_idx", &self.accessor_idx) .finish() @@ -558,7 +558,7 @@ impl SegmentAggregationCollector for SegmentFilterCollector { parent_bucket_id: BucketId, ) -> crate::Result<()> { let mut sub_results = IntermediateAggregationResults::default(); - let bucket_opt = self.buckets.get(parent_bucket_id as usize); + let bucket_opt = self.parent_buckets.get(parent_bucket_id as usize); if let Some(sub_aggs) = &mut self.sub_aggregations { sub_aggs @@ -606,7 +606,7 @@ impl SegmentAggregationCollector for SegmentFilterCollector { return Ok(()); } - let mut bucket = self.buckets[parent_bucket_id as usize]; + let mut bucket = self.parent_buckets[parent_bucket_id as usize]; // Take the request data to avoid borrow checker issues with sub-aggregations let mut req = agg_data.take_filter_req_data(self.accessor_idx); @@ -632,7 +632,7 @@ impl SegmentAggregationCollector for SegmentFilterCollector { sub_aggs.check_flush_local(agg_data)?; } // put back bucket - self.buckets[parent_bucket_id as usize] = bucket; + self.parent_buckets[parent_bucket_id as usize] = bucket; Ok(()) } @@ -649,9 +649,9 @@ impl SegmentAggregationCollector for SegmentFilterCollector { max_bucket: BucketId, _agg_data: &AggregationsSegmentCtx, ) -> crate::Result<()> { - while self.buckets.len() <= max_bucket as usize { + while self.parent_buckets.len() <= max_bucket as usize { let bucket_id = self.bucket_id_provider.next_bucket_id(); - self.buckets.push(DocCount { + self.parent_buckets.push(DocCount { doc_count: 0, bucket_id, }); diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index daf1211eb..f506893e8 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -292,7 +292,7 @@ struct HistogramBuckets { pub struct SegmentHistogramCollector { /// The buckets containing the aggregation data. /// One Histogram bucket per parent bucket id. - buckets: Vec, + parent_buckets: Vec, sub_agg: Option, accessor_idx: usize, bucket_id_provider: BucketIdProvider, @@ -311,7 +311,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector { .clone(); // TODO: avoid prepare_max_bucket here and handle empty buckets. self.prepare_max_bucket(parent_bucket_id, agg_data)?; - let histogram = std::mem::take(&mut self.buckets[parent_bucket_id as usize]); + let histogram = std::mem::take(&mut self.parent_buckets[parent_bucket_id as usize]); let bucket = self.add_intermediate_bucket_result(agg_data, histogram)?; results.push(name, IntermediateAggregationResult::Bucket(bucket))?; @@ -327,7 +327,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector { ) -> crate::Result<()> { let mut req = agg_data.take_histogram_req_data(self.accessor_idx); let mem_pre = self.get_memory_consumption(); - let buckets = &mut self.buckets[parent_bucket_id as usize].buckets; + let buckets = &mut self.parent_buckets[parent_bucket_id as usize].buckets; let bounds = req.bounds; let interval = req.req.interval; @@ -385,8 +385,8 @@ impl SegmentAggregationCollector for SegmentHistogramCollector { max_bucket: BucketId, _agg_data: &AggregationsSegmentCtx, ) -> crate::Result<()> { - while self.buckets.len() <= max_bucket as usize { - self.buckets.push(HistogramBuckets { + while self.parent_buckets.len() <= max_bucket as usize { + self.parent_buckets.push(HistogramBuckets { buckets: FxHashMap::default(), }); } @@ -397,7 +397,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector { impl SegmentHistogramCollector { fn get_memory_consumption(&self) -> usize { let self_mem = std::mem::size_of::(); - let buckets_mem = self.buckets.len() * std::mem::size_of::(); + let buckets_mem = self.parent_buckets.len() * std::mem::size_of::(); self_mem + buckets_mem } /// Converts the collector result into a intermediate bucket result. @@ -406,7 +406,7 @@ impl SegmentHistogramCollector { agg_data: &AggregationsSegmentCtx, histogram: HistogramBuckets, ) -> crate::Result { - let mut buckets = Vec::with_capacity(self.buckets.len()); + let mut buckets = Vec::with_capacity(histogram.buckets.len()); for bucket in histogram.buckets.into_values() { let bucket_res = bucket.into_intermediate_bucket_entry(&mut self.sub_agg, agg_data); @@ -447,7 +447,7 @@ impl SegmentHistogramCollector { let sub_agg = sub_agg.map(CachedSubAggs::new); Ok(Self { - buckets: Default::default(), + parent_buckets: Default::default(), sub_agg, accessor_idx: node.idx_in_req_data, bucket_id_provider: BucketIdProvider::default(), diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 29f569d12..d536aace9 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -7,7 +7,7 @@ use columnar::{ Column, ColumnBlockAccessor, ColumnType, Dictionary, MonotonicallyMappableToU128, MonotonicallyMappableToU64, NumericalValue, StrColumn, }; -use common::BitSet; +use common::{BitSet, TinySet}; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; @@ -398,7 +398,7 @@ pub(crate) fn build_segment_term_collector( if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC && !has_sub_aggregations { let term_buckets = VecTermBucketsNoAgg::new(max_term_id + 1, &mut bucket_id_provider); let collector: SegmentTermCollector<_, true> = SegmentTermCollector { - buckets: vec![term_buckets], + parent_buckets: vec![term_buckets], accessor_idx, sub_agg: None, bucket_id_provider, @@ -409,7 +409,7 @@ pub(crate) fn build_segment_term_collector( let term_buckets = VecTermBuckets::new(max_term_id + 1, &mut bucket_id_provider); let sub_agg = sub_agg_collector.map(CachedSubAggs::::new); let collector: SegmentTermCollector<_, true> = SegmentTermCollector { - buckets: vec![term_buckets], + parent_buckets: vec![term_buckets], accessor_idx, sub_agg, bucket_id_provider, @@ -422,7 +422,7 @@ pub(crate) fn build_segment_term_collector( // Build sub-aggregation blueprint (flat pairs) let sub_agg = sub_agg_collector.map(CachedSubAggs::::new); let collector: SegmentTermCollector = SegmentTermCollector { - buckets: vec![term_buckets], + parent_buckets: vec![term_buckets], accessor_idx, sub_agg, bucket_id_provider, @@ -434,7 +434,7 @@ pub(crate) fn build_segment_term_collector( // Build sub-aggregation blueprint (flat pairs) let sub_agg = sub_agg_collector.map(CachedSubAggs::::new); let collector: SegmentTermCollector = SegmentTermCollector { - buckets: vec![term_buckets], + parent_buckets: vec![term_buckets], accessor_idx, sub_agg, bucket_id_provider, @@ -497,47 +497,42 @@ const BITMASK_LEN: usize = PAGE_SIZE / 64; #[derive(Clone, Debug)] struct Page { - presence: [u64; BITMASK_LEN], + /// Bitmask indicating which offsets are present. + /// It is chunked into TinySet words. + presence: [TinySet; BITMASK_LEN], data: [Bucket; PAGE_SIZE], } impl Page { fn new() -> Self { Self { - presence: [0; BITMASK_LEN], + presence: [TinySet::empty(); BITMASK_LEN], data: [Bucket::default(); PAGE_SIZE], } } #[inline] fn is_set(&self, offset: usize) -> bool { - let word_idx = offset / 64; + let bucket_idx = offset / 64; let bit_idx = offset % 64; - (self.presence[word_idx] & (1 << bit_idx)) != 0 + self.presence[bucket_idx].contains(bit_idx as u32) } #[inline] fn set_present(&mut self, offset: usize) { - let word_idx = offset / 64; + let bucket_idx = offset / 64; let bit_idx = offset % 64; - self.presence[word_idx] |= 1 << bit_idx; + self.presence[bucket_idx].insert_mut(bit_idx as u32); } // Flattened iteration logic fn collect_items(&self, base_term_id: u64, result: &mut Vec<(u64, Bucket)>) { - for (word_idx, &word) in self.presence.iter().enumerate() { - if word == 0 { - continue; - } + for (bucket_pos, &tiny_set) in self.presence.iter().enumerate() { + let base_offset = bucket_pos * 64; - let mut temp_word = word; - let base_offset = word_idx * 64; - - while temp_word != 0 { - let bit = temp_word.trailing_zeros() as usize; - let offset = base_offset + bit; + for bit in tiny_set.into_iter() { + let offset = base_offset + bit as usize; result.push((base_term_id + offset as u64, self.data[offset])); - temp_word &= !(1 << bit); } } } @@ -547,7 +542,7 @@ impl Page { /// Uses a fixed size vector of pages, each page containing a fixed size array of buckets. /// /// Each page covers a range of term ids. Pages are allocated on demand. -/// This implementation is more memory efficient than a full Vec for sparse term id sets, +/// This implementation is more memory efficient than a full Vec for high cardinality term id sets, /// /// It has a fixed cost of `num_pages * 8 bytes` for the page directory. /// For 1 million terms, this is 8 * 1024 = 8KB. @@ -604,8 +599,8 @@ impl TermAggregationMap for PagedTermMap { } fn into_vec(self) -> Vec<(u64, Bucket)> { - // Heuristic: Estimate active count. - let estimated_count = (self.mem_usage / std::mem::size_of::()) * (PAGE_SIZE / 2); + // estimate 16 entries per non-empty page + let estimated_count = self.pages.iter().filter(|p| p.is_some()).count() * 16; let mut result = Vec::with_capacity(estimated_count); for (i, page_opt) in self.pages.into_iter().enumerate() { @@ -769,7 +764,7 @@ impl TermAggregationMap for VecTermBuckets { #[derive(Clone, Debug)] struct SegmentTermCollector { /// The buckets containing the aggregation data. - buckets: Vec, + parent_buckets: Vec, sub_agg: Option>, accessor_idx: usize, bucket_id_provider: BucketIdProvider, @@ -793,7 +788,7 @@ impl SegmentAggregationCollect // TODO: avoid prepare_max_bucket here and handle empty buckets. self.prepare_max_bucket(bucket, agg_data)?; let bucket = std::mem::replace( - &mut self.buckets[bucket as usize], + &mut self.parent_buckets[bucket as usize], TermMap::new(0, &mut self.bucket_id_provider), ); let term_req = agg_data.get_term_req_data(self.accessor_idx); @@ -829,7 +824,7 @@ impl SegmentAggregationCollect } if let Some(sub_agg) = &mut self.sub_agg { - let term_buckets = &mut self.buckets[parent_bucket_id as usize]; + let term_buckets = &mut self.parent_buckets[parent_bucket_id as usize]; let it = req_data .column_block_accessor .iter_docid_vals(docs, &req_data.accessor); @@ -850,7 +845,7 @@ impl SegmentAggregationCollect ); } } else { - let term_buckets = &mut self.buckets[parent_bucket_id as usize]; + let term_buckets = &mut self.parent_buckets[parent_bucket_id as usize]; let it = req_data.column_block_accessor.iter_vals(); if let Some(allowed_bs) = req_data.allowed_term_ids.as_ref() { let it = it.filter(move |&term_id| allowed_bs.contains(term_id as u32)); @@ -888,10 +883,10 @@ impl SegmentAggregationCollect max_bucket: BucketId, _agg_data: &AggregationsSegmentCtx, ) -> crate::Result<()> { - while self.buckets.len() <= max_bucket as usize { + while self.parent_buckets.len() <= max_bucket as usize { let term_buckets: TermMap = TermMap::new(self.max_term_id, &mut self.bucket_id_provider); - self.buckets.push(term_buckets); + self.parent_buckets.push(term_buckets); } Ok(()) } @@ -925,7 +920,7 @@ impl SegmentTermCollector where TermMap: TermAggregationMap { fn get_memory_consumption(&self) -> usize { - self.buckets + self.parent_buckets .iter() .map(|b| b.get_memory_consumption()) .sum() @@ -1203,8 +1198,10 @@ mod tests { use common::DateTime; use time::{Date, Month}; + use super::{PagedTermMap, TermAggregationMap, PAGE_SIZE}; use crate::aggregation::agg_req::Aggregations; use crate::aggregation::intermediate_agg_result::IntermediateAggregationResults; + use crate::aggregation::segment_agg_result::BucketIdProvider; use crate::aggregation::tests::{ exec_request, exec_request_with_query, exec_request_with_query_and_memory_limit, get_test_index_from_terms, get_test_index_from_values_and_terms, @@ -1215,6 +1212,43 @@ mod tests { use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING}; use crate::{Index, IndexWriter}; + #[test] + fn paged_term_map_reuses_buckets_and_counts() { + let mut bucket_id_provider = BucketIdProvider::default(); + let mut map = PagedTermMap::new((PAGE_SIZE * 2) as u64, &mut bucket_id_provider); + + let bucket_first = map.term_entry(5, &mut bucket_id_provider); + let bucket_second_page = map.term_entry((PAGE_SIZE + 7) as u64, &mut bucket_id_provider); + + // Reinsertions should increment counts and reuse bucket ids + assert_eq!(map.term_entry(5, &mut bucket_id_provider), bucket_first); + assert_eq!( + map.term_entry((PAGE_SIZE + 7) as u64, &mut bucket_id_provider), + bucket_second_page + ); + + // High offset exercises the TinySet presence word boundaries. + let bucket_high_bit = map.term_entry(63, &mut bucket_id_provider); + + let mut entries = map.into_vec(); + entries.sort_by_key(|(term_id, _)| *term_id); + + let expected = vec![ + (5u64, bucket_first, 2u32), + (63u64, bucket_high_bit, 1u32), + ((PAGE_SIZE + 7) as u64, bucket_second_page, 2u32), + ]; + + assert_eq!(entries.len(), expected.len()); + for ((term_id, bucket), (expected_term, expected_bucket_id, expected_count)) in + entries.into_iter().zip(expected) + { + assert_eq!(term_id, expected_term); + assert_eq!(bucket.bucket_id, expected_bucket_id); + assert_eq!(bucket.count, expected_count); + } + } + #[test] fn terms_aggregation_test_single_segment() -> crate::Result<()> { terms_aggregation_test_merge_segment(true) diff --git a/src/aggregation/bucket/term_missing_agg.rs b/src/aggregation/bucket/term_missing_agg.rs index c8ab51832..108fc20f8 100644 --- a/src/aggregation/bucket/term_missing_agg.rs +++ b/src/aggregation/bucket/term_missing_agg.rs @@ -48,7 +48,7 @@ struct MissingCount { pub struct TermMissingAgg { accessor_idx: usize, sub_agg: Option, - /// Idx = bucket id, Value = missing count for that bucket + /// Idx = parent bucket id, Value = missing count for that bucket missing_count_per_bucket: Vec, bucket_id_provider: BucketIdProvider, } diff --git a/src/aggregation/cached_sub_aggs.rs b/src/aggregation/cached_sub_aggs.rs index 759824e8e..0318bc8be 100644 --- a/src/aggregation/cached_sub_aggs.rs +++ b/src/aggregation/cached_sub_aggs.rs @@ -49,7 +49,7 @@ impl CachedSubAggs { per_bucket_docs: Vec::new(), num_docs: 0, sub_agg_collector: sub_agg, - partitions: core::array::from_fn(|_| PartitionEntry::new()), + partitions: core::array::from_fn(|_| PartitionEntry::default()), } } @@ -107,7 +107,7 @@ impl CachedSubAggs { .prepare_max_bucket(max_bucket, agg_data)?; // The threshold above which we flush buckets individually. // Note: We need to make sure that we don't lock ourselves into a situation where we hit - // the FLUSH_THRESHOLD, but never flush any buckets. + // the FLUSH_THRESHOLD, but never flush any buckets. (except the final flush) let mut bucket_treshold = FLUSH_THRESHOLD / (self.per_bucket_docs.len().max(1) * 2); if force { bucket_treshold = 0; @@ -157,21 +157,13 @@ impl CachedSubAggs { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] struct PartitionEntry { bucket_ids: Vec, docs: Vec, } impl PartitionEntry { - #[inline] - fn new() -> Self { - Self { - bucket_ids: Vec::with_capacity(FLUSH_THRESHOLD / NUM_PARTITIONS), - docs: Vec::with_capacity(FLUSH_THRESHOLD / NUM_PARTITIONS), - } - } - #[inline] fn clear(&mut self) { self.bucket_ids.clear();