share column block accessor

Author: Pascal Seitz
Date:   2025-12-12 16:54:44 +08:00
parent 71dc08424c
commit 87fe3a311f
9 changed files with 104 additions and 84 deletions
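
The diff below moves the `ColumnBlockAccessor<u64>` scratch buffer out of every per-aggregation request struct (`HistogramAggReqData`, `RangeAggReqData`, `TermsAggReqData`, `CardinalityAggReqData`, `MetricAggReqData`) and into a single field on `AggregationsSegmentCtx`, so collectors reuse one buffer via `agg_data.column_block_accessor`. A minimal, self-contained sketch of the ownership pattern; the types below are simplified stand-ins, not the real `columnar` API:

// Simplified stand-ins for columnar::ColumnBlockAccessor<u64> and columnar::Column<u64>;
// the real types live in the columnar crate and have a richer API.
#[derive(Default)]
struct ColumnBlockAccessor {
    vals: Vec<u64>, // reusable scratch buffer holding the values of the current doc block
}

struct Column {
    data: Vec<u64>,
}

impl ColumnBlockAccessor {
    fn fetch_block(&mut self, docs: &[u32], accessor: &Column) {
        self.vals.clear();
        self.vals.extend(docs.iter().map(|&doc| accessor.data[doc as usize]));
    }
    fn iter_vals(&self) -> impl Iterator<Item = u64> + '_ {
        self.vals.iter().copied()
    }
}

// After this commit: the segment context owns the single shared scratch buffer...
struct AggregationsSegmentCtx {
    column_block_accessor: ColumnBlockAccessor,
}

// ...and per-aggregation request data keeps only the column itself.
struct StatsReqData {
    accessor: Column,
}

struct StatsCollector {
    sum: u64,
}

impl StatsCollector {
    // Collectors borrow the shared accessor from the context instead of carrying their own.
    fn collect_block(&mut self, docs: &[u32], req: &StatsReqData, ctx: &mut AggregationsSegmentCtx) {
        ctx.column_block_accessor.fetch_block(docs, &req.accessor);
        self.sum += ctx.column_block_accessor.iter_vals().sum::<u64>();
    }
}

fn main() {
    let req = StatsReqData { accessor: Column { data: vec![3, 1, 4, 1, 5] } };
    let mut ctx = AggregationsSegmentCtx { column_block_accessor: ColumnBlockAccessor::default() };
    let mut stats = StatsCollector { sum: 0 };
    stats.collect_block(&[0, 2, 4], &req, &mut ctx);
    assert_eq!(stats.sum, 3 + 4 + 5);
}

With one buffer on the context, every collect_block call reuses the same allocation instead of each aggregation request keeping its own, which is presumably the point of the refactor: fewer per-request allocations and smaller request structs.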

View File

@@ -1,4 +1,4 @@
- use columnar::{Column, ColumnType, StrColumn};
+ use columnar::{Column, ColumnBlockAccessor, ColumnType, StrColumn};
use common::BitSet;
use rustc_hash::FxHashSet;
use serde::Serialize;
@@ -35,6 +35,7 @@ pub struct AggregationsSegmentCtx {
/// Request data for each aggregation type.
pub per_request: PerRequestAggSegCtx,
pub context: AggContextParams,
+ pub column_block_accessor: ColumnBlockAccessor<u64>,
}
impl AggregationsSegmentCtx {
@@ -371,6 +372,8 @@ pub(crate) fn build_segment_agg_collector(
Ok(Box::new(SegmentCardinalityCollector::from_req(
req_data.column_type,
node.idx_in_req_data,
+ req_data.accessor.clone(),
+ req_data.missing_value_for_accessor,
)))
}
AggKind::StatsKind(stats_type) => {
@@ -385,9 +388,17 @@ pub(crate) fn build_segment_agg_collector(
StatsType::ExtendedStats(sigma) => Ok(Box::new(
SegmentExtendedStatsCollector::from_req(req_data, sigma),
)),
- StatsType::Percentiles => Ok(Box::new(
- SegmentPercentilesCollector::from_req_and_validate(node.idx_in_req_data)?,
- )),
+ StatsType::Percentiles => {
+ let req_data = req.get_metric_req_data_mut(node.idx_in_req_data);
+ Ok(Box::new(
+ SegmentPercentilesCollector::from_req_and_validate(
+ req_data.field_type,
+ req_data.missing_u64,
+ req_data.accessor.clone(),
+ node.idx_in_req_data,
+ ),
+ ))
+ }
}
}
AggKind::TopHits => {
@@ -467,6 +478,7 @@ pub(crate) fn build_aggregations_data_from_req(
let mut data = AggregationsSegmentCtx {
per_request: Default::default(),
context,
+ column_block_accessor: ColumnBlockAccessor::default(),
};
for (name, agg) in aggs.iter() {
@@ -495,7 +507,6 @@ fn build_nodes(
let idx_in_req_data = data.push_range_req_data(RangeAggReqData {
accessor,
field_type,
- column_block_accessor: Default::default(),
name: agg_name.to_string(),
req: range_req.clone(),
is_top_level,
@@ -516,7 +527,6 @@ fn build_nodes(
let idx_in_req_data = data.push_histogram_req_data(HistogramAggReqData {
accessor,
field_type,
- column_block_accessor: Default::default(),
name: agg_name.to_string(),
req: histo_req.clone(),
is_date_histogram: false,
@@ -542,7 +552,6 @@ fn build_nodes(
let idx_in_req_data = data.push_histogram_req_data(HistogramAggReqData {
accessor,
field_type,
- column_block_accessor: Default::default(),
name: agg_name.to_string(),
req: histo_req,
is_date_histogram: true,
@@ -623,7 +632,6 @@ fn build_nodes(
let idx_in_req_data = data.push_metric_req_data(MetricAggReqData {
accessor,
field_type,
- column_block_accessor: Default::default(),
name: agg_name.to_string(),
collecting_for,
missing: *missing,
@@ -651,7 +659,6 @@ fn build_nodes(
let idx_in_req_data = data.push_metric_req_data(MetricAggReqData {
accessor,
field_type,
- column_block_accessor: Default::default(),
name: agg_name.to_string(),
collecting_for: StatsType::Percentiles,
missing: percentiles_req.missing,
@@ -899,7 +906,6 @@ fn build_terms_or_cardinality_nodes(
column_type,
str_dict_column: str_dict_column.clone(),
missing_value_for_accessor,
- column_block_accessor: Default::default(),
name: agg_name.to_string(),
req: TermsAggregationInternal::from_req(req),
sug_aggregations: sub_aggs.clone(),
@@ -914,7 +920,6 @@ fn build_terms_or_cardinality_nodes(
column_type,
str_dict_column: str_dict_column.clone(),
missing_value_for_accessor,
- column_block_accessor: Default::default(),
name: agg_name.to_string(),
req: req.clone(),
});

View File

@@ -1,6 +1,6 @@
use std::cmp::Ordering;
- use columnar::{Column, ColumnBlockAccessor, ColumnType};
+ use columnar::{Column, ColumnType};
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use tantivy_bitpacker::minmax;
@@ -26,8 +26,6 @@ pub struct HistogramAggReqData {
pub accessor: Column<u64>,
/// The field type of the fast field.
pub field_type: ColumnType,
- /// The column block accessor to access the fast field values.
- pub column_block_accessor: ColumnBlockAccessor<u64>,
/// The name of the aggregation.
pub name: String,
/// The histogram aggregation request.
@@ -325,7 +323,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
docs: &[crate::DocId],
agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
- let mut req = agg_data.take_histogram_req_data(self.accessor_idx);
+ let req = agg_data.take_histogram_req_data(self.accessor_idx);
let mem_pre = self.get_memory_consumption();
let buckets = &mut self.parent_buckets[parent_bucket_id as usize].buckets;
@@ -334,8 +332,10 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
let offset = req.offset;
let get_bucket_pos = |val| get_bucket_pos_f64(val, interval, offset) as i64;
- req.column_block_accessor.fetch_block(docs, &req.accessor);
- for (doc, val) in req
+ agg_data
+ .column_block_accessor
+ .fetch_block(docs, &req.accessor);
+ for (doc, val) in agg_data
.column_block_accessor
.iter_docid_vals(docs, &req.accessor)
{

View File

@@ -1,7 +1,7 @@
use std::fmt::Debug;
use std::ops::Range;
- use columnar::{Column, ColumnBlockAccessor, ColumnType};
+ use columnar::{Column, ColumnType};
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
@@ -25,8 +25,6 @@ pub struct RangeAggReqData {
pub accessor: Column<u64>,
/// The type of the fast field.
pub field_type: ColumnType,
- /// The column block accessor to access the fast field values.
- pub column_block_accessor: ColumnBlockAccessor<u64>,
/// The range aggregation request.
pub req: RangeAggregation,
/// The name of the aggregation.
@@ -280,13 +278,15 @@ impl<const LOWCARD: bool> SegmentAggregationCollector for SegmentRangeCollector<
docs: &[crate::DocId],
agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
- let mut req = agg_data.take_range_req_data(self.accessor_idx);
+ let req = agg_data.take_range_req_data(self.accessor_idx);
- req.column_block_accessor.fetch_block(docs, &req.accessor);
+ agg_data
+ .column_block_accessor
+ .fetch_block(docs, &req.accessor);
let buckets = &mut self.parent_buckets[parent_bucket_id as usize];
- for (doc, val) in req
+ for (doc, val) in agg_data
.column_block_accessor
.iter_docid_vals(docs, &req.accessor)
{

View File

@@ -4,8 +4,8 @@ use std::net::Ipv6Addr;
use columnar::column_values::CompactSpaceU64Accessor;
use columnar::{
- Column, ColumnBlockAccessor, ColumnType, Dictionary, MonotonicallyMappableToU128,
- MonotonicallyMappableToU64, NumericalValue, StrColumn,
+ Column, ColumnType, Dictionary, MonotonicallyMappableToU128, MonotonicallyMappableToU64,
+ NumericalValue, StrColumn,
};
use common::{BitSet, TinySet};
use rustc_hash::FxHashMap;
@@ -39,8 +39,6 @@ pub struct TermsAggReqData {
pub str_dict_column: Option<StrColumn>,
/// The missing value as u64 value.
pub missing_value_for_accessor: Option<u64>,
- /// The column block accessor to access the fast field values.
- pub column_block_accessor: ColumnBlockAccessor<u64>,
/// Used to build the correct nested result when we have an empty result.
pub sug_aggregations: Aggregations,
/// The name of the aggregation.
@@ -806,20 +804,20 @@ impl<TermMap: TermAggregationMap, const LOWCARD: bool> SegmentAggregationCollect
let req_data = &mut self.terms_req_data;
if let Some(missing) = req_data.missing_value_for_accessor {
- req_data.column_block_accessor.fetch_block_with_missing(
+ agg_data.column_block_accessor.fetch_block_with_missing(
docs,
&req_data.accessor,
missing,
);
} else {
- req_data
+ agg_data
.column_block_accessor
.fetch_block(docs, &req_data.accessor);
}
if let Some(sub_agg) = &mut self.sub_agg {
let term_buckets = &mut self.parent_buckets[parent_bucket_id as usize];
- let it = req_data
+ let it = agg_data
.column_block_accessor
.iter_docid_vals(docs, &req_data.accessor);
if let Some(allowed_bs) = req_data.allowed_term_ids.as_ref() {
@@ -840,7 +838,7 @@ impl<TermMap: TermAggregationMap, const LOWCARD: bool> SegmentAggregationCollect
}
} else {
let term_buckets = &mut self.parent_buckets[parent_bucket_id as usize];
- let it = req_data.column_block_accessor.iter_vals();
+ let it = agg_data.column_block_accessor.iter_vals();
if let Some(allowed_bs) = req_data.allowed_term_ids.as_ref() {
let it = it.filter(move |&term_id| allowed_bs.contains(term_id as u32));
Self::collect_terms(it, term_buckets, &mut self.bucket_id_provider);

View File

@@ -2,7 +2,7 @@ use std::collections::hash_map::DefaultHasher;
use std::hash::{BuildHasher, Hasher};
use columnar::column_values::CompactSpaceU64Accessor;
- use columnar::{Column, ColumnBlockAccessor, ColumnType, Dictionary, StrColumn};
+ use columnar::{Column, ColumnType, Dictionary, StrColumn};
use common::f64_to_u64;
use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use rustc_hash::FxHashSet;
@@ -106,8 +106,6 @@ pub struct CardinalityAggReqData {
pub str_dict_column: Option<StrColumn>,
/// The missing value normalized to the internal u64 representation of the field type.
pub missing_value_for_accessor: Option<u64>,
- /// The column block accessor to access the fast field values.
- pub(crate) column_block_accessor: ColumnBlockAccessor<u64>,
/// The name of the aggregation.
pub name: String,
/// The aggregation request.
@@ -135,11 +133,16 @@ impl CardinalityAggregationReq {
}
}
- #[derive(Clone, Debug, PartialEq)]
+ #[derive(Clone, Debug)]
pub(crate) struct SegmentCardinalityCollector {
buckets: Vec<SegmentCardinalityCollectorBucket>,
- column_type: ColumnType,
accessor_idx: usize,
+ /// The column accessor to access the fast field values.
+ accessor: Column<u64>,
+ /// The column_type of the field.
+ column_type: ColumnType,
+ /// The missing value normalized to the internal u64 representation of the field type.
+ missing_value_for_accessor: Option<u64>,
}
#[derive(Clone, Debug, PartialEq, Default)]
@@ -213,29 +216,34 @@ impl SegmentCardinalityCollectorBucket {
}
impl SegmentCardinalityCollector {
- pub fn from_req(column_type: ColumnType, accessor_idx: usize) -> Self {
+ pub fn from_req(
+ column_type: ColumnType,
+ accessor_idx: usize,
+ accessor: Column<u64>,
+ missing_value_for_accessor: Option<u64>,
+ ) -> Self {
Self {
buckets: vec![SegmentCardinalityCollectorBucket::new(column_type); 1],
column_type,
accessor_idx,
+ accessor,
+ missing_value_for_accessor,
}
}
fn fetch_block_with_field(
&mut self,
docs: &[crate::DocId],
- agg_data: &mut CardinalityAggReqData,
+ agg_data: &mut AggregationsSegmentCtx,
) {
- if let Some(missing) = agg_data.missing_value_for_accessor {
- agg_data.column_block_accessor.fetch_block_with_missing(
- docs,
- &agg_data.accessor,
- missing,
- );
+ if let Some(missing) = self.missing_value_for_accessor {
+ agg_data
+ .column_block_accessor
+ .fetch_block_with_missing(docs, &self.accessor, missing);
} else {
agg_data
.column_block_accessor
- .fetch_block(docs, &agg_data.accessor);
+ .fetch_block(docs, &self.accessor);
}
}
}
@@ -268,17 +276,16 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
docs: &[crate::DocId],
agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
- let req_data = agg_data.get_cardinality_req_data_mut(self.accessor_idx);
- self.fetch_block_with_field(docs, req_data);
+ self.fetch_block_with_field(docs, agg_data);
let bucket = &mut self.buckets[parent_bucket_id as usize];
- let col_block_accessor = &req_data.column_block_accessor;
- if req_data.column_type == ColumnType::Str {
+ let col_block_accessor = &agg_data.column_block_accessor;
+ if self.column_type == ColumnType::Str {
for term_ord in col_block_accessor.iter_vals() {
bucket.entries.insert(term_ord);
}
- } else if req_data.column_type == ColumnType::IpAddr {
- let compact_space_accessor = req_data
+ } else if self.column_type == ColumnType::IpAddr {
+ let compact_space_accessor = self
.accessor
.values
.clone()
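
A side effect of sharing the block accessor, which the cardinality changes above (and the percentiles changes further down) appear to work around: a collector can no longer hold a borrow of its own request data obtained from `agg_data` while also using `agg_data.column_block_accessor`, so the fields it needs (`accessor`, `column_type`/`field_type`, the missing value) are now copied onto the collector at construction time. A hypothetical minimal illustration of that aliasing conflict; the names are made up, not the real API:

struct Ctx {
    shared_buffer: Vec<u64>,    // stands in for the shared ColumnBlockAccessor
    req_data: Vec<Option<u64>>, // stands in for per-aggregation request data (here: just a missing value)
}

impl Ctx {
    fn req_data_mut(&mut self, idx: usize) -> &mut Option<u64> {
        &mut self.req_data[idx]
    }
}

fn collect(ctx: &mut Ctx, idx: usize) {
    // let req = ctx.req_data_mut(idx);
    // ctx.shared_buffer.push(req.unwrap_or(0)); // error: `ctx` is already mutably borrowed via `req`
    let missing = *ctx.req_data_mut(idx); // copy the needed value out first...
    ctx.shared_buffer.push(missing.unwrap_or(0)); // ...then the shared buffer can be used freely
}

fn main() {
    let mut ctx = Ctx { shared_buffer: Vec::new(), req_data: vec![Some(7)] };
    collect(&mut ctx, 0);
    assert_eq!(ctx.shared_buffer, vec![7]);
}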

View File

@@ -323,7 +323,6 @@ pub(crate) struct SegmentExtendedStatsCollector {
missing: Option<u64>,
field_type: ColumnType,
accessor: columnar::Column<u64>,
- column_block_accessor: columnar::ColumnBlockAccessor<u64>,
buckets: Vec<IntermediateExtendedStats>,
sigma: Option<f64>,
}
@@ -337,7 +336,6 @@ impl SegmentExtendedStatsCollector {
name: req.name.clone(),
field_type: req.field_type,
accessor: req.accessor.clone(),
- column_block_accessor: req.column_block_accessor.clone(),
missing,
buckets: vec![IntermediateExtendedStats::with_sigma(sigma); 16],
sigma,
@@ -371,17 +369,20 @@ impl SegmentAggregationCollector for SegmentExtendedStatsCollector {
&mut self,
parent_bucket_id: BucketId,
docs: &[crate::DocId],
- _agg_data: &mut AggregationsSegmentCtx,
+ agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
let mut extended_stats = self.buckets[parent_bucket_id as usize].clone();
if let Some(missing) = self.missing.as_ref() {
- self.column_block_accessor
+ agg_data
+ .column_block_accessor
.fetch_block_with_missing(docs, &self.accessor, *missing);
} else {
- self.column_block_accessor.fetch_block(docs, &self.accessor);
+ agg_data
+ .column_block_accessor
+ .fetch_block(docs, &self.accessor);
}
- for val in self.column_block_accessor.iter_vals() {
+ for val in agg_data.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, self.field_type);
extended_stats.collect(val1);
}

View File

@@ -31,7 +31,7 @@ use std::collections::HashMap;
pub use average::*;
pub use cardinality::*;
- use columnar::{Column, ColumnBlockAccessor, ColumnType};
+ use columnar::{Column, ColumnType};
pub use count::*;
pub use extended_stats::*;
pub use max::*;
@@ -55,8 +55,6 @@ pub struct MetricAggReqData {
pub field_type: ColumnType,
/// The missing value normalized to the internal u64 representation of the field type.
pub missing_u64: Option<u64>,
- /// The column block accessor to access the fast field values.
- pub column_block_accessor: ColumnBlockAccessor<u64>,
/// The column accessor to access the fast field values.
pub accessor: Column<u64>,
/// Used when converting to intermediate result

View File

@@ -130,10 +130,16 @@ impl PercentilesAggregationReq {
}
}
- #[derive(Clone, Debug, PartialEq)]
+ #[derive(Clone, Debug)]
pub(crate) struct SegmentPercentilesCollector {
pub(crate) buckets: Vec<PercentilesCollector>,
pub(crate) accessor_idx: usize,
+ /// The type of the field.
+ pub field_type: ColumnType,
+ /// The missing value normalized to the internal u64 representation of the field type.
+ pub missing_u64: Option<u64>,
+ /// The column accessor to access the fast field values.
+ pub accessor: Column<u64>,
}
#[derive(Clone, Serialize, Deserialize)]
@@ -228,11 +234,19 @@ impl PercentilesCollector {
}
impl SegmentPercentilesCollector {
- pub fn from_req_and_validate(accessor_idx: usize) -> crate::Result<Self> {
- Ok(Self {
+ pub fn from_req_and_validate(
+ field_type: ColumnType,
+ missing_u64: Option<u64>,
+ accessor: Column<u64>,
+ accessor_idx: usize,
+ ) -> Self {
+ Self {
buckets: Vec::with_capacity(64),
+ field_type,
+ missing_u64,
+ accessor,
accessor_idx,
- })
+ }
}
}
@@ -268,22 +282,18 @@ impl SegmentAggregationCollector for SegmentPercentilesCollector {
agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
let percentiles = &mut self.buckets[parent_bucket_id as usize];
let req_data = agg_data.get_metric_req_data_mut(self.accessor_idx);
if let Some(missing) = req_data.missing_u64.as_ref() {
req_data.column_block_accessor.fetch_block_with_missing(
docs,
&req_data.accessor,
*missing,
);
} else {
req_data
if let Some(missing) = self.missing_u64.as_ref() {
agg_data
.column_block_accessor
- .fetch_block(docs, &req_data.accessor);
+ .fetch_block_with_missing(docs, &self.accessor, *missing);
+ } else {
+ agg_data
+ .column_block_accessor
+ .fetch_block(docs, &self.accessor);
}
- for val in req_data.column_block_accessor.iter_vals() {
- let val1 = f64_from_fastfield_u64(val, req_data.field_type);
+ for val in agg_data.column_block_accessor.iter_vals() {
+ let val1 = f64_from_fastfield_u64(val, self.field_type);
percentiles.collect(val1);
}

View File

@@ -1,6 +1,6 @@
use std::fmt::Debug;
- use columnar::{Column, ColumnBlockAccessor, ColumnType};
+ use columnar::{Column, ColumnType};
use serde::{Deserialize, Serialize};
use super::*;
@@ -195,7 +195,6 @@ fn create_collector<const TYPE_ID: u8>(
collecting_for: req.collecting_for,
is_number_or_date_type: req.is_number_or_date_type,
missing_u64: req.missing_u64,
- column_block_accessor: req.column_block_accessor.clone(),
accessor: req.accessor.clone(),
buckets: vec![IntermediateStats::default()],
})
@@ -222,7 +221,6 @@ pub(crate) fn build_segment_stats_collector(
pub(crate) struct SegmentStatsCollector<const COLUMN_TYPE_ID: u8> {
pub(crate) missing_u64: Option<u64>,
pub(crate) accessor: Column<u64>,
- pub(crate) column_block_accessor: ColumnBlockAccessor<u64>,
pub(crate) is_number_or_date_type: bool,
pub(crate) buckets: Vec<IntermediateStats>,
pub(crate) name: String,
@@ -275,7 +273,7 @@ impl<const COLUMN_TYPE_ID: u8> SegmentAggregationCollector
&mut self,
parent_bucket_id: BucketId,
docs: &[crate::DocId],
- _agg_data: &mut AggregationsSegmentCtx,
+ agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
// TODO: remove once we fetch all values for all bucket ids in one go
if docs.len() == 1 && self.missing_u64.is_none() {
@@ -288,14 +286,17 @@ impl<const COLUMN_TYPE_ID: u8> SegmentAggregationCollector
return Ok(());
}
if let Some(missing) = self.missing_u64.as_ref() {
- self.column_block_accessor
+ agg_data
+ .column_block_accessor
.fetch_block_with_missing(docs, &self.accessor, *missing);
} else {
- self.column_block_accessor.fetch_block(docs, &self.accessor);
+ agg_data
+ .column_block_accessor
+ .fetch_block(docs, &self.accessor);
}
collect_stats::<COLUMN_TYPE_ID>(
&mut self.buckets[parent_bucket_id as usize],
- self.column_block_accessor.iter_vals(),
+ agg_data.column_block_accessor.iter_vals(),
self.is_number_or_date_type,
)?;