use radix map, fix prepare_max_bucket

use paged term map in term agg
use special no sub agg term map impl
This commit is contained in:
Pascal Seitz
2025-12-04 14:47:55 +08:00
committed by Pascal Seitz
parent c852bac532
commit 030554d544
2 changed files with 263 additions and 28 deletions

View File

@@ -375,7 +375,7 @@ pub(crate) fn build_segment_term_collector(
// TODO: A better metric instead of is_top_level would be the number of buckets expected.
// E.g. If term agg is not top level, but the parent is a bucket agg with less than 10 buckets,
// we can still use Vec.
let can_use_vec = terms_req_data.is_top_level;
let is_top_level = terms_req_data.is_top_level;
// TODO: Benchmark to validate the threshold
const MAX_NUM_TERMS_FOR_VEC: usize = 100;
@@ -394,14 +394,39 @@ pub(crate) fn build_segment_term_collector(
let mut bucket_id_provider = BucketIdProvider::default();
// - use a Vec instead of a hashmap for our aggregation.
if can_use_vec && max_term < MAX_NUM_TERMS_FOR_VEC {
let term_buckets = VecTermBuckets::new(max_term + 1, &mut bucket_id_provider);
if is_top_level && max_term < MAX_NUM_TERMS_FOR_VEC && !has_sub_aggregations {
let term_buckets = VecTermBucketsNoAgg::new(max_term as u64 + 1, &mut bucket_id_provider);
let collector: SegmentTermCollector<_, true> = SegmentTermCollector {
buckets: vec![term_buckets],
accessor_idx,
sub_agg: None,
bucket_id_provider,
max_term_id: max_term as u64,
};
Ok(Box::new(collector))
} else if is_top_level && max_term < MAX_NUM_TERMS_FOR_VEC {
let term_buckets = VecTermBuckets::new(max_term as u64 + 1, &mut bucket_id_provider);
let sub_agg = sub_agg_collector.map(CachedSubAggs::<true>::new);
let collector: SegmentTermCollector<_, true> = SegmentTermCollector {
buckets: vec![term_buckets],
accessor_idx,
sub_agg,
bucket_id_provider,
max_term_id: max_term as u64,
};
Ok(Box::new(collector))
} else if max_term < 8_000_000 && is_top_level {
let term_buckets: PagedTermMap =
PagedTermMap::new(max_term as u64 + 1, &mut bucket_id_provider);
// Build sub-aggregation blueprint (flat pairs)
let sub_agg = sub_agg_collector.map(CachedSubAggs::<false>::new);
let collector: SegmentTermCollector<PagedTermMap, false> = SegmentTermCollector {
buckets: vec![term_buckets],
accessor_idx,
sub_agg,
bucket_id_provider,
max_term_id: max_term as u64,
};
Ok(Box::new(collector))
} else {
@@ -413,12 +438,13 @@ pub(crate) fn build_segment_term_collector(
accessor_idx,
sub_agg,
bucket_id_provider,
max_term_id: max_term as u64,
};
Ok(Box::new(collector))
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy, Default)]
struct Bucket {
pub count: u32,
pub bucket_id: BucketId,
@@ -435,7 +461,10 @@ impl Bucket {
}
/// Abstraction over the storage used for term buckets (counts only).
trait TermAggregationMap: Clone + Debug + Default + 'static {
trait TermAggregationMap: Clone + Debug + 'static {
/// Create a new instance with a strict upper bound on term ids.
fn new(max_term_id: u64, bucket_id_provider: &mut BucketIdProvider) -> Self;
/// Estimate the memory consumption of this struct in bytes.
fn get_memory_consumption(&self) -> usize;
@@ -461,6 +490,149 @@ impl Default for HashMapTermBuckets {
}
}
// Geometry of one PagedTermMap page: each page covers PAGE_SIZE consecutive term ids.
const PAGE_SHIFT: usize = 10;
const PAGE_SIZE: usize = 1 << PAGE_SHIFT; // 1024
// Low-bit mask extracting the offset of a term id inside its page.
const PAGE_MASK: usize = PAGE_SIZE - 1;
// Number of u64 words needed to hold one presence bit per page slot.
const BITMASK_LEN: usize = PAGE_SIZE / 64;
/// One fixed-size page of term buckets plus a presence bitmask.
///
/// A `data` slot is only meaningful when its corresponding bit in
/// `presence` is set; unset slots still hold `Bucket::default()`.
#[derive(Clone, Debug)]
struct Page {
    // One bit per slot of `data`: 1 = slot holds an initialized bucket.
    presence: [u64; BITMASK_LEN],
    // Dense bucket storage for the PAGE_SIZE term ids this page covers.
    data: [Bucket; PAGE_SIZE],
}
impl Page {
fn new() -> Self {
Self {
presence: [0; BITMASK_LEN],
data: [Bucket::default(); PAGE_SIZE],
}
}
#[inline]
fn is_set(&self, offset: usize) -> bool {
let word_idx = offset / 64;
let bit_idx = offset % 64;
(self.presence[word_idx] & (1 << bit_idx)) != 0
}
#[inline]
fn set_present(&mut self, offset: usize) {
let word_idx = offset / 64;
let bit_idx = offset % 64;
self.presence[word_idx] |= 1 << bit_idx;
}
// Flattened iteration logic
fn collect_items(&self, base_term_id: u64, result: &mut Vec<(u64, Bucket)>) {
for (word_idx, &word) in self.presence.iter().enumerate() {
if word == 0 {
continue;
}
let mut temp_word = word;
let base_offset = word_idx * 64;
while temp_word != 0 {
let bit = temp_word.trailing_zeros() as usize;
let offset = base_offset + bit;
result.push((base_term_id + offset as u64, self.data[offset]));
temp_word &= !(1 << bit);
}
}
}
}
/// A paged term map implementation for moderate sized term id sets.
/// Uses a fixed size vector of pages, each page containing a fixed size array of buckets.
///
/// Each page covers a range of term ids. Pages are allocated on demand.
/// This implementation is more memory efficient than a full Vec for sparse term id sets.
///
/// It has a fixed cost of `num_pages * 8 bytes` for the page directory.
/// For 1 million terms, this is 8 * 1024 = 8KB.
///
/// Note that for nested aggregations we create one TermAggregationMap per parent bucket.
/// For example, with 100 parent buckets and 1 million terms, this is 800KB overhead for the page
/// directories only. Therefore, this implementation is only enabled for top-level aggregations.
/// TODO: pass expected number of buckets from parent instead of strict is_top_level flag.
#[derive(Clone, Debug, Default)]
struct PagedTermMap {
    // Fixed size vector based on max_term_id. Each slot is lazily filled
    // with a heap-allocated Page on the first insert into its id range.
    pages: Vec<Option<Box<Page>>>,
    // Running byte estimate: directory capacity plus every allocated page.
    mem_usage: usize,
}
// NOTE(review): empty impl block — dead code, candidate for removal.
impl PagedTermMap {}
impl TermAggregationMap for PagedTermMap {
    /// Estimate the memory consumption of this struct in bytes.
    /// `mem_usage` already covers the directory and all allocated pages.
    #[inline]
    fn get_memory_consumption(&self) -> usize {
        self.mem_usage + std::mem::size_of::<Self>()
    }

    /// Count one occurrence of `term_id`, allocating its page on demand,
    /// and return the bucket id associated with the term.
    ///
    /// Panics if `term_id` exceeds the `max_term_id` given to `new`
    /// (the directory index is out of bounds).
    #[inline]
    fn term_entry(&mut self, term_id: u64, bucket_id_provider: &mut BucketIdProvider) -> BucketId {
        let term_id = term_id as usize;
        let page_idx = term_id >> PAGE_SHIFT;
        let offset = term_id & PAGE_MASK;
        // Split the borrow so the closure may update `mem_usage` while the
        // directory slot is mutably borrowed.
        let Self { pages, mem_usage } = self;
        // This indexing panics if term_id > max_term_id.
        let page = pages[page_idx].get_or_insert_with(|| {
            *mem_usage += std::mem::size_of::<Page>();
            Box::new(Page::new())
        });
        if page.is_set(offset) {
            // Known term: bump the count and reuse its bucket id.
            let bucket = &mut page.data[offset];
            bucket.count += 1;
            bucket.bucket_id
        } else {
            // First occurrence: assign a fresh bucket id.
            let new_id = bucket_id_provider.next_bucket_id();
            page.data[offset] = Bucket {
                count: 1,
                bucket_id: new_id,
            };
            page.set_present(offset);
            new_id
        }
    }

    /// Drain all present `(term_id, Bucket)` pairs in ascending term id order.
    fn into_vec(self) -> Vec<(u64, Bucket)> {
        // Heuristic: assume allocated pages are half full on average.
        // `mem_usage` also contains the directory cost, so this slightly
        // overestimates; that only costs a little reserved capacity.
        let estimated_count = (self.mem_usage / std::mem::size_of::<Page>()) * (PAGE_SIZE / 2);
        let mut result = Vec::with_capacity(estimated_count);
        for (page_idx, page_opt) in self.pages.into_iter().enumerate() {
            if let Some(page) = page_opt {
                let base_term_id = (page_idx << PAGE_SHIFT) as u64;
                page.collect_items(base_term_id, &mut result);
            }
        }
        result
    }

    /// Initialize with a strict upper bound.
    /// Panics if you try to insert a term_id > max_term_id.
    fn new(max_term_id: u64, _bucket_id_provider: &mut BucketIdProvider) -> Self {
        let max_page_idx = (max_term_id as usize) >> PAGE_SHIFT;
        let num_pages = max_page_idx + 1;
        // Pre-allocate the directory (pointers only, not the heavy pages).
        // Memory cost: num_pages * 8 bytes.
        let pages = vec![None; num_pages];
        let mem_usage = pages.capacity() * std::mem::size_of::<Option<Box<Page>>>();
        Self { pages, mem_usage }
    }
}
impl TermAggregationMap for HashMapTermBuckets {
#[inline]
fn get_memory_consumption(&self) -> usize {
@@ -480,24 +652,75 @@ impl TermAggregationMap for HashMapTermBuckets {
fn into_vec(self) -> Vec<(u64, Bucket)> {
self.bucket_map.into_iter().collect()
}
#[inline]
fn new(_max_term_id: u64, _bucket_id_provider: &mut BucketIdProvider) -> Self {
Self::default()
}
}
/// An optimized term map implementation for a compact set of term ordinals.
#[derive(Clone, Debug, Default)]
struct VecTermBuckets {
buckets: Vec<Bucket>,
/// Dense per-term counters for the no-sub-aggregation fast path.
/// Only occurrence counts are tracked; bucket ids are never assigned.
#[derive(Clone, Debug)]
struct VecTermBucketsNoAgg {
    // One occurrence count per term ordinal; index = term id.
    buckets: Vec<u32>,
}
impl VecTermBuckets {
fn new(num_terms: usize, bucket_id_provider: &mut BucketIdProvider) -> Self {
VecTermBuckets {
buckets: std::iter::repeat_with(|| Bucket::new(bucket_id_provider.next_bucket_id()))
.take(num_terms)
impl TermAggregationMap for VecTermBucketsNoAgg {
    /// Estimate the memory consumption of this struct in bytes.
    fn get_memory_consumption(&self) -> usize {
        // We do not include `std::mem::size_of::<Self>()`:
        // it is already measured by the parent aggregation.
        self.buckets.capacity() * std::mem::size_of::<u32>()
    }

    /// Add an occurrence of the given term id.
    ///
    /// The caller must guarantee `term_id` is below the `num_terms` passed to
    /// `new`; in release builds an out-of-range id is undefined behavior.
    #[inline(always)]
    fn term_entry(&mut self, term_id: u64, _bucket_id_provider: &mut BucketIdProvider) -> BucketId {
        let term_id_usize = term_id as usize;
        debug_assert!(
            term_id_usize < self.buckets.len(),
            "term_id {} out of bounds for VecTermBucketsNoAgg (len={})",
            term_id,
            self.buckets.len()
        );
        // SAFETY: the collector only feeds term ids `<= max_term_id`, and
        // `new` allocated `max_term_id + 1` slots, so the index is in bounds
        // (checked above in debug builds).
        let count = unsafe { self.buckets.get_unchecked_mut(term_id_usize) };
        *count += 1;
        0 // bucket id is unused: this map never carries sub-aggregations
    }

    /// Convert to `(term_id, Bucket)` pairs, skipping terms with zero count.
    fn into_vec(self) -> Vec<(u64, Bucket)> {
        self.buckets
            .into_iter()
            .enumerate()
            .filter(|&(_term_id, count)| count > 0)
            .map(|(term_id, count)| {
                (
                    term_id as u64,
                    Bucket {
                        count,
                        bucket_id: 0, // unused, there are no sub-aggregations
                    },
                )
            })
            .collect()
    }

    /// Create a dense, zero-initialized count per term ordinal.
    fn new(num_terms: u64, _bucket_id_provider: &mut BucketIdProvider) -> Self {
        Self {
            // `vec![0; n]` hits the zeroed-allocation fast path, unlike
            // `repeat_with(|| 0).take(n).collect()`.
            buckets: vec![0; num_terms as usize],
        }
    }
}
/// An optimized term map implementation for a compact set of term ordinals.
#[derive(Clone, Debug)]
struct VecTermBuckets {
    // One pre-allocated Bucket (count + bucket id) per term ordinal;
    // index = term id.
    buckets: Vec<Bucket>,
}
impl TermAggregationMap for VecTermBuckets {
/// Estimate the memory consumption of this struct in bytes.
fn get_memory_consumption(&self) -> usize {
@@ -531,6 +754,14 @@ impl TermAggregationMap for VecTermBuckets {
.map(|(term_id, bucket)| (term_id as u64, bucket))
.collect()
}
fn new(num_terms: u64, bucket_id_provider: &mut BucketIdProvider) -> Self {
VecTermBuckets {
buckets: std::iter::repeat_with(|| Bucket::new(bucket_id_provider.next_bucket_id()))
.take(num_terms as usize)
.collect(),
}
}
}
/// The collector puts values from the fast field into the correct buckets and does a conversion to
@@ -542,6 +773,7 @@ struct SegmentTermCollector<TermMap: TermAggregationMap, const LOWCARD: bool = f
sub_agg: Option<CachedSubAggs<LOWCARD>>,
accessor_idx: usize,
bucket_id_provider: BucketIdProvider,
max_term_id: u64,
}
pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) {
@@ -560,7 +792,10 @@ impl<TermMap: TermAggregationMap, const LOWCARD: bool> SegmentAggregationCollect
) -> crate::Result<()> {
// TODO: avoid prepare_max_bucket here and handle empty buckets.
self.prepare_max_bucket(bucket, agg_data)?;
let bucket = std::mem::take(&mut self.buckets[bucket as usize]);
let bucket = std::mem::replace(
&mut self.buckets[bucket as usize],
TermMap::new(0, &mut self.bucket_id_provider),
);
let term_req = agg_data.get_term_req_data(self.accessor_idx);
let name = term_req.name.clone();
@@ -655,7 +890,8 @@ impl<TermMap: TermAggregationMap, const LOWCARD: bool> SegmentAggregationCollect
_agg_data: &AggregationsSegmentCtx,
) -> crate::Result<()> {
while self.buckets.len() <= max_bucket as usize {
let term_buckets: TermMap = TermMap::default();
let term_buckets: TermMap =
TermMap::new(self.max_term_id, &mut self.bucket_id_provider);
self.buckets.push(term_buckets);
}
Ok(())

View File

@@ -81,19 +81,6 @@ impl<const LOWCARD: bool> CachedSubAggs<LOWCARD> {
self.num_docs += 1;
}
#[inline]
pub fn extend_with_bucket_zero(&mut self, docs: &[DocId]) {
debug_assert!(
LOWCARD,
"extend_with_bucket_zero only valid for single bucket"
);
if self.per_bucket_docs.is_empty() {
self.per_bucket_docs.resize_with(1, Vec::new);
}
self.per_bucket_docs[0].extend_from_slice(docs);
self.num_docs += docs.len();
}
/// Check if we need to flush based on the number of documents cached.
/// If so, flushes the cache to the provided aggregation collector.
pub fn check_flush_local(
@@ -158,6 +145,18 @@ impl<const LOWCARD: bool> CachedSubAggs<LOWCARD> {
}
}
impl CachedSubAggs<true> {
    /// Append `docs` to bucket 0.
    ///
    /// Only implemented for the low-cardinality (`LOWCARD = true`) variant,
    /// where every doc belongs to the single bucket 0.
    #[inline]
    pub fn extend_with_bucket_zero(&mut self, docs: &[DocId]) {
        // Lazily create the single bucket on first use.
        if self.per_bucket_docs.is_empty() {
            self.per_bucket_docs.push(Vec::new());
        }
        let bucket_zero = &mut self.per_bucket_docs[0];
        bucket_zero.extend_from_slice(docs);
        self.num_docs += docs.len();
    }
}
#[derive(Debug, Clone)]
struct PartitionEntry {
bucket_ids: Vec<BucketId>,