From 87fe3a311f0c137fded61913f738de57f291de2d Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Fri, 12 Dec 2025 16:54:44 +0800 Subject: [PATCH] share column block accessor --- src/aggregation/agg_data.rs | 27 ++++++----- src/aggregation/bucket/histogram/histogram.rs | 12 ++--- src/aggregation/bucket/range.rs | 12 ++--- src/aggregation/bucket/term_agg.rs | 14 +++--- src/aggregation/metric/cardinality.rs | 47 +++++++++++-------- src/aggregation/metric/extended_stats.rs | 13 ++--- src/aggregation/metric/mod.rs | 4 +- src/aggregation/metric/percentiles.rs | 44 ++++++++++------- src/aggregation/metric/stats.rs | 15 +++--- 9 files changed, 104 insertions(+), 84 deletions(-) diff --git a/src/aggregation/agg_data.rs b/src/aggregation/agg_data.rs index 946c5a5a0..fd97e365b 100644 --- a/src/aggregation/agg_data.rs +++ b/src/aggregation/agg_data.rs @@ -1,4 +1,4 @@ -use columnar::{Column, ColumnType, StrColumn}; +use columnar::{Column, ColumnBlockAccessor, ColumnType, StrColumn}; use common::BitSet; use rustc_hash::FxHashSet; use serde::Serialize; @@ -35,6 +35,7 @@ pub struct AggregationsSegmentCtx { /// Request data for each aggregation type. pub per_request: PerRequestAggSegCtx, pub context: AggContextParams, + pub column_block_accessor: ColumnBlockAccessor, } impl AggregationsSegmentCtx { @@ -371,6 +372,8 @@ pub(crate) fn build_segment_agg_collector( Ok(Box::new(SegmentCardinalityCollector::from_req( req_data.column_type, node.idx_in_req_data, + req_data.accessor.clone(), + req_data.missing_value_for_accessor, ))) } AggKind::StatsKind(stats_type) => { @@ -385,9 +388,17 @@ pub(crate) fn build_segment_agg_collector( StatsType::ExtendedStats(sigma) => Ok(Box::new( SegmentExtendedStatsCollector::from_req(req_data, sigma), )), - StatsType::Percentiles => Ok(Box::new( - SegmentPercentilesCollector::from_req_and_validate(node.idx_in_req_data)?, - )), + StatsType::Percentiles => { + let req_data = req.get_metric_req_data_mut(node.idx_in_req_data); + Ok(Box::new( + SegmentPercentilesCollector::from_req_and_validate( + req_data.field_type, + req_data.missing_u64, + req_data.accessor.clone(), + node.idx_in_req_data, + ), + )) + } } } AggKind::TopHits => { @@ -467,6 +478,7 @@ pub(crate) fn build_aggregations_data_from_req( let mut data = AggregationsSegmentCtx { per_request: Default::default(), context, + column_block_accessor: ColumnBlockAccessor::default(), }; for (name, agg) in aggs.iter() { @@ -495,7 +507,6 @@ fn build_nodes( let idx_in_req_data = data.push_range_req_data(RangeAggReqData { accessor, field_type, - column_block_accessor: Default::default(), name: agg_name.to_string(), req: range_req.clone(), is_top_level, @@ -516,7 +527,6 @@ fn build_nodes( let idx_in_req_data = data.push_histogram_req_data(HistogramAggReqData { accessor, field_type, - column_block_accessor: Default::default(), name: agg_name.to_string(), req: histo_req.clone(), is_date_histogram: false, @@ -542,7 +552,6 @@ fn build_nodes( let idx_in_req_data = data.push_histogram_req_data(HistogramAggReqData { accessor, field_type, - column_block_accessor: Default::default(), name: agg_name.to_string(), req: histo_req, is_date_histogram: true, @@ -623,7 +632,6 @@ fn build_nodes( let idx_in_req_data = data.push_metric_req_data(MetricAggReqData { accessor, field_type, - column_block_accessor: Default::default(), name: agg_name.to_string(), collecting_for, missing: *missing, @@ -651,7 +659,6 @@ fn build_nodes( let idx_in_req_data = data.push_metric_req_data(MetricAggReqData { accessor, field_type, - column_block_accessor: Default::default(), name: agg_name.to_string(), collecting_for: StatsType::Percentiles, missing: percentiles_req.missing, @@ -899,7 +906,6 @@ fn build_terms_or_cardinality_nodes( column_type, str_dict_column: str_dict_column.clone(), missing_value_for_accessor, - column_block_accessor: Default::default(), name: agg_name.to_string(), req: TermsAggregationInternal::from_req(req), sug_aggregations: sub_aggs.clone(), @@ -914,7 +920,6 @@ fn build_terms_or_cardinality_nodes( column_type, str_dict_column: str_dict_column.clone(), missing_value_for_accessor, - column_block_accessor: Default::default(), name: agg_name.to_string(), req: req.clone(), }); diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index 11ae8a8fc..6e8198582 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -1,6 +1,6 @@ use std::cmp::Ordering; -use columnar::{Column, ColumnBlockAccessor, ColumnType}; +use columnar::{Column, ColumnType}; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use tantivy_bitpacker::minmax; @@ -26,8 +26,6 @@ pub struct HistogramAggReqData { pub accessor: Column, /// The field type of the fast field. pub field_type: ColumnType, - /// The column block accessor to access the fast field values. - pub column_block_accessor: ColumnBlockAccessor, /// The name of the aggregation. pub name: String, /// The histogram aggregation request. @@ -325,7 +323,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector { docs: &[crate::DocId], agg_data: &mut AggregationsSegmentCtx, ) -> crate::Result<()> { - let mut req = agg_data.take_histogram_req_data(self.accessor_idx); + let req = agg_data.take_histogram_req_data(self.accessor_idx); let mem_pre = self.get_memory_consumption(); let buckets = &mut self.parent_buckets[parent_bucket_id as usize].buckets; @@ -334,8 +332,10 @@ impl SegmentAggregationCollector for SegmentHistogramCollector { let offset = req.offset; let get_bucket_pos = |val| get_bucket_pos_f64(val, interval, offset) as i64; - req.column_block_accessor.fetch_block(docs, &req.accessor); - for (doc, val) in req + agg_data + .column_block_accessor + .fetch_block(docs, &req.accessor); + for (doc, val) in agg_data .column_block_accessor .iter_docid_vals(docs, &req.accessor) { diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index de9cfd395..682c007a1 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; use std::ops::Range; -use columnar::{Column, ColumnBlockAccessor, ColumnType}; +use columnar::{Column, ColumnType}; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; @@ -25,8 +25,6 @@ pub struct RangeAggReqData { pub accessor: Column, /// The type of the fast field. pub field_type: ColumnType, - /// The column block accessor to access the fast field values. - pub column_block_accessor: ColumnBlockAccessor, /// The range aggregation request. pub req: RangeAggregation, /// The name of the aggregation. @@ -280,13 +278,15 @@ impl SegmentAggregationCollector for SegmentRangeCollector< docs: &[crate::DocId], agg_data: &mut AggregationsSegmentCtx, ) -> crate::Result<()> { - let mut req = agg_data.take_range_req_data(self.accessor_idx); + let req = agg_data.take_range_req_data(self.accessor_idx); - req.column_block_accessor.fetch_block(docs, &req.accessor); + agg_data + .column_block_accessor + .fetch_block(docs, &req.accessor); let buckets = &mut self.parent_buckets[parent_bucket_id as usize]; - for (doc, val) in req + for (doc, val) in agg_data .column_block_accessor .iter_docid_vals(docs, &req.accessor) { diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 99a156902..4d569ddad 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -4,8 +4,8 @@ use std::net::Ipv6Addr; use columnar::column_values::CompactSpaceU64Accessor; use columnar::{ - Column, ColumnBlockAccessor, ColumnType, Dictionary, MonotonicallyMappableToU128, - MonotonicallyMappableToU64, NumericalValue, StrColumn, + Column, ColumnType, Dictionary, MonotonicallyMappableToU128, MonotonicallyMappableToU64, + NumericalValue, StrColumn, }; use common::{BitSet, TinySet}; use rustc_hash::FxHashMap; @@ -39,8 +39,6 @@ pub struct TermsAggReqData { pub str_dict_column: Option, /// The missing value as u64 value. pub missing_value_for_accessor: Option, - /// The column block accessor to access the fast field values. - pub column_block_accessor: ColumnBlockAccessor, /// Used to build the correct nested result when we have an empty result. pub sug_aggregations: Aggregations, /// The name of the aggregation. @@ -806,20 +804,20 @@ impl SegmentAggregationCollect let req_data = &mut self.terms_req_data; if let Some(missing) = req_data.missing_value_for_accessor { - req_data.column_block_accessor.fetch_block_with_missing( + agg_data.column_block_accessor.fetch_block_with_missing( docs, &req_data.accessor, missing, ); } else { - req_data + agg_data .column_block_accessor .fetch_block(docs, &req_data.accessor); } if let Some(sub_agg) = &mut self.sub_agg { let term_buckets = &mut self.parent_buckets[parent_bucket_id as usize]; - let it = req_data + let it = agg_data .column_block_accessor .iter_docid_vals(docs, &req_data.accessor); if let Some(allowed_bs) = req_data.allowed_term_ids.as_ref() { @@ -840,7 +838,7 @@ impl SegmentAggregationCollect } } else { let term_buckets = &mut self.parent_buckets[parent_bucket_id as usize]; - let it = req_data.column_block_accessor.iter_vals(); + let it = agg_data.column_block_accessor.iter_vals(); if let Some(allowed_bs) = req_data.allowed_term_ids.as_ref() { let it = it.filter(move |&term_id| allowed_bs.contains(term_id as u32)); Self::collect_terms(it, term_buckets, &mut self.bucket_id_provider); diff --git a/src/aggregation/metric/cardinality.rs b/src/aggregation/metric/cardinality.rs index 77387fc5e..9336e04cb 100644 --- a/src/aggregation/metric/cardinality.rs +++ b/src/aggregation/metric/cardinality.rs @@ -2,7 +2,7 @@ use std::collections::hash_map::DefaultHasher; use std::hash::{BuildHasher, Hasher}; use columnar::column_values::CompactSpaceU64Accessor; -use columnar::{Column, ColumnBlockAccessor, ColumnType, Dictionary, StrColumn}; +use columnar::{Column, ColumnType, Dictionary, StrColumn}; use common::f64_to_u64; use hyperloglogplus::{HyperLogLog, HyperLogLogPlus}; use rustc_hash::FxHashSet; @@ -106,8 +106,6 @@ pub struct CardinalityAggReqData { pub str_dict_column: Option, /// The missing value normalized to the internal u64 representation of the field type. pub missing_value_for_accessor: Option, - /// The column block accessor to access the fast field values. - pub(crate) column_block_accessor: ColumnBlockAccessor, /// The name of the aggregation. pub name: String, /// The aggregation request. @@ -135,11 +133,16 @@ impl CardinalityAggregationReq { } } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug)] pub(crate) struct SegmentCardinalityCollector { buckets: Vec, - column_type: ColumnType, accessor_idx: usize, + /// The column accessor to access the fast field values. + accessor: Column, + /// The column_type of the field. + column_type: ColumnType, + /// The missing value normalized to the internal u64 representation of the field type. + missing_value_for_accessor: Option, } #[derive(Clone, Debug, PartialEq, Default)] @@ -213,29 +216,34 @@ impl SegmentCardinalityCollectorBucket { } impl SegmentCardinalityCollector { - pub fn from_req(column_type: ColumnType, accessor_idx: usize) -> Self { + pub fn from_req( + column_type: ColumnType, + accessor_idx: usize, + accessor: Column, + missing_value_for_accessor: Option, + ) -> Self { Self { buckets: vec![SegmentCardinalityCollectorBucket::new(column_type); 1], column_type, accessor_idx, + accessor, + missing_value_for_accessor, } } fn fetch_block_with_field( &mut self, docs: &[crate::DocId], - agg_data: &mut CardinalityAggReqData, + agg_data: &mut AggregationsSegmentCtx, ) { - if let Some(missing) = agg_data.missing_value_for_accessor { - agg_data.column_block_accessor.fetch_block_with_missing( - docs, - &agg_data.accessor, - missing, - ); + if let Some(missing) = self.missing_value_for_accessor { + agg_data + .column_block_accessor + .fetch_block_with_missing(docs, &self.accessor, missing); } else { agg_data .column_block_accessor - .fetch_block(docs, &agg_data.accessor); + .fetch_block(docs, &self.accessor); } } } @@ -268,17 +276,16 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector { docs: &[crate::DocId], agg_data: &mut AggregationsSegmentCtx, ) -> crate::Result<()> { - let req_data = agg_data.get_cardinality_req_data_mut(self.accessor_idx); - self.fetch_block_with_field(docs, req_data); + self.fetch_block_with_field(docs, agg_data); let bucket = &mut self.buckets[parent_bucket_id as usize]; - let col_block_accessor = &req_data.column_block_accessor; - if req_data.column_type == ColumnType::Str { + let col_block_accessor = &agg_data.column_block_accessor; + if self.column_type == ColumnType::Str { for term_ord in col_block_accessor.iter_vals() { bucket.entries.insert(term_ord); } - } else if req_data.column_type == ColumnType::IpAddr { - let compact_space_accessor = req_data + } else if self.column_type == ColumnType::IpAddr { + let compact_space_accessor = self .accessor .values .clone() diff --git a/src/aggregation/metric/extended_stats.rs b/src/aggregation/metric/extended_stats.rs index 4280ab272..af159e787 100644 --- a/src/aggregation/metric/extended_stats.rs +++ b/src/aggregation/metric/extended_stats.rs @@ -323,7 +323,6 @@ pub(crate) struct SegmentExtendedStatsCollector { missing: Option, field_type: ColumnType, accessor: columnar::Column, - column_block_accessor: columnar::ColumnBlockAccessor, buckets: Vec, sigma: Option, } @@ -337,7 +336,6 @@ impl SegmentExtendedStatsCollector { name: req.name.clone(), field_type: req.field_type, accessor: req.accessor.clone(), - column_block_accessor: req.column_block_accessor.clone(), missing, buckets: vec![IntermediateExtendedStats::with_sigma(sigma); 16], sigma, @@ -371,17 +369,20 @@ impl SegmentAggregationCollector for SegmentExtendedStatsCollector { &mut self, parent_bucket_id: BucketId, docs: &[crate::DocId], - _agg_data: &mut AggregationsSegmentCtx, + agg_data: &mut AggregationsSegmentCtx, ) -> crate::Result<()> { let mut extended_stats = self.buckets[parent_bucket_id as usize].clone(); if let Some(missing) = self.missing.as_ref() { - self.column_block_accessor + agg_data + .column_block_accessor .fetch_block_with_missing(docs, &self.accessor, *missing); } else { - self.column_block_accessor.fetch_block(docs, &self.accessor); + agg_data + .column_block_accessor + .fetch_block(docs, &self.accessor); } - for val in self.column_block_accessor.iter_vals() { + for val in agg_data.column_block_accessor.iter_vals() { let val1 = f64_from_fastfield_u64(val, self.field_type); extended_stats.collect(val1); } diff --git a/src/aggregation/metric/mod.rs b/src/aggregation/metric/mod.rs index 3537af8a6..d3a448a38 100644 --- a/src/aggregation/metric/mod.rs +++ b/src/aggregation/metric/mod.rs @@ -31,7 +31,7 @@ use std::collections::HashMap; pub use average::*; pub use cardinality::*; -use columnar::{Column, ColumnBlockAccessor, ColumnType}; +use columnar::{Column, ColumnType}; pub use count::*; pub use extended_stats::*; pub use max::*; @@ -55,8 +55,6 @@ pub struct MetricAggReqData { pub field_type: ColumnType, /// The missing value normalized to the internal u64 representation of the field type. pub missing_u64: Option, - /// The column block accessor to access the fast field values. - pub column_block_accessor: ColumnBlockAccessor, /// The column accessor to access the fast field values. pub accessor: Column, /// Used when converting to intermediate result diff --git a/src/aggregation/metric/percentiles.rs b/src/aggregation/metric/percentiles.rs index 9d027b7c0..3f9a9ca1f 100644 --- a/src/aggregation/metric/percentiles.rs +++ b/src/aggregation/metric/percentiles.rs @@ -130,10 +130,16 @@ impl PercentilesAggregationReq { } } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug)] pub(crate) struct SegmentPercentilesCollector { pub(crate) buckets: Vec, pub(crate) accessor_idx: usize, + /// The type of the field. + pub field_type: ColumnType, + /// The missing value normalized to the internal u64 representation of the field type. + pub missing_u64: Option, + /// The column accessor to access the fast field values. + pub accessor: Column, } #[derive(Clone, Serialize, Deserialize)] @@ -228,11 +234,19 @@ impl PercentilesCollector { } impl SegmentPercentilesCollector { - pub fn from_req_and_validate(accessor_idx: usize) -> crate::Result { - Ok(Self { + pub fn from_req_and_validate( + field_type: ColumnType, + missing_u64: Option, + accessor: Column, + accessor_idx: usize, + ) -> Self { + Self { buckets: Vec::with_capacity(64), + field_type, + missing_u64, + accessor, accessor_idx, - }) + } } } @@ -268,22 +282,18 @@ impl SegmentAggregationCollector for SegmentPercentilesCollector { agg_data: &mut AggregationsSegmentCtx, ) -> crate::Result<()> { let percentiles = &mut self.buckets[parent_bucket_id as usize]; - let req_data = agg_data.get_metric_req_data_mut(self.accessor_idx); - - if let Some(missing) = req_data.missing_u64.as_ref() { - req_data.column_block_accessor.fetch_block_with_missing( - docs, - &req_data.accessor, - *missing, - ); - } else { - req_data + if let Some(missing) = self.missing_u64.as_ref() { + agg_data .column_block_accessor - .fetch_block(docs, &req_data.accessor); + .fetch_block_with_missing(docs, &self.accessor, *missing); + } else { + agg_data + .column_block_accessor + .fetch_block(docs, &self.accessor); } - for val in req_data.column_block_accessor.iter_vals() { - let val1 = f64_from_fastfield_u64(val, req_data.field_type); + for val in agg_data.column_block_accessor.iter_vals() { + let val1 = f64_from_fastfield_u64(val, self.field_type); percentiles.collect(val1); } diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index 6f3385a8f..5ec1b0184 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; -use columnar::{Column, ColumnBlockAccessor, ColumnType}; +use columnar::{Column, ColumnType}; use serde::{Deserialize, Serialize}; use super::*; @@ -195,7 +195,6 @@ fn create_collector( collecting_for: req.collecting_for, is_number_or_date_type: req.is_number_or_date_type, missing_u64: req.missing_u64, - column_block_accessor: req.column_block_accessor.clone(), accessor: req.accessor.clone(), buckets: vec![IntermediateStats::default()], }) @@ -222,7 +221,6 @@ pub(crate) fn build_segment_stats_collector( pub(crate) struct SegmentStatsCollector { pub(crate) missing_u64: Option, pub(crate) accessor: Column, - pub(crate) column_block_accessor: ColumnBlockAccessor, pub(crate) is_number_or_date_type: bool, pub(crate) buckets: Vec, pub(crate) name: String, @@ -275,7 +273,7 @@ impl SegmentAggregationCollector &mut self, parent_bucket_id: BucketId, docs: &[crate::DocId], - _agg_data: &mut AggregationsSegmentCtx, + agg_data: &mut AggregationsSegmentCtx, ) -> crate::Result<()> { // TODO: remove once we fetch all values for all bucket ids in one go if docs.len() == 1 && self.missing_u64.is_none() { @@ -288,14 +286,17 @@ impl SegmentAggregationCollector return Ok(()); } if let Some(missing) = self.missing_u64.as_ref() { - self.column_block_accessor + agg_data + .column_block_accessor .fetch_block_with_missing(docs, &self.accessor, *missing); } else { - self.column_block_accessor.fetch_block(docs, &self.accessor); + agg_data + .column_block_accessor + .fetch_block(docs, &self.accessor); } collect_stats::( &mut self.buckets[parent_bucket_id as usize], - self.column_block_accessor.iter_vals(), + agg_data.column_block_accessor.iter_vals(), self.is_number_or_date_type, )?;