move partitions to heap

Pascal Seitz
2026-01-05 15:07:00 +01:00
parent 8d3a7abbe9
commit b269fd8bb8


@@ -34,7 +34,6 @@ pub type LowCardCachedSubAggs = CachedSubAggs<LowCardSubAggCache>;
 pub type HighCardCachedSubAggs = CachedSubAggs<HighCardSubAggCache>;
 
 const FLUSH_THRESHOLD: usize = 2048;
-const NUM_PARTITIONS: usize = 16;
 
 /// A trait for caching sub-aggregation doc ids per bucket id.
 /// Different implementations can be used depending on the cardinality
@@ -95,6 +94,9 @@ impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
     }
 }
 
+/// Number of partitions for high cardinality sub-aggregation cache.
+const NUM_PARTITIONS: usize = 16;
+
 #[derive(Debug)]
 pub(crate) struct HighCardSubAggCache {
     /// This weird partitioning is used to do some cheap grouping on the bucket ids.
@@ -103,13 +105,13 @@ pub(crate) struct HighCardSubAggCache {
     ///
     /// We want to keep this cheap, because high cardinality aggregations can have a lot of
     /// buckets, and there may be nothing to group.
-    partitions: [PartitionEntry; NUM_PARTITIONS],
+    partitions: Box<[PartitionEntry; NUM_PARTITIONS]>,
 }
 
 impl HighCardSubAggCache {
     #[inline]
     fn clear(&mut self) {
-        for partition in &mut self.partitions {
+        for partition in self.partitions.iter_mut() {
             partition.clear();
         }
     }
@@ -132,7 +134,7 @@ impl PartitionEntry {
 impl SubAggCache for HighCardSubAggCache {
     fn new() -> Self {
         Self {
-            partitions: core::array::from_fn(|_| PartitionEntry::default()),
+            partitions: Box::new(core::array::from_fn(|_| PartitionEntry::default())),
         }
     }
@@ -150,7 +152,7 @@ impl SubAggCache for HighCardSubAggCache {
         _force: bool,
     ) -> crate::Result<()> {
         let mut max_bucket = 0u32;
-        for partition in &self.partitions {
+        for partition in self.partitions.iter() {
             if let Some(&local_max) = partition.bucket_ids.iter().max() {
                 max_bucket = max_bucket.max(local_max);
             }
@@ -158,7 +160,7 @@ impl SubAggCache for HighCardSubAggCache {
         sub_agg.prepare_max_bucket(max_bucket, agg_data)?;
 
-        for slot in &self.partitions {
+        for slot in self.partitions.iter() {
             if !slot.bucket_ids.is_empty() {
                 // Reduce dynamic dispatch overhead by collecting a full partition in one call.
                 sub_agg.collect_multiple(&slot.bucket_ids, &slot.docs, agg_data)?;
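The effect of the change is that `HighCardSubAggCache` no longer embeds the sixteen `PartitionEntry` values inline; wrapping the fixed-size array in a `Box` moves that storage to the heap and leaves the struct a single pointer wide. Below is a minimal, self-contained sketch of the pattern. The `PartitionEntry` here is a simplified stand-in (only the `bucket_ids`/`docs` field names are taken from the diff); the real type and the surrounding `SubAggCache` trait are not reproduced.

```rust
// Standalone sketch of the heap-allocation pattern in this commit.
// `PartitionEntry` is a simplified stand-in; only the `bucket_ids`/`docs`
// field names are taken from the diff above.

const NUM_PARTITIONS: usize = 16;

#[derive(Default, Debug)]
struct PartitionEntry {
    bucket_ids: Vec<u32>,
    docs: Vec<u32>,
}

#[derive(Debug)]
struct HighCardSubAggCache {
    // Boxing the fixed-size array moves the sixteen entries to the heap,
    // so the cache struct itself is just one pointer wide.
    partitions: Box<[PartitionEntry; NUM_PARTITIONS]>,
}

impl HighCardSubAggCache {
    fn new() -> Self {
        Self {
            // `core::array::from_fn` builds each entry, then `Box::new` moves the
            // finished array behind a heap allocation.
            partitions: Box::new(core::array::from_fn(|_| PartitionEntry::default())),
        }
    }
}

fn main() {
    let cache = HighCardSubAggCache::new();
    assert_eq!(cache.partitions.len(), NUM_PARTITIONS);
    // Compare the footprint of the boxed struct with the inline array it replaces.
    println!(
        "boxed cache: {} bytes, inline array: {} bytes",
        std::mem::size_of::<HighCardSubAggCache>(),
        std::mem::size_of::<[PartitionEntry; NUM_PARTITIONS]>()
    );
}
```

On a 64-bit target this sketch prints 8 bytes for the boxed version versus 768 bytes for the inline array. Note that, absent optimizations, `Box::new(core::array::from_fn(..))` still constructs the array as a temporary before moving it into the allocation, which stays cheap at sixteen entries of a few `Vec` headers each.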