Pascal Seitz
2025-12-08 09:16:50 +08:00
committed by Pascal Seitz
parent 78bd3826dc
commit 411587a2d9
6 changed files with 100 additions and 62 deletions

View File

@@ -478,6 +478,13 @@ fn get_collector(agg_req: Aggregations) -> AggregationCollector {
}
fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
// Flag to reuse an existing index
let reuse_index = std::env::var("REUSE_AGG_BENCH_INDEX").is_ok();
if reuse_index && std::path::Path::new("agg_bench").exists() {
return Index::open_in_dir("agg_bench");
}
// create dir
std::fs::create_dir_all("agg_bench")?;
let mut schema_builder = Schema::builder();
let text_fieldtype = tantivy::schema::TextOptions::default()
.set_indexing_options(
@@ -497,7 +504,12 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
let score_field = schema_builder.add_u64_field("score", score_fieldtype.clone());
let score_field_f64 = schema_builder.add_f64_field("score_f64", score_fieldtype.clone());
let score_field_i64 = schema_builder.add_i64_field("score_i64", score_fieldtype);
let index = Index::create_from_tempdir(schema_builder.build())?;
// use the persistent dir when reusing, otherwise a temp dir
let index = if reuse_index {
Index::create_in_dir("agg_bench", schema_builder.build())?
} else {
Index::create_from_tempdir(schema_builder.build())?
};
// Approximate log proportions
let status_field_data = [
("INFO", 8000),

View File

@@ -501,8 +501,8 @@ struct DocCount {
/// Segment collector for filter aggregation
pub struct SegmentFilterCollector {
/// Document counts per bucket
buckets: Vec<DocCount>,
/// Document counts per parent bucket
parent_buckets: Vec<DocCount>,
/// Sub-aggregation collectors
sub_aggregations: Option<CachedSubAggs<true>>,
bucket_id_provider: BucketIdProvider,
@@ -525,7 +525,7 @@ impl SegmentFilterCollector {
let sub_agg_collector = sub_agg_collector.map(CachedSubAggs::new);
Ok(SegmentFilterCollector {
buckets: Vec::new(),
parent_buckets: Vec::new(),
sub_aggregations: sub_agg_collector,
accessor_idx: node.idx_in_req_data,
bucket_id_provider: BucketIdProvider::default(),
@@ -536,7 +536,7 @@ impl SegmentFilterCollector {
impl Debug for SegmentFilterCollector {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SegmentFilterCollector")
.field("buckets", &self.buckets)
.field("buckets", &self.parent_buckets)
.field("has_sub_aggs", &self.sub_aggregations.is_some())
.field("accessor_idx", &self.accessor_idx)
.finish()
@@ -558,7 +558,7 @@ impl SegmentAggregationCollector for SegmentFilterCollector {
parent_bucket_id: BucketId,
) -> crate::Result<()> {
let mut sub_results = IntermediateAggregationResults::default();
let bucket_opt = self.buckets.get(parent_bucket_id as usize);
let bucket_opt = self.parent_buckets.get(parent_bucket_id as usize);
if let Some(sub_aggs) = &mut self.sub_aggregations {
sub_aggs
@@ -606,7 +606,7 @@ impl SegmentAggregationCollector for SegmentFilterCollector {
return Ok(());
}
let mut bucket = self.buckets[parent_bucket_id as usize];
let mut bucket = self.parent_buckets[parent_bucket_id as usize];
// Take the request data to avoid borrow checker issues with sub-aggregations
let mut req = agg_data.take_filter_req_data(self.accessor_idx);
@@ -632,7 +632,7 @@ impl SegmentAggregationCollector for SegmentFilterCollector {
sub_aggs.check_flush_local(agg_data)?;
}
// put the bucket back
self.buckets[parent_bucket_id as usize] = bucket;
self.parent_buckets[parent_bucket_id as usize] = bucket;
Ok(())
}
@@ -649,9 +649,9 @@ impl SegmentAggregationCollector for SegmentFilterCollector {
max_bucket: BucketId,
_agg_data: &AggregationsSegmentCtx,
) -> crate::Result<()> {
while self.buckets.len() <= max_bucket as usize {
while self.parent_buckets.len() <= max_bucket as usize {
let bucket_id = self.bucket_id_provider.next_bucket_id();
self.buckets.push(DocCount {
self.parent_buckets.push(DocCount {
doc_count: 0,
bucket_id,
});
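The `prepare_max_bucket` pattern above recurs in the filter, histogram, and terms collectors in this commit: one slot per parent bucket id, grown on demand so that indexing by `parent_bucket_id` stays in bounds. A minimal standalone sketch of the idea (simplified types, not tantivy's actual API):

#[derive(Clone, Copy, Default, Debug)]
struct DocCount {
    doc_count: u64,
    bucket_id: u32,
}

struct Collector {
    parent_buckets: Vec<DocCount>,
    next_bucket_id: u32,
}

impl Collector {
    /// Grow the per-parent-bucket storage up to and including `max_bucket`.
    fn prepare_max_bucket(&mut self, max_bucket: u32) {
        while self.parent_buckets.len() <= max_bucket as usize {
            let bucket_id = self.next_bucket_id;
            self.next_bucket_id += 1;
            self.parent_buckets.push(DocCount { doc_count: 0, bucket_id });
        }
    }
}

fn main() {
    let mut c = Collector { parent_buckets: Vec::new(), next_bucket_id: 0 };
    c.prepare_max_bucket(3);
    assert_eq!(c.parent_buckets.len(), 4); // slots 0..=3 now exist
    c.parent_buckets[3].doc_count += 1; // indexing is now in bounds
}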

View File

@@ -292,7 +292,7 @@ struct HistogramBuckets {
pub struct SegmentHistogramCollector {
/// The buckets containing the aggregation data.
/// One Histogram bucket per parent bucket id.
buckets: Vec<HistogramBuckets>,
parent_buckets: Vec<HistogramBuckets>,
sub_agg: Option<CachedSubAggs>,
accessor_idx: usize,
bucket_id_provider: BucketIdProvider,
@@ -311,7 +311,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
.clone();
// TODO: avoid prepare_max_bucket here and handle empty buckets.
self.prepare_max_bucket(parent_bucket_id, agg_data)?;
let histogram = std::mem::take(&mut self.buckets[parent_bucket_id as usize]);
let histogram = std::mem::take(&mut self.parent_buckets[parent_bucket_id as usize]);
let bucket = self.add_intermediate_bucket_result(agg_data, histogram)?;
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
@@ -327,7 +327,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
) -> crate::Result<()> {
let mut req = agg_data.take_histogram_req_data(self.accessor_idx);
let mem_pre = self.get_memory_consumption();
let buckets = &mut self.buckets[parent_bucket_id as usize].buckets;
let buckets = &mut self.parent_buckets[parent_bucket_id as usize].buckets;
let bounds = req.bounds;
let interval = req.req.interval;
@@ -385,8 +385,8 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
max_bucket: BucketId,
_agg_data: &AggregationsSegmentCtx,
) -> crate::Result<()> {
while self.buckets.len() <= max_bucket as usize {
self.buckets.push(HistogramBuckets {
while self.parent_buckets.len() <= max_bucket as usize {
self.parent_buckets.push(HistogramBuckets {
buckets: FxHashMap::default(),
});
}
@@ -397,7 +397,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
impl SegmentHistogramCollector {
fn get_memory_consumption(&self) -> usize {
let self_mem = std::mem::size_of::<Self>();
let buckets_mem = self.buckets.len() * std::mem::size_of::<HistogramBuckets>();
let buckets_mem = self.parent_buckets.len() * std::mem::size_of::<HistogramBuckets>();
self_mem + buckets_mem
}
/// Converts the collector result into an intermediate bucket result.
@@ -406,7 +406,7 @@ impl SegmentHistogramCollector {
agg_data: &AggregationsSegmentCtx,
histogram: HistogramBuckets,
) -> crate::Result<IntermediateBucketResult> {
let mut buckets = Vec::with_capacity(self.buckets.len());
let mut buckets = Vec::with_capacity(histogram.buckets.len());
for bucket in histogram.buckets.into_values() {
let bucket_res = bucket.into_intermediate_bucket_entry(&mut self.sub_agg, agg_data);
@@ -447,7 +447,7 @@ impl SegmentHistogramCollector {
let sub_agg = sub_agg.map(CachedSubAggs::new);
Ok(Self {
buckets: Default::default(),
parent_buckets: Default::default(),
sub_agg,
accessor_idx: node.idx_in_req_data,
bucket_id_provider: BucketIdProvider::default(),

View File

@@ -7,7 +7,7 @@ use columnar::{
Column, ColumnBlockAccessor, ColumnType, Dictionary, MonotonicallyMappableToU128,
MonotonicallyMappableToU64, NumericalValue, StrColumn,
};
use common::BitSet;
use common::{BitSet, TinySet};
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
@@ -398,7 +398,7 @@ pub(crate) fn build_segment_term_collector(
if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC && !has_sub_aggregations {
let term_buckets = VecTermBucketsNoAgg::new(max_term_id + 1, &mut bucket_id_provider);
let collector: SegmentTermCollector<_, true> = SegmentTermCollector {
buckets: vec![term_buckets],
parent_buckets: vec![term_buckets],
accessor_idx,
sub_agg: None,
bucket_id_provider,
@@ -409,7 +409,7 @@ pub(crate) fn build_segment_term_collector(
let term_buckets = VecTermBuckets::new(max_term_id + 1, &mut bucket_id_provider);
let sub_agg = sub_agg_collector.map(CachedSubAggs::<true>::new);
let collector: SegmentTermCollector<_, true> = SegmentTermCollector {
buckets: vec![term_buckets],
parent_buckets: vec![term_buckets],
accessor_idx,
sub_agg,
bucket_id_provider,
@@ -422,7 +422,7 @@ pub(crate) fn build_segment_term_collector(
// Build sub-aggregation blueprint (flat pairs)
let sub_agg = sub_agg_collector.map(CachedSubAggs::<false>::new);
let collector: SegmentTermCollector<PagedTermMap, false> = SegmentTermCollector {
buckets: vec![term_buckets],
parent_buckets: vec![term_buckets],
accessor_idx,
sub_agg,
bucket_id_provider,
@@ -434,7 +434,7 @@ pub(crate) fn build_segment_term_collector(
// Build sub-aggregation blueprint (flat pairs)
let sub_agg = sub_agg_collector.map(CachedSubAggs::<false>::new);
let collector: SegmentTermCollector<HashMapTermBuckets, false> = SegmentTermCollector {
buckets: vec![term_buckets],
parent_buckets: vec![term_buckets],
accessor_idx,
sub_agg,
bucket_id_provider,
@@ -497,47 +497,42 @@ const BITMASK_LEN: usize = PAGE_SIZE / 64;
#[derive(Clone, Debug)]
struct Page {
presence: [u64; BITMASK_LEN],
/// Bitmask indicating which offsets are present.
/// It is chunked into TinySet words.
presence: [TinySet; BITMASK_LEN],
data: [Bucket; PAGE_SIZE],
}
impl Page {
fn new() -> Self {
Self {
presence: [0; BITMASK_LEN],
presence: [TinySet::empty(); BITMASK_LEN],
data: [Bucket::default(); PAGE_SIZE],
}
}
#[inline]
fn is_set(&self, offset: usize) -> bool {
let word_idx = offset / 64;
let bucket_idx = offset / 64;
let bit_idx = offset % 64;
(self.presence[word_idx] & (1 << bit_idx)) != 0
self.presence[bucket_idx].contains(bit_idx as u32)
}
#[inline]
fn set_present(&mut self, offset: usize) {
let word_idx = offset / 64;
let bucket_idx = offset / 64;
let bit_idx = offset % 64;
self.presence[word_idx] |= 1 << bit_idx;
self.presence[bucket_idx].insert_mut(bit_idx as u32);
}
// Flattened iteration logic
fn collect_items(&self, base_term_id: u64, result: &mut Vec<(u64, Bucket)>) {
for (word_idx, &word) in self.presence.iter().enumerate() {
if word == 0 {
continue;
}
for (bucket_pos, &tiny_set) in self.presence.iter().enumerate() {
let base_offset = bucket_pos * 64;
let mut temp_word = word;
let base_offset = word_idx * 64;
while temp_word != 0 {
let bit = temp_word.trailing_zeros() as usize;
let offset = base_offset + bit;
for bit in tiny_set.into_iter() {
let offset = base_offset + bit as usize;
result.push((base_term_id + offset as u64, self.data[offset]));
temp_word &= !(1 << bit);
}
}
}
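Since `common::TinySet` is tantivy's internal wrapper around a `u64` word, the new code above is semantically equivalent to the raw bit twiddling it replaces. A standalone sketch of those semantics (the wrapper below is an assumption for illustration, not the real type):

#[derive(Clone, Copy, Default)]
struct TinySet(u64);

impl TinySet {
    fn empty() -> Self {
        TinySet(0)
    }
    fn insert_mut(&mut self, el: u32) {
        self.0 |= 1u64 << el;
    }
    fn contains(self, el: u32) -> bool {
        self.0 & (1u64 << el) != 0
    }
    /// Ascending set-bit iteration, mirroring the old `trailing_zeros` loop.
    fn iter(self) -> impl Iterator<Item = u32> {
        let mut word = self.0;
        std::iter::from_fn(move || {
            if word == 0 {
                return None;
            }
            let bit = word.trailing_zeros();
            word &= word - 1; // clear the lowest set bit
            Some(bit)
        })
    }
}

fn main() {
    let mut set = TinySet::empty();
    set.insert_mut(5);
    set.insert_mut(63); // last bit of the word, the boundary the new test exercises
    assert!(set.contains(5) && set.contains(63) && !set.contains(6));
    assert_eq!(set.iter().collect::<Vec<_>>(), vec![5, 63]);
}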
@@ -547,7 +542,7 @@ impl Page {
/// Uses a fixed size vector of pages, each page containing a fixed size array of buckets.
///
/// Each page covers a range of term ids. Pages are allocated on demand.
/// This implementation is more memory efficient than a full Vec for sparse term id sets,
/// This implementation is more memory efficient than a full Vec for high cardinality term id sets.
///
/// It has a fixed cost of `num_pages * 8 bytes` for the page directory.
/// For 1 million terms, this is 8 * 1024 = 8KB.
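A quick check of that arithmetic, assuming `PAGE_SIZE = 1024` (the constant's value is not shown in this diff; it is implied by the `8 * 1024` figure) and one 8-byte directory slot per page:

const PAGE_SIZE: u64 = 1024; // assumed value, implied by the doc comment

fn main() {
    let num_terms: u64 = 1_000_000;
    let num_pages = num_terms.div_ceil(PAGE_SIZE);
    let directory_bytes = num_pages * 8;
    println!("{num_pages} pages -> {directory_bytes} bytes (~8 KB)");
}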
@@ -604,8 +599,8 @@ impl TermAggregationMap for PagedTermMap {
}
fn into_vec(self) -> Vec<(u64, Bucket)> {
// Heuristic: Estimate active count.
let estimated_count = (self.mem_usage / std::mem::size_of::<Page>()) * (PAGE_SIZE / 2);
// estimate 16 entries per non-empty page
let estimated_count = self.pages.iter().filter(|p| p.is_some()).count() * 16;
let mut result = Vec::with_capacity(estimated_count);
for (i, page_opt) in self.pages.into_iter().enumerate() {
@@ -769,7 +764,7 @@ impl TermAggregationMap for VecTermBuckets {
#[derive(Clone, Debug)]
struct SegmentTermCollector<TermMap: TermAggregationMap, const LOWCARD: bool = false> {
/// The buckets containing the aggregation data.
buckets: Vec<TermMap>,
parent_buckets: Vec<TermMap>,
sub_agg: Option<CachedSubAggs<LOWCARD>>,
accessor_idx: usize,
bucket_id_provider: BucketIdProvider,
@@ -793,7 +788,7 @@ impl<TermMap: TermAggregationMap, const LOWCARD: bool> SegmentAggregationCollect
// TODO: avoid prepare_max_bucket here and handle empty buckets.
self.prepare_max_bucket(bucket, agg_data)?;
let bucket = std::mem::replace(
&mut self.buckets[bucket as usize],
&mut self.parent_buckets[bucket as usize],
TermMap::new(0, &mut self.bucket_id_provider),
);
let term_req = agg_data.get_term_req_data(self.accessor_idx);
@@ -829,7 +824,7 @@ impl<TermMap: TermAggregationMap, const LOWCARD: bool> SegmentAggregationCollect
}
if let Some(sub_agg) = &mut self.sub_agg {
let term_buckets = &mut self.buckets[parent_bucket_id as usize];
let term_buckets = &mut self.parent_buckets[parent_bucket_id as usize];
let it = req_data
.column_block_accessor
.iter_docid_vals(docs, &req_data.accessor);
@@ -850,7 +845,7 @@ impl<TermMap: TermAggregationMap, const LOWCARD: bool> SegmentAggregationCollect
);
}
} else {
let term_buckets = &mut self.buckets[parent_bucket_id as usize];
let term_buckets = &mut self.parent_buckets[parent_bucket_id as usize];
let it = req_data.column_block_accessor.iter_vals();
if let Some(allowed_bs) = req_data.allowed_term_ids.as_ref() {
let it = it.filter(move |&term_id| allowed_bs.contains(term_id as u32));
@@ -888,10 +883,10 @@ impl<TermMap: TermAggregationMap, const LOWCARD: bool> SegmentAggregationCollect
max_bucket: BucketId,
_agg_data: &AggregationsSegmentCtx,
) -> crate::Result<()> {
while self.buckets.len() <= max_bucket as usize {
while self.parent_buckets.len() <= max_bucket as usize {
let term_buckets: TermMap =
TermMap::new(self.max_term_id, &mut self.bucket_id_provider);
self.buckets.push(term_buckets);
self.parent_buckets.push(term_buckets);
}
Ok(())
}
@@ -925,7 +920,7 @@ impl<TermMap, const LOWCARD: bool> SegmentTermCollector<TermMap, LOWCARD>
where TermMap: TermAggregationMap
{
fn get_memory_consumption(&self) -> usize {
self.buckets
self.parent_buckets
.iter()
.map(|b| b.get_memory_consumption())
.sum()
@@ -1203,8 +1198,10 @@ mod tests {
use common::DateTime;
use time::{Date, Month};
use super::{PagedTermMap, TermAggregationMap, PAGE_SIZE};
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::intermediate_agg_result::IntermediateAggregationResults;
use crate::aggregation::segment_agg_result::BucketIdProvider;
use crate::aggregation::tests::{
exec_request, exec_request_with_query, exec_request_with_query_and_memory_limit,
get_test_index_from_terms, get_test_index_from_values_and_terms,
@@ -1215,6 +1212,43 @@ mod tests {
use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING};
use crate::{Index, IndexWriter};
#[test]
fn paged_term_map_reuses_buckets_and_counts() {
let mut bucket_id_provider = BucketIdProvider::default();
let mut map = PagedTermMap::new((PAGE_SIZE * 2) as u64, &mut bucket_id_provider);
let bucket_first = map.term_entry(5, &mut bucket_id_provider);
let bucket_second_page = map.term_entry((PAGE_SIZE + 7) as u64, &mut bucket_id_provider);
// Reinsertions should increment counts and reuse bucket ids
assert_eq!(map.term_entry(5, &mut bucket_id_provider), bucket_first);
assert_eq!(
map.term_entry((PAGE_SIZE + 7) as u64, &mut bucket_id_provider),
bucket_second_page
);
// High offset exercises the TinySet presence word boundaries.
let bucket_high_bit = map.term_entry(63, &mut bucket_id_provider);
let mut entries = map.into_vec();
entries.sort_by_key(|(term_id, _)| *term_id);
let expected = vec![
(5u64, bucket_first, 2u32),
(63u64, bucket_high_bit, 1u32),
((PAGE_SIZE + 7) as u64, bucket_second_page, 2u32),
];
assert_eq!(entries.len(), expected.len());
for ((term_id, bucket), (expected_term, expected_bucket_id, expected_count)) in
entries.into_iter().zip(expected)
{
assert_eq!(term_id, expected_term);
assert_eq!(bucket.bucket_id, expected_bucket_id);
assert_eq!(bucket.count, expected_count);
}
}
#[test]
fn terms_aggregation_test_single_segment() -> crate::Result<()> {
terms_aggregation_test_merge_segment(true)

View File

@@ -48,7 +48,7 @@ struct MissingCount {
pub struct TermMissingAgg {
accessor_idx: usize,
sub_agg: Option<CachedSubAggs>,
/// Idx = bucket id, Value = missing count for that bucket
/// Idx = parent bucket id, Value = missing count for that bucket
missing_count_per_bucket: Vec<MissingCount>,
bucket_id_provider: BucketIdProvider,
}

View File

@@ -49,7 +49,7 @@ impl<const LOWCARD: bool> CachedSubAggs<LOWCARD> {
per_bucket_docs: Vec::new(),
num_docs: 0,
sub_agg_collector: sub_agg,
partitions: core::array::from_fn(|_| PartitionEntry::new()),
partitions: core::array::from_fn(|_| PartitionEntry::default()),
}
}
@@ -107,7 +107,7 @@ impl<const LOWCARD: bool> CachedSubAggs<LOWCARD> {
.prepare_max_bucket(max_bucket, agg_data)?;
// The threshold above which we flush buckets individually.
// Note: We need to make sure that we don't lock ourselves into a situation where we hit
// the FLUSH_THRESHOLD, but never flush any buckets.
// the FLUSH_THRESHOLD, but never flush any buckets (except during the final flush).
let mut bucket_treshold = FLUSH_THRESHOLD / (self.per_bucket_docs.len().max(1) * 2);
if force {
bucket_treshold = 0;
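Worked example (the value of `FLUSH_THRESHOLD` is not visible in this hunk, so the numbers are illustrative): with `FLUSH_THRESHOLD = 100_000` and 25 tracked parent buckets, only buckets holding more than `100_000 / (25 * 2) = 2_000` cached docs are flushed individually, while `force` drops the threshold to 0 so the final flush drains every bucket.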
@@ -157,21 +157,13 @@ impl<const LOWCARD: bool> CachedSubAggs<LOWCARD> {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
struct PartitionEntry {
bucket_ids: Vec<BucketId>,
docs: Vec<DocId>,
}
impl PartitionEntry {
#[inline]
fn new() -> Self {
Self {
bucket_ids: Vec::with_capacity(FLUSH_THRESHOLD / NUM_PARTITIONS),
docs: Vec::with_capacity(FLUSH_THRESHOLD / NUM_PARTITIONS),
}
}
#[inline]
fn clear(&mut self) {
self.bucket_ids.clear();