mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-11 13:00:42 +00:00
Compare commits
1 Commits
faster_his
...
dependabot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f50cb0208b |
2
.github/workflows/coverage.yml
vendored
2
.github/workflows/coverage.yml
vendored
@@ -28,7 +28,7 @@ jobs:
|
||||
- name: Generate code coverage
|
||||
run: cargo +nightly-2025-12-01 llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info
|
||||
- name: Upload coverage to Codecov
|
||||
uses: codecov/codecov-action@fb8b3582c8e4def4969c97caa2f19720cb33a72f # v7.0.0
|
||||
uses: codecov/codecov-action@57e3a136b779b570ffcdbf80b3bdc90e7fab3de2 # v6.0.0
|
||||
continue-on-error: true
|
||||
with:
|
||||
token: ${{ secrets.CODECOV_TOKEN }} # not required for public repos
|
||||
|
||||
2
.github/workflows/scorecard.yml
vendored
2
.github/workflows/scorecard.yml
vendored
@@ -44,6 +44,6 @@ jobs:
|
||||
|
||||
# Upload the results to GitHub's code scanning dashboard.
|
||||
- name: 'Upload to code-scanning'
|
||||
uses: github/codeql-action/upload-sarif@87557b9c84dde89fdd9b10e88954ac2f4248e463 # v4.36.1
|
||||
uses: github/codeql-action/upload-sarif@8aad20d150bbac5944a9f9d289da16a4b0d87c1e # v4.36.2
|
||||
with:
|
||||
sarif_file: results.sarif
|
||||
|
||||
@@ -66,8 +66,6 @@ fn bench_agg(mut group: InputGroup<Index>) {
|
||||
register!(group, terms_status_with_terms_zipf_1000_sub_agg);
|
||||
register!(group, terms_zipf_1000_with_terms_status_sub_agg);
|
||||
register!(group, terms_status_with_histogram);
|
||||
register!(group, terms_status_with_date_histogram);
|
||||
register!(group, terms_status_with_date_histogram_and_sibling_terms);
|
||||
register!(group, terms_zipf_1000);
|
||||
register!(group, terms_zipf_1000_with_histogram);
|
||||
register!(group, terms_zipf_1000_with_avg_sub_agg);
|
||||
@@ -392,34 +390,6 @@ fn terms_status_with_histogram(index: &Index) {
|
||||
execute_agg(index, agg_req);
|
||||
}
|
||||
|
||||
fn terms_status_with_date_histogram(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"my_texts": {
|
||||
"terms": { "field": "text_few_terms_status" },
|
||||
"aggs": {
|
||||
"over_time": { "date_histogram": { "field": "timestamp", "fixed_interval": "1h" } }
|
||||
}
|
||||
}
|
||||
});
|
||||
execute_agg(index, agg_req);
|
||||
}
|
||||
|
||||
/// Same fused terms × date_histogram, but with a sibling terms aggregation next to it. The fused
|
||||
/// fast path should still trigger for `my_texts` (sibling aggregations are independent top-level
|
||||
/// aggregations, so they don't change its eligibility).
|
||||
fn terms_status_with_date_histogram_and_sibling_terms(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"my_texts": {
|
||||
"terms": { "field": "text_few_terms_status" },
|
||||
"aggs": {
|
||||
"over_time": { "date_histogram": { "field": "timestamp", "fixed_interval": "1h" } }
|
||||
}
|
||||
},
|
||||
"other_texts": { "terms": { "field": "text_few_terms" } }
|
||||
});
|
||||
execute_agg(index, agg_req);
|
||||
}
|
||||
|
||||
fn terms_zipf_1000_with_histogram(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"my_texts": {
|
||||
@@ -813,9 +783,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
|
||||
doc_with_value /= 20;
|
||||
}
|
||||
let _val_max = 1_000_000.0;
|
||||
const SPAN_MS: i64 = 120 * 3600 * 1000; // 120 hours in ms
|
||||
const NOISE_MS: i64 = 2 * 3600 * 1000; // ±2h noise
|
||||
for i in 0..doc_with_value {
|
||||
for _ in 0..doc_with_value {
|
||||
let val: f64 = rng.random_range(0.0..1_000_000.0);
|
||||
let json = if rng.random_bool(0.1) {
|
||||
// 10% are numeric values
|
||||
@@ -823,9 +791,6 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
|
||||
} else {
|
||||
json!({"mixed_type": many_terms_data.choose(&mut rng).unwrap().to_string()})
|
||||
};
|
||||
let base_ms = (i as i64 * SPAN_MS) / doc_with_value as i64;
|
||||
let noise_ms = rng.random_range(-NOISE_MS..NOISE_MS);
|
||||
let ts_ms = (base_ms + noise_ms).clamp(0, SPAN_MS);
|
||||
index_writer.add_document(doc!(
|
||||
single_term => "single_term",
|
||||
text_field => "cool",
|
||||
@@ -838,7 +803,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
|
||||
score_field => val as u64,
|
||||
score_field_f64 => lg_norm.sample(&mut rng),
|
||||
score_field_i64 => val as i64,
|
||||
date_field => DateTime::from_timestamp_millis(ts_ms),
|
||||
date_field => DateTime::from_timestamp_millis((val * 1_000_000.) as i64),
|
||||
))?;
|
||||
if cardinality == Cardinality::OptionalSparse {
|
||||
for _ in 0..20 {
|
||||
|
||||
@@ -17,18 +17,7 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
|
||||
pub fn fetch_block<'a>(&'a mut self, docs: &'a [u32], accessor: &Column<T>) {
|
||||
if accessor.index.get_cardinality().is_full() {
|
||||
self.val_cache.resize(docs.len(), T::default());
|
||||
// When the docs form a contiguous ascending run we can fetch the values
|
||||
// as a single range. This lets codecs (e.g. bitpacked) bulk-decode the
|
||||
// slice instead of gathering value-by-value, and avoids per-value dynamic
|
||||
// dispatch. `docs` is always sorted ascending and free of duplicates here,
|
||||
// so comparing the endpoints is enough to detect contiguity.
|
||||
if is_contiguous(docs) {
|
||||
accessor
|
||||
.values
|
||||
.get_range(docs[0] as u64, &mut self.val_cache);
|
||||
} else {
|
||||
accessor.values.get_vals(docs, &mut self.val_cache);
|
||||
}
|
||||
accessor.values.get_vals(docs, &mut self.val_cache);
|
||||
} else {
|
||||
self.docid_cache.clear();
|
||||
self.row_id_cache.clear();
|
||||
@@ -169,22 +158,6 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if `docs` is a contiguous ascending run `[d, d + 1, ..., d + n - 1]`.
|
||||
///
|
||||
/// Assumes `docs` is sorted ascending and free of duplicates (the invariant for the
|
||||
/// doc blocks passed to `fetch_block`), so comparing the endpoints is sufficient.
|
||||
#[inline]
|
||||
fn is_contiguous(docs: &[u32]) -> bool {
|
||||
let (Some(&first), Some(&last)) = (docs.first(), docs.last()) else {
|
||||
return false;
|
||||
};
|
||||
debug_assert!(
|
||||
docs.windows(2).all(|w| w[0] < w[1]),
|
||||
"fetch_block requires docs sorted ascending without duplicates"
|
||||
);
|
||||
(last - first) as usize + 1 == docs.len()
|
||||
}
|
||||
|
||||
/// Given two sorted lists of docids `docs` and `hits`, hits is a subset of `docs`.
|
||||
/// Return all docs that are not in `hits`.
|
||||
fn find_missing_docs<F>(docs: &[u32], hits: &[u32], mut callback: F)
|
||||
@@ -315,46 +288,4 @@ mod tests {
|
||||
assert_eq!(accessor.docid_cache, vec![0]);
|
||||
assert_eq!(accessor.val_cache, vec![1]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_contiguous() {
|
||||
assert!(!is_contiguous(&[]));
|
||||
assert!(is_contiguous(&[5]));
|
||||
assert!(is_contiguous(&[5, 6, 7, 8]));
|
||||
assert!(is_contiguous(&[0, 1, 2]));
|
||||
assert!(!is_contiguous(&[5, 7, 8]));
|
||||
assert!(!is_contiguous(&[0, 1, 3]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fetch_block_contiguous_and_gather_match() {
|
||||
use crate::column_index::ColumnIndex;
|
||||
use crate::column_values::{
|
||||
ALL_U64_CODEC_TYPES, serialize_and_load_u64_based_column_values,
|
||||
};
|
||||
|
||||
let vals: Vec<u64> = (0..200u64).map(|i| i * 7 + 3).collect();
|
||||
let values =
|
||||
serialize_and_load_u64_based_column_values::<u64>(&&vals[..], &ALL_U64_CODEC_TYPES);
|
||||
let column = Column {
|
||||
index: ColumnIndex::Full,
|
||||
values,
|
||||
};
|
||||
|
||||
let check = |accessor: &mut ColumnBlockAccessor<u64>, docs: &[u32]| {
|
||||
accessor.fetch_block(docs, &column);
|
||||
let got: Vec<(u32, u64)> = accessor.iter_docid_vals(docs, &column).collect();
|
||||
let expected: Vec<(u32, u64)> = docs.iter().map(|&d| (d, vals[d as usize])).collect();
|
||||
assert_eq!(got, expected);
|
||||
};
|
||||
|
||||
let mut accessor = ColumnBlockAccessor::<u64>::default();
|
||||
// Contiguous block -> get_range fast path.
|
||||
check(&mut accessor, &(10..74).collect::<Vec<u32>>());
|
||||
// Non-contiguous block -> get_vals gather path.
|
||||
check(&mut accessor, &[0, 5, 9, 100, 199]);
|
||||
// Single doc and full span.
|
||||
check(&mut accessor, &[42]);
|
||||
check(&mut accessor, &(0..200).collect::<Vec<u32>>());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -119,18 +119,8 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync + DowncastSync {
|
||||
/// the segment's `maxdoc`.
|
||||
#[inline(always)]
|
||||
fn get_range(&self, start: u64, output: &mut [T]) {
|
||||
let mut out_chunks = output.chunks_exact_mut(4);
|
||||
let mut idx = start;
|
||||
for out_x4 in out_chunks.by_ref() {
|
||||
out_x4[0] = self.get_val(idx as u32);
|
||||
out_x4[1] = self.get_val((idx + 1) as u32);
|
||||
out_x4[2] = self.get_val((idx + 2) as u32);
|
||||
out_x4[3] = self.get_val((idx + 3) as u32);
|
||||
idx += 4;
|
||||
}
|
||||
for out in out_chunks.into_remainder() {
|
||||
for (out, idx) in output.iter_mut().zip(start..) {
|
||||
*out = self.get_val(idx as u32);
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -121,22 +121,6 @@ pub(crate) fn create_and_validate<TColumnCodec: ColumnCodec>(
|
||||
reader.get_vals(&all_docs, &mut buffer);
|
||||
assert_eq!(vals, buffer);
|
||||
|
||||
// Validate `get_range` over the full column and a sub-range. The sub-range starts
|
||||
// at a non-zero offset to exercise the entrance-ramp alignment of the batch decode.
|
||||
buffer.resize(all_docs.len(), 0);
|
||||
reader.get_range(0, &mut buffer);
|
||||
assert_eq!(vals, buffer, "get_range (full) mismatch in data set {name}");
|
||||
if vals.len() >= 2 {
|
||||
let start = 1usize;
|
||||
buffer.resize(vals.len() - start, 0);
|
||||
reader.get_range(start as u64, &mut buffer);
|
||||
assert_eq!(
|
||||
&vals[start..],
|
||||
&buffer[..],
|
||||
"get_range (sub-range) mismatch in data set {name}"
|
||||
);
|
||||
}
|
||||
|
||||
if !vals.is_empty() {
|
||||
let test_rand_idx = rand::rng().random_range(0..=vals.len() - 1);
|
||||
let expected_positions: Vec<u32> = vals
|
||||
|
||||
@@ -10,11 +10,11 @@ use crate::aggregation::accessor_helpers::{
|
||||
};
|
||||
use crate::aggregation::agg_req::{Aggregation, AggregationVariants, Aggregations};
|
||||
use crate::aggregation::bucket::{
|
||||
build_segment_filter_collector, build_segment_histogram_collector,
|
||||
build_segment_range_collector, CompositeAggReqData, CompositeAggregation,
|
||||
CompositeSourceAccessors, FilterAggReqData, HistogramAggReqData, HistogramBounds,
|
||||
IncludeExcludeParam, MissingTermAggReqData, RangeAggReqData, TermMissingAgg, TermsAggReqData,
|
||||
TermsAggregation, TermsAggregationInternal,
|
||||
build_segment_filter_collector, build_segment_range_collector, CompositeAggReqData,
|
||||
CompositeAggregation, CompositeSourceAccessors, FilterAggReqData, HistogramAggReqData,
|
||||
HistogramBounds, IncludeExcludeParam, MissingTermAggReqData, RangeAggReqData,
|
||||
SegmentHistogramCollector, TermMissingAgg, TermsAggReqData, TermsAggregation,
|
||||
TermsAggregationInternal,
|
||||
};
|
||||
use crate::aggregation::metric::{
|
||||
build_segment_stats_collector, AverageAggregation, CardinalityAggReqData,
|
||||
@@ -41,7 +41,7 @@ pub struct AggregationsSegmentCtx {
|
||||
|
||||
impl AggregationsSegmentCtx {
|
||||
pub(crate) fn push_term_req_data(&mut self, data: TermsAggReqData) -> usize {
|
||||
self.per_request.term_req_data.push(data);
|
||||
self.per_request.term_req_data.push(Some(Box::new(data)));
|
||||
self.per_request.term_req_data.len() - 1
|
||||
}
|
||||
pub(crate) fn push_cardinality_req_data(&mut self, data: CardinalityAggReqData) -> usize {
|
||||
@@ -61,25 +61,31 @@ impl AggregationsSegmentCtx {
|
||||
self.per_request.missing_term_req_data.len() - 1
|
||||
}
|
||||
pub(crate) fn push_histogram_req_data(&mut self, data: HistogramAggReqData) -> usize {
|
||||
self.per_request.histogram_req_data.push(data);
|
||||
self.per_request
|
||||
.histogram_req_data
|
||||
.push(Some(Box::new(data)));
|
||||
self.per_request.histogram_req_data.len() - 1
|
||||
}
|
||||
pub(crate) fn push_range_req_data(&mut self, data: RangeAggReqData) -> usize {
|
||||
self.per_request.range_req_data.push(data);
|
||||
self.per_request.range_req_data.push(Some(Box::new(data)));
|
||||
self.per_request.range_req_data.len() - 1
|
||||
}
|
||||
pub(crate) fn push_filter_req_data(&mut self, data: FilterAggReqData) -> usize {
|
||||
self.per_request.filter_req_data.push(data);
|
||||
self.per_request.filter_req_data.push(Some(Box::new(data)));
|
||||
self.per_request.filter_req_data.len() - 1
|
||||
}
|
||||
pub(crate) fn push_composite_req_data(&mut self, data: CompositeAggReqData) -> usize {
|
||||
self.per_request.composite_req_data.push(data);
|
||||
self.per_request
|
||||
.composite_req_data
|
||||
.push(Some(Box::new(data)));
|
||||
self.per_request.composite_req_data.len() - 1
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn get_term_req_data(&self, idx: usize) -> &TermsAggReqData {
|
||||
&self.per_request.term_req_data[idx]
|
||||
self.per_request.term_req_data[idx]
|
||||
.as_deref()
|
||||
.expect("term_req_data slot is empty (taken)")
|
||||
}
|
||||
#[inline]
|
||||
pub(crate) fn get_cardinality_req_data(&self, idx: usize) -> &CardinalityAggReqData {
|
||||
@@ -97,6 +103,116 @@ impl AggregationsSegmentCtx {
|
||||
pub(crate) fn get_missing_term_req_data(&self, idx: usize) -> &MissingTermAggReqData {
|
||||
&self.per_request.missing_term_req_data[idx]
|
||||
}
|
||||
#[inline]
|
||||
pub(crate) fn get_histogram_req_data(&self, idx: usize) -> &HistogramAggReqData {
|
||||
self.per_request.histogram_req_data[idx]
|
||||
.as_deref()
|
||||
.expect("histogram_req_data slot is empty (taken)")
|
||||
}
|
||||
#[inline]
|
||||
pub(crate) fn get_range_req_data(&self, idx: usize) -> &RangeAggReqData {
|
||||
self.per_request.range_req_data[idx]
|
||||
.as_deref()
|
||||
.expect("range_req_data slot is empty (taken)")
|
||||
}
|
||||
#[inline]
|
||||
pub(crate) fn get_composite_req_data(&self, idx: usize) -> &CompositeAggReqData {
|
||||
self.per_request.composite_req_data[idx]
|
||||
.as_deref()
|
||||
.expect("composite_req_data slot is empty (taken)")
|
||||
}
|
||||
|
||||
// ---------- mutable getters ----------
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn get_metric_req_data_mut(&mut self, idx: usize) -> &mut MetricAggReqData {
|
||||
&mut self.per_request.stats_metric_req_data[idx]
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn get_cardinality_req_data_mut(
|
||||
&mut self,
|
||||
idx: usize,
|
||||
) -> &mut CardinalityAggReqData {
|
||||
&mut self.per_request.cardinality_req_data[idx]
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn get_histogram_req_data_mut(&mut self, idx: usize) -> &mut HistogramAggReqData {
|
||||
self.per_request.histogram_req_data[idx]
|
||||
.as_deref_mut()
|
||||
.expect("histogram_req_data slot is empty (taken)")
|
||||
}
|
||||
|
||||
// ---------- take / put (terms, histogram, range) ----------
|
||||
|
||||
/// Move out the boxed Histogram request at `idx`, leaving `None`.
|
||||
#[inline]
|
||||
pub(crate) fn take_histogram_req_data(&mut self, idx: usize) -> Box<HistogramAggReqData> {
|
||||
self.per_request.histogram_req_data[idx]
|
||||
.take()
|
||||
.expect("histogram_req_data slot is empty (taken)")
|
||||
}
|
||||
|
||||
/// Put back a Histogram request into an empty slot at `idx`.
|
||||
#[inline]
|
||||
pub(crate) fn put_back_histogram_req_data(
|
||||
&mut self,
|
||||
idx: usize,
|
||||
value: Box<HistogramAggReqData>,
|
||||
) {
|
||||
debug_assert!(self.per_request.histogram_req_data[idx].is_none());
|
||||
self.per_request.histogram_req_data[idx] = Some(value);
|
||||
}
|
||||
|
||||
/// Move out the boxed Range request at `idx`, leaving `None`.
|
||||
#[inline]
|
||||
pub(crate) fn take_range_req_data(&mut self, idx: usize) -> Box<RangeAggReqData> {
|
||||
self.per_request.range_req_data[idx]
|
||||
.take()
|
||||
.expect("range_req_data slot is empty (taken)")
|
||||
}
|
||||
|
||||
/// Put back a Range request into an empty slot at `idx`.
|
||||
#[inline]
|
||||
pub(crate) fn put_back_range_req_data(&mut self, idx: usize, value: Box<RangeAggReqData>) {
|
||||
debug_assert!(self.per_request.range_req_data[idx].is_none());
|
||||
self.per_request.range_req_data[idx] = Some(value);
|
||||
}
|
||||
|
||||
/// Move out the boxed Filter request at `idx`, leaving `None`.
|
||||
#[inline]
|
||||
pub(crate) fn take_filter_req_data(&mut self, idx: usize) -> Box<FilterAggReqData> {
|
||||
self.per_request.filter_req_data[idx]
|
||||
.take()
|
||||
.expect("filter_req_data slot is empty (taken)")
|
||||
}
|
||||
|
||||
/// Put back a Filter request into an empty slot at `idx`.
|
||||
#[inline]
|
||||
pub(crate) fn put_back_filter_req_data(&mut self, idx: usize, value: Box<FilterAggReqData>) {
|
||||
debug_assert!(self.per_request.filter_req_data[idx].is_none());
|
||||
self.per_request.filter_req_data[idx] = Some(value);
|
||||
}
|
||||
|
||||
/// Move out the Composite request at `idx`.
|
||||
#[inline]
|
||||
pub(crate) fn take_composite_req_data(&mut self, idx: usize) -> Box<CompositeAggReqData> {
|
||||
self.per_request.composite_req_data[idx]
|
||||
.take()
|
||||
.expect("composite_req_data slot is empty (taken)")
|
||||
}
|
||||
|
||||
/// Put back a Composite request into an empty slot at `idx`.
|
||||
#[inline]
|
||||
pub(crate) fn put_back_composite_req_data(
|
||||
&mut self,
|
||||
idx: usize,
|
||||
value: Box<CompositeAggReqData>,
|
||||
) {
|
||||
debug_assert!(self.per_request.composite_req_data[idx].is_none());
|
||||
self.per_request.composite_req_data[idx] = Some(value);
|
||||
}
|
||||
}
|
||||
|
||||
/// Each type of aggregation has its own request data struct. This struct holds
|
||||
@@ -107,14 +223,15 @@ impl AggregationsSegmentCtx {
|
||||
/// for a node with [AggKind::Terms]).
|
||||
#[derive(Default)]
|
||||
pub struct PerRequestAggSegCtx {
|
||||
// Box for cheap take/put - Only necessary for bucket aggs that have sub-aggregations
|
||||
/// TermsAggReqData contains the request data for a terms aggregation.
|
||||
pub term_req_data: Vec<TermsAggReqData>,
|
||||
pub term_req_data: Vec<Option<Box<TermsAggReqData>>>,
|
||||
/// HistogramAggReqData contains the request data for a histogram aggregation.
|
||||
pub histogram_req_data: Vec<HistogramAggReqData>,
|
||||
pub histogram_req_data: Vec<Option<Box<HistogramAggReqData>>>,
|
||||
/// RangeAggReqData contains the request data for a range aggregation.
|
||||
pub range_req_data: Vec<RangeAggReqData>,
|
||||
pub range_req_data: Vec<Option<Box<RangeAggReqData>>>,
|
||||
/// FilterAggReqData contains the request data for a filter aggregation.
|
||||
pub filter_req_data: Vec<FilterAggReqData>,
|
||||
pub filter_req_data: Vec<Option<Box<FilterAggReqData>>>,
|
||||
/// Shared by avg, min, max, sum, stats, extended_stats, count
|
||||
pub stats_metric_req_data: Vec<MetricAggReqData>,
|
||||
/// CardinalityAggReqData contains the request data for a cardinality aggregation.
|
||||
@@ -124,7 +241,7 @@ pub struct PerRequestAggSegCtx {
|
||||
/// MissingTermAggReqData contains the request data for a missing term aggregation.
|
||||
pub missing_term_req_data: Vec<MissingTermAggReqData>,
|
||||
/// CompositeAggReqData contains the request data for a composite aggregation.
|
||||
pub composite_req_data: Vec<CompositeAggReqData>,
|
||||
pub composite_req_data: Vec<Option<Box<CompositeAggReqData>>>,
|
||||
|
||||
/// Request tree used to build collectors.
|
||||
pub agg_tree: Vec<AggRefNode>,
|
||||
@@ -135,22 +252,22 @@ impl PerRequestAggSegCtx {
|
||||
fn get_memory_consumption(&self) -> usize {
|
||||
self.term_req_data
|
||||
.iter()
|
||||
.map(|t| t.get_memory_consumption())
|
||||
.map(|b| b.as_ref().unwrap().get_memory_consumption())
|
||||
.sum::<usize>()
|
||||
+ self
|
||||
.histogram_req_data
|
||||
.iter()
|
||||
.map(|t| t.get_memory_consumption())
|
||||
.map(|b| b.as_ref().unwrap().get_memory_consumption())
|
||||
.sum::<usize>()
|
||||
+ self
|
||||
.range_req_data
|
||||
.iter()
|
||||
.map(|t| t.get_memory_consumption())
|
||||
.map(|b| b.as_ref().unwrap().get_memory_consumption())
|
||||
.sum::<usize>()
|
||||
+ self
|
||||
.filter_req_data
|
||||
.iter()
|
||||
.map(|t| t.get_memory_consumption())
|
||||
.map(|b| b.as_ref().unwrap().get_memory_consumption())
|
||||
.sum::<usize>()
|
||||
+ self
|
||||
.stats_metric_req_data
|
||||
@@ -175,7 +292,7 @@ impl PerRequestAggSegCtx {
|
||||
+ self
|
||||
.composite_req_data
|
||||
.iter()
|
||||
.map(|t| t.get_memory_consumption())
|
||||
.map(|b| b.as_ref().map(|d| d.get_memory_consumption()).unwrap_or(0))
|
||||
.sum::<usize>()
|
||||
+ self.agg_tree.len() * std::mem::size_of::<AggRefNode>()
|
||||
}
|
||||
@@ -184,16 +301,40 @@ impl PerRequestAggSegCtx {
|
||||
let idx = node.idx_in_req_data;
|
||||
let kind = node.kind;
|
||||
match kind {
|
||||
AggKind::Terms => self.term_req_data[idx].name.as_str(),
|
||||
AggKind::Terms => self.term_req_data[idx]
|
||||
.as_deref()
|
||||
.expect("term_req_data slot is empty (taken)")
|
||||
.name
|
||||
.as_str(),
|
||||
AggKind::Cardinality => &self.cardinality_req_data[idx].name,
|
||||
AggKind::StatsKind(_) => &self.stats_metric_req_data[idx].name,
|
||||
AggKind::TopHits => &self.top_hits_req_data[idx].name,
|
||||
AggKind::MissingTerm => &self.missing_term_req_data[idx].name,
|
||||
AggKind::Histogram => self.histogram_req_data[idx].name.as_str(),
|
||||
AggKind::DateHistogram => self.histogram_req_data[idx].name.as_str(),
|
||||
AggKind::Range => self.range_req_data[idx].name.as_str(),
|
||||
AggKind::Filter => self.filter_req_data[idx].name.as_str(),
|
||||
AggKind::Composite => self.composite_req_data[idx].name.as_str(),
|
||||
AggKind::Histogram => self.histogram_req_data[idx]
|
||||
.as_deref()
|
||||
.expect("histogram_req_data slot is empty (taken)")
|
||||
.name
|
||||
.as_str(),
|
||||
AggKind::DateHistogram => self.histogram_req_data[idx]
|
||||
.as_deref()
|
||||
.expect("histogram_req_data slot is empty (taken)")
|
||||
.name
|
||||
.as_str(),
|
||||
AggKind::Range => self.range_req_data[idx]
|
||||
.as_deref()
|
||||
.expect("range_req_data slot is empty (taken)")
|
||||
.name
|
||||
.as_str(),
|
||||
AggKind::Filter => self.filter_req_data[idx]
|
||||
.as_deref()
|
||||
.expect("filter_req_data slot is empty (taken)")
|
||||
.name
|
||||
.as_str(),
|
||||
AggKind::Composite => self.composite_req_data[idx]
|
||||
.as_deref()
|
||||
.expect("composite_req_data slot is empty (taken)")
|
||||
.name
|
||||
.as_str(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -271,7 +412,7 @@ pub(crate) fn build_segment_agg_collector(
|
||||
Ok(Box::new(TermMissingAgg::new(req, node)?))
|
||||
}
|
||||
AggKind::Cardinality => {
|
||||
let req_data = req.get_cardinality_req_data(node.idx_in_req_data);
|
||||
let req_data = &mut req.get_cardinality_req_data_mut(node.idx_in_req_data);
|
||||
// For str columns, choose the per-bucket entries representation
|
||||
// based on the segment's column.max_value():
|
||||
// * small (< BITSET_MAX_TERM_ORD): `BitSet`, pre-allocated, no promotion machinery.
|
||||
@@ -318,7 +459,7 @@ pub(crate) fn build_segment_agg_collector(
|
||||
SegmentExtendedStatsCollector::from_req(req_data, sigma),
|
||||
)),
|
||||
StatsType::Percentiles => {
|
||||
let req_data = req.get_metric_req_data(node.idx_in_req_data);
|
||||
let req_data = req.get_metric_req_data_mut(node.idx_in_req_data);
|
||||
Ok(Box::new(
|
||||
SegmentPercentilesCollector::from_req_and_validate(
|
||||
req_data.field_type,
|
||||
@@ -338,8 +479,12 @@ pub(crate) fn build_segment_agg_collector(
|
||||
req_data.segment_ordinal,
|
||||
)))
|
||||
}
|
||||
AggKind::Histogram => build_segment_histogram_collector(req, node),
|
||||
AggKind::DateHistogram => build_segment_histogram_collector(req, node),
|
||||
AggKind::Histogram => Ok(Box::new(SegmentHistogramCollector::from_req_and_validate(
|
||||
req, node,
|
||||
)?)),
|
||||
AggKind::DateHistogram => Ok(Box::new(SegmentHistogramCollector::from_req_and_validate(
|
||||
req, node,
|
||||
)?)),
|
||||
AggKind::Range => Ok(build_segment_range_collector(req, node)?),
|
||||
AggKind::Filter => build_segment_filter_collector(req, node),
|
||||
AggKind::Composite => Ok(Box::new(
|
||||
@@ -654,18 +799,23 @@ fn build_nodes(
|
||||
let schema = reader.schema();
|
||||
let tokenizers = &data.context.tokenizers;
|
||||
let query = filter_req.parse_query(schema, tokenizers)?;
|
||||
let evaluator =
|
||||
std::rc::Rc::new(crate::aggregation::bucket::DocumentQueryEvaluator::new(
|
||||
query,
|
||||
schema.clone(),
|
||||
reader,
|
||||
)?);
|
||||
let evaluator = crate::aggregation::bucket::DocumentQueryEvaluator::new(
|
||||
query,
|
||||
schema.clone(),
|
||||
reader,
|
||||
)?;
|
||||
|
||||
// Pre-allocate buffer for batch filtering
|
||||
let max_doc = reader.max_doc();
|
||||
let buffer_capacity = crate::docset::COLLECT_BLOCK_BUFFER_LEN.min(max_doc as usize);
|
||||
let matching_docs_buffer = Vec::with_capacity(buffer_capacity);
|
||||
|
||||
let idx_in_req_data = data.push_filter_req_data(FilterAggReqData {
|
||||
name: agg_name.to_string(),
|
||||
req: filter_req.clone(),
|
||||
segment_reader: reader.clone(),
|
||||
evaluator,
|
||||
matching_docs_buffer,
|
||||
is_top_level,
|
||||
});
|
||||
let children = build_children(&req.sub_aggregation, reader, segment_ordinal, data)?;
|
||||
|
||||
@@ -16,7 +16,6 @@ use crate::{SegmentReader, TantivyError};
|
||||
|
||||
/// Contains all information required by the SegmentCompositeCollector to perform the
|
||||
/// composite aggregation on a segment.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CompositeAggReqData {
|
||||
/// The name of the aggregation.
|
||||
pub name: String,
|
||||
@@ -35,7 +34,6 @@ impl CompositeAggReqData {
|
||||
}
|
||||
|
||||
/// Accessors for a single column in a composite source.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CompositeAccessor {
|
||||
/// The fast field column
|
||||
pub column: Column<u64>,
|
||||
@@ -50,7 +48,6 @@ pub struct CompositeAccessor {
|
||||
}
|
||||
|
||||
/// Accessors to all the columns that belong to the field of a composite source.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CompositeSourceAccessors {
|
||||
/// The accessors for this source
|
||||
pub accessors: Vec<CompositeAccessor>,
|
||||
@@ -361,7 +358,7 @@ impl PrecomputedDateInterval {
|
||||
///
|
||||
/// Some column types (term, IP) might not have an exact representation of the
|
||||
/// specified after key
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug)]
|
||||
pub enum PrecomputedAfterKey {
|
||||
/// The after key could be exactly represented in the column space.
|
||||
Exact(u64),
|
||||
|
||||
@@ -118,7 +118,7 @@ impl InternalValueRepr {
|
||||
pub struct SegmentCompositeCollector {
|
||||
/// One DynArrayHeapMap per parent bucket.
|
||||
parent_buckets: Vec<DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>>,
|
||||
req_data: CompositeAggReqData,
|
||||
accessor_idx: usize,
|
||||
sub_agg: Option<BufferedSubAggs<HighCardSubAggBuffer>>,
|
||||
bucket_id_provider: BucketIdProvider,
|
||||
/// Number of sources, needed when creating new DynArrayHeapMaps.
|
||||
@@ -132,7 +132,10 @@ impl SegmentAggregationCollector for SegmentCompositeCollector {
|
||||
results: &mut IntermediateAggregationResults,
|
||||
parent_bucket_id: BucketId,
|
||||
) -> crate::Result<()> {
|
||||
let name = self.req_data.name.clone();
|
||||
let name = agg_data
|
||||
.get_composite_req_data(self.accessor_idx)
|
||||
.name
|
||||
.clone();
|
||||
|
||||
let buckets = self.add_intermediate_bucket_result(agg_data, parent_bucket_id)?;
|
||||
results.push(
|
||||
@@ -150,11 +153,12 @@ impl SegmentAggregationCollector for SegmentCompositeCollector {
|
||||
agg_data: &mut AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
let mem_pre = self.get_memory_consumption(parent_bucket_id);
|
||||
let composite_agg_data = agg_data.take_composite_req_data(self.accessor_idx);
|
||||
|
||||
for doc in docs {
|
||||
let mut visitor = CompositeKeyVisitor {
|
||||
doc_id: *doc,
|
||||
composite_agg_data: &self.req_data,
|
||||
composite_agg_data: &composite_agg_data,
|
||||
buckets: &mut self.parent_buckets[parent_bucket_id as usize],
|
||||
sub_agg: &mut self.sub_agg,
|
||||
bucket_id_provider: &mut self.bucket_id_provider,
|
||||
@@ -162,6 +166,7 @@ impl SegmentAggregationCollector for SegmentCompositeCollector {
|
||||
};
|
||||
visitor.visit(0, true)?;
|
||||
}
|
||||
agg_data.put_back_composite_req_data(self.accessor_idx, composite_agg_data);
|
||||
|
||||
if let Some(sub_agg) = &mut self.sub_agg {
|
||||
sub_agg.check_flush_local(agg_data)?;
|
||||
@@ -216,13 +221,7 @@ impl SegmentCompositeCollector {
|
||||
req_data: &mut AggregationsSegmentCtx,
|
||||
node: &AggRefNode,
|
||||
) -> crate::Result<Self> {
|
||||
let composite_req_data =
|
||||
req_data.per_request.composite_req_data[node.idx_in_req_data].clone();
|
||||
validate_req(&composite_req_data)?;
|
||||
req_data
|
||||
.context
|
||||
.limits
|
||||
.add_memory_consumed(composite_req_data.get_memory_consumption() as u64)?;
|
||||
validate_req(req_data, node.idx_in_req_data)?;
|
||||
|
||||
let has_sub_aggregations = !node.children.is_empty();
|
||||
let sub_agg = if has_sub_aggregations {
|
||||
@@ -232,11 +231,12 @@ impl SegmentCompositeCollector {
|
||||
None
|
||||
};
|
||||
|
||||
let composite_req_data = req_data.get_composite_req_data(node.idx_in_req_data);
|
||||
let num_sources = composite_req_data.req.sources.len();
|
||||
|
||||
Ok(SegmentCompositeCollector {
|
||||
parent_buckets: vec![DynArrayHeapMap::try_new(num_sources)?],
|
||||
req_data: composite_req_data,
|
||||
accessor_idx: node.idx_in_req_data,
|
||||
sub_agg,
|
||||
bucket_id_provider: BucketIdProvider::default(),
|
||||
num_sources,
|
||||
@@ -258,7 +258,7 @@ impl SegmentCompositeCollector {
|
||||
let mut dict: FxHashMap<Vec<CompositeIntermediateKey>, IntermediateCompositeBucketEntry> =
|
||||
Default::default();
|
||||
dict.reserve(heap_map.size());
|
||||
let composite_data = &self.req_data;
|
||||
let composite_data = agg_data.get_composite_req_data(self.accessor_idx);
|
||||
for (key_internal_repr, agg) in heap_map.into_iter() {
|
||||
let key = resolve_key(&key_internal_repr, composite_data)?;
|
||||
let mut sub_aggregation_res = IntermediateAggregationResults::default();
|
||||
@@ -298,7 +298,8 @@ impl SegmentCompositeCollector {
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_req(composite_data: &CompositeAggReqData) -> crate::Result<()> {
|
||||
fn validate_req(req_data: &mut AggregationsSegmentCtx, accessor_idx: usize) -> crate::Result<()> {
|
||||
let composite_data = req_data.get_composite_req_data(accessor_idx);
|
||||
let req = &composite_data.req;
|
||||
if req.sources.is_empty() {
|
||||
return Err(TantivyError::InvalidArgument(
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use std::fmt::Debug;
|
||||
use std::rc::Rc;
|
||||
|
||||
use common::BitSet;
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
@@ -397,7 +396,6 @@ impl PartialEq for FilterAggregation {
|
||||
|
||||
/// Request data for filter aggregation
|
||||
/// This struct holds the per-segment data needed to execute a filter aggregation
|
||||
#[derive(Clone)]
|
||||
pub struct FilterAggReqData {
|
||||
/// The name of the filter aggregation
|
||||
pub name: String,
|
||||
@@ -405,20 +403,22 @@ pub struct FilterAggReqData {
|
||||
pub req: FilterAggregation,
|
||||
/// The segment reader
|
||||
pub segment_reader: SegmentReader,
|
||||
/// Document evaluator for the filter query (precomputed BitSet).
|
||||
/// Wrapped in `Rc` so cloning the request data does not duplicate the (potentially large)
|
||||
/// underlying BitSet.
|
||||
pub evaluator: Rc<DocumentQueryEvaluator>,
|
||||
/// Document evaluator for the filter query (precomputed BitSet)
|
||||
/// This is built once when the request data is created
|
||||
pub evaluator: DocumentQueryEvaluator,
|
||||
/// Reusable buffer for matching documents to minimize allocations during collection
|
||||
pub matching_docs_buffer: Vec<DocId>,
|
||||
/// True if this filter aggregation is at the top level of the aggregation tree (not nested).
|
||||
pub is_top_level: bool,
|
||||
}
|
||||
|
||||
impl FilterAggReqData {
|
||||
pub(crate) fn get_memory_consumption(&self) -> usize {
|
||||
// Estimate: name + segment reader reference + bitset
|
||||
// Estimate: name + segment reader reference + bitset + buffer capacity
|
||||
self.name.len()
|
||||
+ std::mem::size_of::<SegmentReader>()
|
||||
+ self.evaluator.bitset.len() / 8 // BitSet memory (bits to bytes)
|
||||
+ self.matching_docs_buffer.capacity() * std::mem::size_of::<DocId>()
|
||||
+ std::mem::size_of::<bool>()
|
||||
}
|
||||
}
|
||||
@@ -509,10 +509,8 @@ pub struct SegmentFilterCollector<B: SubAggBuffer> {
|
||||
/// Sub-aggregation collectors
|
||||
sub_aggregations: Option<BufferedSubAggs<B>>,
|
||||
bucket_id_provider: BucketIdProvider,
|
||||
/// Per-segment filter request data, owned by this collector.
|
||||
req_data: FilterAggReqData,
|
||||
/// Reusable buffer for matching documents to minimize allocations during collection.
|
||||
matching_docs_buffer: Vec<DocId>,
|
||||
/// Accessor index for this filter aggregation (to access FilterAggReqData)
|
||||
accessor_idx: usize,
|
||||
}
|
||||
|
||||
impl<B: SubAggBuffer> SegmentFilterCollector<B> {
|
||||
@@ -520,7 +518,6 @@ impl<B: SubAggBuffer> SegmentFilterCollector<B> {
|
||||
pub(crate) fn from_req_and_validate(
|
||||
req: &mut AggregationsSegmentCtx,
|
||||
node: &AggRefNode,
|
||||
req_data: FilterAggReqData,
|
||||
) -> crate::Result<Self> {
|
||||
// Build sub-aggregation collectors if any
|
||||
let sub_agg_collector = if !node.children.is_empty() {
|
||||
@@ -530,15 +527,11 @@ impl<B: SubAggBuffer> SegmentFilterCollector<B> {
|
||||
};
|
||||
let sub_agg_collector = sub_agg_collector.map(BufferedSubAggs::new);
|
||||
|
||||
let max_doc = req_data.segment_reader.max_doc();
|
||||
let buffer_capacity = crate::docset::COLLECT_BLOCK_BUFFER_LEN.min(max_doc as usize);
|
||||
|
||||
Ok(SegmentFilterCollector {
|
||||
parent_buckets: Vec::new(),
|
||||
sub_aggregations: sub_agg_collector,
|
||||
req_data,
|
||||
accessor_idx: node.idx_in_req_data,
|
||||
bucket_id_provider: BucketIdProvider::default(),
|
||||
matching_docs_buffer: Vec::with_capacity(buffer_capacity),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -547,23 +540,18 @@ pub(crate) fn build_segment_filter_collector(
|
||||
req: &mut AggregationsSegmentCtx,
|
||||
node: &AggRefNode,
|
||||
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
|
||||
let req_data = req.per_request.filter_req_data[node.idx_in_req_data].clone();
|
||||
req.context
|
||||
.limits
|
||||
.add_memory_consumed(req_data.get_memory_consumption() as u64)?;
|
||||
let is_top_level = req_data.is_top_level;
|
||||
let is_top_level = req.per_request.filter_req_data[node.idx_in_req_data]
|
||||
.as_ref()
|
||||
.expect("filter_req_data slot is empty")
|
||||
.is_top_level;
|
||||
|
||||
if is_top_level {
|
||||
Ok(Box::new(
|
||||
SegmentFilterCollector::<LowCardSubAggBuffer>::from_req_and_validate(
|
||||
req, node, req_data,
|
||||
)?,
|
||||
SegmentFilterCollector::<LowCardSubAggBuffer>::from_req_and_validate(req, node)?,
|
||||
))
|
||||
} else {
|
||||
Ok(Box::new(
|
||||
SegmentFilterCollector::<HighCardSubAggBuffer>::from_req_and_validate(
|
||||
req, node, req_data,
|
||||
)?,
|
||||
SegmentFilterCollector::<HighCardSubAggBuffer>::from_req_and_validate(req, node)?,
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -573,7 +561,7 @@ impl<B: SubAggBuffer> Debug for SegmentFilterCollector<B> {
|
||||
f.debug_struct("SegmentFilterCollector")
|
||||
.field("buckets", &self.parent_buckets)
|
||||
.field("has_sub_aggs", &self.sub_aggregations.is_some())
|
||||
.field("name", &self.req_data.name)
|
||||
.field("accessor_idx", &self.accessor_idx)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -610,7 +598,11 @@ impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentFilterCollector<B>
|
||||
};
|
||||
|
||||
// Get the name of this filter aggregation
|
||||
let name = self.req_data.name.clone();
|
||||
let name = agg_data.per_request.filter_req_data[self.accessor_idx]
|
||||
.as_ref()
|
||||
.expect("filter_req_data slot is empty")
|
||||
.name
|
||||
.clone();
|
||||
|
||||
results.push(
|
||||
name,
|
||||
@@ -631,24 +623,27 @@ impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentFilterCollector<B>
|
||||
}
|
||||
|
||||
let mut bucket = self.parent_buckets[parent_bucket_id as usize];
|
||||
// Take the request data to avoid borrow checker issues with sub-aggregations
|
||||
let mut req = agg_data.take_filter_req_data(self.accessor_idx);
|
||||
|
||||
// Use batch filtering with O(1) BitSet lookups
|
||||
self.matching_docs_buffer.clear();
|
||||
self.req_data
|
||||
.evaluator
|
||||
.filter_batch(docs, &mut self.matching_docs_buffer);
|
||||
req.matching_docs_buffer.clear();
|
||||
req.evaluator
|
||||
.filter_batch(docs, &mut req.matching_docs_buffer);
|
||||
|
||||
bucket.doc_count += self.matching_docs_buffer.len() as u64;
|
||||
bucket.doc_count += req.matching_docs_buffer.len() as u64;
|
||||
|
||||
// Batch process sub-aggregations if we have matches
|
||||
if !self.matching_docs_buffer.is_empty() {
|
||||
if !req.matching_docs_buffer.is_empty() {
|
||||
if let Some(sub_aggs) = &mut self.sub_aggregations {
|
||||
for &doc_id in &self.matching_docs_buffer {
|
||||
for &doc_id in &req.matching_docs_buffer {
|
||||
sub_aggs.push(bucket.bucket_id, doc_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Put the request data back
|
||||
agg_data.put_back_filter_req_data(self.accessor_idx, req);
|
||||
if let Some(sub_aggs) = &mut self.sub_aggregations {
|
||||
sub_aggs.check_flush_local(agg_data)?;
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ use crate::TantivyError;
|
||||
|
||||
/// Contains all information required by the SegmentHistogramCollector to perform the
|
||||
/// histogram or date_histogram aggregation on a segment.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HistogramAggReqData {
|
||||
/// The column accessor to access the fast field values.
|
||||
pub accessor: Column<u64>,
|
||||
@@ -244,52 +243,19 @@ impl Display for HistogramBounds {
|
||||
}
|
||||
|
||||
impl HistogramBounds {
|
||||
pub(crate) fn contains(&self, val: f64) -> bool {
|
||||
fn contains(&self, val: f64) -> bool {
|
||||
val >= self.min && val <= self.max
|
||||
}
|
||||
}
|
||||
|
||||
/// The per-bucket identifier stored in a [`SegmentHistogramBucketEntry`].
|
||||
///
|
||||
/// It is [`BucketId`] when the histogram has sub aggregations (which key their state by it), and
|
||||
/// the zero-sized `()` when it does not. Without sub aggregations the id is never read, so storing
|
||||
/// `()` drops 8 bytes per bucket (24 -> 16) and turns id assignment into a no-op.
|
||||
pub trait BucketIdSlot: Copy + Default + std::fmt::Debug + PartialEq {
|
||||
/// Assigns the next id from the provider, called once when a bucket is first filled.
|
||||
fn assign(provider: &mut BucketIdProvider) -> Self;
|
||||
/// Resolves to the `BucketId` for sub-aggregation bookkeeping.
|
||||
///
|
||||
/// Only ever called for the [`BucketId`] slot: the `()` slot is used exactly when there are no
|
||||
/// sub aggregations, so every call site is guarded by `sub_agg.is_some()` and is dead for `()`.
|
||||
fn to_bucket_id(self) -> BucketId;
|
||||
}
|
||||
impl BucketIdSlot for BucketId {
|
||||
#[inline(always)]
|
||||
fn assign(provider: &mut BucketIdProvider) -> Self {
|
||||
provider.next_bucket_id()
|
||||
}
|
||||
#[inline(always)]
|
||||
fn to_bucket_id(self) -> BucketId {
|
||||
self
|
||||
}
|
||||
}
|
||||
impl BucketIdSlot for () {
|
||||
#[inline(always)]
|
||||
fn assign(_provider: &mut BucketIdProvider) -> Self {}
|
||||
#[inline(always)]
|
||||
fn to_bucket_id(self) -> BucketId {
|
||||
unreachable!("bucket ids are only resolved when sub aggregations are present")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Debug, PartialEq)]
|
||||
pub(crate) struct SegmentHistogramBucketEntry<B> {
|
||||
pub(crate) struct SegmentHistogramBucketEntry {
|
||||
pub key: f64,
|
||||
pub doc_count: u64,
|
||||
pub bucket_id: B,
|
||||
pub bucket_id: BucketId,
|
||||
}
|
||||
|
||||
impl<B: BucketIdSlot> SegmentHistogramBucketEntry<B> {
|
||||
impl SegmentHistogramBucketEntry {
|
||||
pub(crate) fn into_intermediate_bucket_entry(
|
||||
self,
|
||||
sub_aggregation: &mut Option<HighCardBufferedSubAggs>,
|
||||
@@ -302,7 +268,7 @@ impl<B: BucketIdSlot> SegmentHistogramBucketEntry<B> {
|
||||
.add_intermediate_aggregation_result(
|
||||
agg_data,
|
||||
&mut sub_aggregation_res,
|
||||
self.bucket_id.to_bucket_id(),
|
||||
self.bucket_id,
|
||||
)?;
|
||||
}
|
||||
Ok(IntermediateHistogramBucketEntry {
|
||||
@@ -313,147 +279,39 @@ impl<B: BucketIdSlot> SegmentHistogramBucketEntry<B> {
|
||||
}
|
||||
}
|
||||
|
||||
/// The contiguous bucket range a histogram can span, derived from the column min/max (clamped to
|
||||
/// the histogram bounds). Buckets in `[base_pos, base_pos + len)` can be stored in a flat `Vec`
|
||||
/// indexed by `bucket_pos - base_pos`, avoiding the hash map on the hot path.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub(crate) struct DenseRange {
|
||||
/// `bucket_pos` mapped to index 0 of the dense `Vec`.
|
||||
pub(crate) base_pos: i64,
|
||||
/// Number of bucket positions in the range.
|
||||
pub(crate) len: usize,
|
||||
#[derive(Clone, Debug, Default)]
|
||||
struct HistogramBuckets {
|
||||
pub buckets: FxHashMap<i64, SegmentHistogramBucketEntry>,
|
||||
}
|
||||
|
||||
/// Storage for the histogram buckets of a single parent bucket.
|
||||
///
|
||||
/// Starts out sparse (a hash map keyed by `bucket_pos`). Once enough distinct buckets have been
|
||||
/// filled that we are clearly going to cover most of the column's theoretical range, it switches
|
||||
/// to a dense `Vec` indexed by `bucket_pos - base_pos`, which removes hashing from the hot loop.
|
||||
#[derive(Clone, Debug)]
|
||||
enum HistogramBuckets<B> {
|
||||
Sparse(FxHashMap<i64, SegmentHistogramBucketEntry<B>>),
|
||||
Dense {
|
||||
base_pos: i64,
|
||||
/// One slot per bucket position; a slot with `doc_count == 0` has not been hit yet.
|
||||
buckets: Vec<SegmentHistogramBucketEntry<B>>,
|
||||
},
|
||||
}
|
||||
impl<B> Default for HistogramBuckets<B> {
|
||||
fn default() -> Self {
|
||||
HistogramBuckets::Sparse(FxHashMap::default())
|
||||
}
|
||||
}
|
||||
impl<B: BucketIdSlot> HistogramBuckets<B> {
|
||||
impl HistogramBuckets {
|
||||
fn memory_consumption(&self) -> u64 {
|
||||
let num_slots = match self {
|
||||
HistogramBuckets::Sparse(map) => map.capacity(),
|
||||
HistogramBuckets::Dense { buckets, .. } => buckets.capacity(),
|
||||
};
|
||||
num_slots as u64 * std::mem::size_of::<SegmentHistogramBucketEntry<B>>() as u64
|
||||
}
|
||||
|
||||
/// Switches from sparse to dense storage once the dense `Vec` would use no more memory than the
|
||||
/// hash map does now, so the switch never increases memory. Called at block boundaries.
|
||||
///
|
||||
/// The `Vec` holds one `Entry` per bucket position in the range. The map additionally stores
|
||||
/// the key and a control byte per slot, at a load factor of 7/16..7/8, so for a dense histogram
|
||||
/// its footprint grows past the `Vec` well before full coverage. And since the `Vec` never
|
||||
/// grows afterwards while the map would keep growing, dense only gets relatively cheaper — so
|
||||
/// no upper bound on the range is needed: a large but sparse range simply never crosses over.
|
||||
#[inline]
|
||||
fn maybe_densify(&mut self, dense_range: Option<DenseRange>) {
|
||||
let Some(range) = dense_range else { return };
|
||||
let HistogramBuckets::Sparse(map) = self else {
|
||||
return;
|
||||
};
|
||||
let dense_bytes = range
|
||||
.len
|
||||
.saturating_mul(std::mem::size_of::<SegmentHistogramBucketEntry<B>>());
|
||||
let sparse_bytes = map
|
||||
.capacity()
|
||||
.saturating_mul(std::mem::size_of::<(i64, SegmentHistogramBucketEntry<B>)>() + 1);
|
||||
if dense_bytes > sparse_bytes {
|
||||
return;
|
||||
}
|
||||
let map = std::mem::take(map);
|
||||
let mut buckets = vec![SegmentHistogramBucketEntry::<B>::default(); range.len];
|
||||
for (bucket_pos, entry) in map {
|
||||
buckets[(bucket_pos - range.base_pos) as usize] = entry;
|
||||
}
|
||||
*self = HistogramBuckets::Dense {
|
||||
base_pos: range.base_pos,
|
||||
buckets,
|
||||
};
|
||||
}
|
||||
|
||||
/// Returns the bucket entry for `bucket_pos`, setting its key (and `bucket_id`, when `B` is
|
||||
/// [`BucketId`]) on first use.
|
||||
///
|
||||
/// For the dense variant `bucket_pos` is guaranteed to be inside the range, since it is
|
||||
/// derived from the column min/max that bounds every value (see [`compute_dense_range`]).
|
||||
#[inline]
|
||||
fn get_or_create(
|
||||
&mut self,
|
||||
bucket_pos: i64,
|
||||
bucket_id_provider: &mut BucketIdProvider,
|
||||
key_from_pos: impl FnOnce(i64) -> f64,
|
||||
) -> &mut SegmentHistogramBucketEntry<B> {
|
||||
match self {
|
||||
HistogramBuckets::Sparse(map) => {
|
||||
map.entry(bucket_pos)
|
||||
.or_insert_with(|| SegmentHistogramBucketEntry {
|
||||
key: key_from_pos(bucket_pos),
|
||||
doc_count: 0,
|
||||
bucket_id: B::assign(bucket_id_provider),
|
||||
})
|
||||
}
|
||||
HistogramBuckets::Dense { base_pos, buckets } => {
|
||||
let idx = (bucket_pos - *base_pos) as usize;
|
||||
debug_assert!(idx < buckets.len(), "bucket_pos outside the dense range");
|
||||
let entry = &mut buckets[idx];
|
||||
if entry.doc_count == 0 {
|
||||
entry.key = key_from_pos(bucket_pos);
|
||||
entry.bucket_id = B::assign(bucket_id_provider);
|
||||
}
|
||||
entry
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Consumes the storage, yielding all non-empty bucket entries.
|
||||
fn into_filled_entries(self) -> Vec<SegmentHistogramBucketEntry<B>> {
|
||||
match self {
|
||||
HistogramBuckets::Sparse(map) => map.into_values().collect(),
|
||||
HistogramBuckets::Dense { buckets, .. } => {
|
||||
buckets.into_iter().filter(|b| b.doc_count > 0).collect()
|
||||
}
|
||||
}
|
||||
self.buckets.capacity() as u64 * std::mem::size_of::<SegmentHistogramBucketEntry>() as u64
|
||||
}
|
||||
}
|
||||
|
||||
/// The collector puts values from the fast field into the correct buckets and does a conversion to
|
||||
/// the correct datatype.
|
||||
#[derive(Debug)]
|
||||
pub struct SegmentHistogramCollector<B> {
|
||||
pub struct SegmentHistogramCollector {
|
||||
/// The buckets containing the aggregation data.
|
||||
/// One Histogram bucket per parent bucket id.
|
||||
parent_buckets: Vec<HistogramBuckets<B>>,
|
||||
parent_buckets: Vec<HistogramBuckets>,
|
||||
sub_agg: Option<HighCardBufferedSubAggs>,
|
||||
req_data: HistogramAggReqData,
|
||||
accessor_idx: usize,
|
||||
bucket_id_provider: BucketIdProvider,
|
||||
/// Theoretical bucket range derived from the column min/max, if dense `Vec` storage is
|
||||
/// viable. `None` keeps every parent bucket in the sparse hash map.
|
||||
dense_range: Option<DenseRange>,
|
||||
}
|
||||
|
||||
impl<B: BucketIdSlot> SegmentAggregationCollector for SegmentHistogramCollector<B> {
|
||||
impl SegmentAggregationCollector for SegmentHistogramCollector {
|
||||
fn add_intermediate_aggregation_result(
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
results: &mut IntermediateAggregationResults,
|
||||
parent_bucket_id: BucketId,
|
||||
) -> crate::Result<()> {
|
||||
let name = self.req_data.name.clone();
|
||||
let name = agg_data
|
||||
.get_histogram_req_data(self.accessor_idx)
|
||||
.name
|
||||
.clone();
|
||||
// TODO: avoid prepare_max_bucket here and handle empty buckets.
|
||||
self.prepare_max_bucket(parent_bucket_id, agg_data)?;
|
||||
let histogram = std::mem::take(&mut self.parent_buckets[parent_bucket_id as usize]);
|
||||
@@ -470,13 +328,10 @@ impl<B: BucketIdSlot> SegmentAggregationCollector for SegmentHistogramCollector<
|
||||
docs: &[crate::DocId],
|
||||
agg_data: &mut AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
let req = agg_data.take_histogram_req_data(self.accessor_idx);
|
||||
let mem_pre = self.get_memory_consumption(parent_bucket_id);
|
||||
let dense_range = self.dense_range;
|
||||
let store = &mut self.parent_buckets[parent_bucket_id as usize];
|
||||
// Upgrade to dense storage before processing the block if the buckets are dense enough.
|
||||
store.maybe_densify(dense_range);
|
||||
let buckets = &mut self.parent_buckets[parent_bucket_id as usize].buckets;
|
||||
|
||||
let req = &self.req_data;
|
||||
let bounds = req.bounds;
|
||||
let interval = req.req.interval;
|
||||
let offset = req.offset;
|
||||
@@ -485,42 +340,31 @@ impl<B: BucketIdSlot> SegmentAggregationCollector for SegmentHistogramCollector<
|
||||
agg_data
|
||||
.column_block_accessor
|
||||
.fetch_block(docs, &req.accessor);
|
||||
// special path for nested buckets
|
||||
if let Some(sub_agg) = &mut self.sub_agg {
|
||||
for (doc, val) in agg_data
|
||||
.column_block_accessor
|
||||
.iter_docid_vals(docs, &req.accessor)
|
||||
{
|
||||
let val = f64_from_fastfield_u64(val, req.field_type);
|
||||
if bounds.contains(val) {
|
||||
let bucket = store.get_or_create(
|
||||
get_bucket_pos(val),
|
||||
&mut self.bucket_id_provider,
|
||||
|pos| get_bucket_key_from_pos(pos as f64, interval, offset),
|
||||
);
|
||||
bucket.doc_count += 1;
|
||||
sub_agg.push(bucket.bucket_id.to_bucket_id(), doc);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for val in agg_data.column_block_accessor.iter_vals() {
|
||||
let val = f64_from_fastfield_u64(val, req.field_type);
|
||||
if bounds.contains(val) {
|
||||
let bucket = store.get_or_create(
|
||||
get_bucket_pos(val),
|
||||
&mut self.bucket_id_provider,
|
||||
|pos| get_bucket_key_from_pos(pos as f64, interval, offset),
|
||||
);
|
||||
bucket.doc_count += 1;
|
||||
for (doc, val) in agg_data
|
||||
.column_block_accessor
|
||||
.iter_docid_vals(docs, &req.accessor)
|
||||
{
|
||||
let val = f64_from_fastfield_u64(val, req.field_type);
|
||||
let bucket_pos = get_bucket_pos(val);
|
||||
if bounds.contains(val) {
|
||||
let bucket = buckets.entry(bucket_pos).or_insert_with(|| {
|
||||
let key = get_bucket_key_from_pos(bucket_pos as f64, interval, offset);
|
||||
SegmentHistogramBucketEntry {
|
||||
key,
|
||||
doc_count: 0,
|
||||
bucket_id: self.bucket_id_provider.next_bucket_id(),
|
||||
}
|
||||
});
|
||||
bucket.doc_count += 1;
|
||||
if let Some(sub_agg) = &mut self.sub_agg {
|
||||
sub_agg.push(bucket.bucket_id, doc);
|
||||
}
|
||||
}
|
||||
}
|
||||
agg_data.put_back_histogram_req_data(self.accessor_idx, req);
|
||||
|
||||
// `checked_sub` is `None` when densifying shrank the accounted memory; only account growth.
|
||||
if let Some(mem_delta) = self
|
||||
.get_memory_consumption(parent_bucket_id)
|
||||
.checked_sub(mem_pre)
|
||||
{
|
||||
let mem_delta = self.get_memory_consumption(parent_bucket_id) - mem_pre;
|
||||
if mem_delta > 0 {
|
||||
agg_data.context.limits.add_memory_consumed(mem_delta)?;
|
||||
}
|
||||
|
||||
@@ -544,7 +388,9 @@ impl<B: BucketIdSlot> SegmentAggregationCollector for SegmentHistogramCollector<
|
||||
_agg_data: &AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
while self.parent_buckets.len() <= max_bucket as usize {
|
||||
self.parent_buckets.push(HistogramBuckets::default());
|
||||
self.parent_buckets.push(HistogramBuckets {
|
||||
buckets: FxHashMap::default(),
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -561,7 +407,7 @@ impl<B: BucketIdSlot> SegmentAggregationCollector for SegmentHistogramCollector<
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: BucketIdSlot> SegmentHistogramCollector<B> {
|
||||
impl SegmentHistogramCollector {
|
||||
fn get_memory_consumption(&self, parent_bucket_id: BucketId) -> u64 {
|
||||
self.parent_buckets[parent_bucket_id as usize].memory_consumption()
|
||||
}
|
||||
@@ -570,19 +416,21 @@ impl<B: BucketIdSlot> SegmentHistogramCollector<B> {
|
||||
fn add_intermediate_bucket_result(
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
histogram: HistogramBuckets<B>,
|
||||
histogram: HistogramBuckets,
|
||||
) -> crate::Result<IntermediateBucketResult> {
|
||||
let filled = histogram.into_filled_entries();
|
||||
let mut buckets = Vec::with_capacity(filled.len());
|
||||
let mut buckets = Vec::with_capacity(histogram.buckets.len());
|
||||
|
||||
for bucket in filled {
|
||||
for bucket in histogram.buckets.into_values() {
|
||||
let bucket_res = bucket.into_intermediate_bucket_entry(&mut self.sub_agg, agg_data);
|
||||
|
||||
buckets.push(bucket_res?);
|
||||
}
|
||||
buckets.sort_unstable_by(|b1, b2| b1.key.total_cmp(&b2.key));
|
||||
|
||||
let is_date_agg = self.req_data.field_type == ColumnType::DateTime;
|
||||
let is_date_agg = agg_data
|
||||
.get_histogram_req_data(self.accessor_idx)
|
||||
.field_type
|
||||
== ColumnType::DateTime;
|
||||
Ok(IntermediateBucketResult::Histogram {
|
||||
buckets,
|
||||
is_date_agg,
|
||||
@@ -598,163 +446,32 @@ impl<B: BucketIdSlot> SegmentHistogramCollector<B> {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let mut req_data = agg_data.per_request.histogram_req_data[node.idx_in_req_data].clone();
|
||||
normalize_histogram_req(&mut req_data)?;
|
||||
agg_data
|
||||
.context
|
||||
.limits
|
||||
.add_memory_consumed(req_data.get_memory_consumption() as u64)?;
|
||||
let dense_range = compute_dense_range(
|
||||
&req_data.accessor,
|
||||
req_data.field_type,
|
||||
req_data.req.interval,
|
||||
req_data.offset,
|
||||
req_data.bounds,
|
||||
);
|
||||
let req_data = agg_data.get_histogram_req_data_mut(node.idx_in_req_data);
|
||||
req_data.req.validate()?;
|
||||
if req_data.field_type == ColumnType::DateTime && !req_data.is_date_histogram {
|
||||
req_data.req.normalize_date_time();
|
||||
}
|
||||
req_data.bounds = req_data.req.hard_bounds.unwrap_or(HistogramBounds {
|
||||
min: f64::MIN,
|
||||
max: f64::MAX,
|
||||
});
|
||||
req_data.offset = req_data.req.offset.unwrap_or(0.0);
|
||||
let sub_agg = sub_agg.map(BufferedSubAggs::new);
|
||||
|
||||
Ok(Self {
|
||||
parent_buckets: Default::default(),
|
||||
sub_agg,
|
||||
req_data,
|
||||
accessor_idx: node.idx_in_req_data,
|
||||
bucket_id_provider: BucketIdProvider::default(),
|
||||
dense_range,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentHistogramCollector<()> {
|
||||
/// Builds a histogram collector whose parent `t` is a dense histogram filled from
|
||||
/// `counts[t * num_time_buckets .. (t + 1) * num_time_buckets]` (row-major). Used by the fused
|
||||
/// terms×histogram collector to turn its flat 2D counters into the regular intermediate result,
|
||||
/// so cross-segment merging is shared with the general path.
|
||||
pub(crate) fn from_dense_rows(
|
||||
req_data: HistogramAggReqData,
|
||||
base_pos: i64,
|
||||
num_time_buckets: usize,
|
||||
counts: &[u32],
|
||||
) -> Self {
|
||||
let interval = req_data.req.interval;
|
||||
let offset = req_data.offset;
|
||||
let num_parents = if num_time_buckets == 0 {
|
||||
0
|
||||
} else {
|
||||
counts.len() / num_time_buckets
|
||||
};
|
||||
let parent_buckets = (0..num_parents)
|
||||
.map(|t| {
|
||||
let row = &counts[t * num_time_buckets..(t + 1) * num_time_buckets];
|
||||
let buckets = row
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(b, &doc_count)| SegmentHistogramBucketEntry {
|
||||
key: get_bucket_key_from_pos(
|
||||
(base_pos + b as i64) as f64,
|
||||
interval,
|
||||
offset,
|
||||
),
|
||||
doc_count: doc_count as u64,
|
||||
bucket_id: (),
|
||||
})
|
||||
.collect();
|
||||
HistogramBuckets::Dense { base_pos, buckets }
|
||||
})
|
||||
.collect();
|
||||
Self {
|
||||
parent_buckets,
|
||||
sub_agg: None,
|
||||
req_data,
|
||||
bucket_id_provider: BucketIdProvider::default(),
|
||||
dense_range: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Validates and normalizes a histogram request in place: applies date ns-normalization (for a
|
||||
/// `histogram` on a date column) and resolves `bounds`/`offset` from the request.
|
||||
fn normalize_histogram_req(req_data: &mut HistogramAggReqData) -> crate::Result<()> {
|
||||
req_data.req.validate()?;
|
||||
if req_data.field_type == ColumnType::DateTime && !req_data.is_date_histogram {
|
||||
req_data.req.normalize_date_time();
|
||||
}
|
||||
req_data.bounds = req_data.req.hard_bounds.unwrap_or(HistogramBounds {
|
||||
min: f64::MIN,
|
||||
max: f64::MAX,
|
||||
});
|
||||
req_data.offset = req_data.req.offset.unwrap_or(0.0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Clones and normalizes (resolving interval/offset/bounds) the histogram request at `node`, and
|
||||
/// returns it together with its dense bucket range — or `None` if the column has no usable range.
|
||||
/// Used by the fused terms×histogram collector, which then owns the normalized request.
|
||||
pub(crate) fn prepare_histogram_dense_range(
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
node: &AggRefNode,
|
||||
) -> crate::Result<Option<(HistogramAggReqData, DenseRange)>> {
|
||||
let mut req_data = agg_data.per_request.histogram_req_data[node.idx_in_req_data].clone();
|
||||
normalize_histogram_req(&mut req_data)?;
|
||||
let dense_range = compute_dense_range(
|
||||
&req_data.accessor,
|
||||
req_data.field_type,
|
||||
req_data.req.interval,
|
||||
req_data.offset,
|
||||
req_data.bounds,
|
||||
);
|
||||
Ok(dense_range.map(|range| (req_data, range)))
|
||||
}
|
||||
|
||||
/// Builds a boxed histogram (or date histogram) segment collector, picking the bucket-id storage
|
||||
/// based on whether there are sub aggregations: `()` (no id stored) when there are none, otherwise
|
||||
/// [`BucketId`].
|
||||
pub(crate) fn build_segment_histogram_collector(
|
||||
agg_data: &mut AggregationsSegmentCtx,
|
||||
node: &AggRefNode,
|
||||
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
|
||||
if node.children.is_empty() {
|
||||
Ok(Box::new(
|
||||
SegmentHistogramCollector::<()>::from_req_and_validate(agg_data, node)?,
|
||||
))
|
||||
} else {
|
||||
Ok(Box::new(
|
||||
SegmentHistogramCollector::<BucketId>::from_req_and_validate(agg_data, node)?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn get_bucket_pos_f64(val: f64, interval: f64, offset: f64) -> f64 {
|
||||
fn get_bucket_pos_f64(val: f64, interval: f64, offset: f64) -> f64 {
|
||||
((val - offset) / interval).floor()
|
||||
}
|
||||
|
||||
/// Computes the dense bucket range for a column from its min/max value (clamped to the histogram
|
||||
/// bounds), or `None` if there are no values within bounds (or the range overflows `usize`).
|
||||
///
|
||||
/// There is no upper bound on the range: whether dense storage is actually used is decided later,
|
||||
/// per parent bucket, by [`HistogramBuckets::maybe_densify`] based on the memory it would save.
|
||||
///
|
||||
/// The column min/max bound every value the collector can see, so a `Vec` sized to this range can
|
||||
/// be indexed by `bucket_pos - base_pos` without any out-of-bounds check on the hot path.
|
||||
fn compute_dense_range(
|
||||
accessor: &Column<u64>,
|
||||
field_type: ColumnType,
|
||||
interval: f64,
|
||||
offset: f64,
|
||||
bounds: HistogramBounds,
|
||||
) -> Option<DenseRange> {
|
||||
let col_min = f64_from_fastfield_u64(accessor.min_value(), field_type);
|
||||
let col_max = f64_from_fastfield_u64(accessor.max_value(), field_type);
|
||||
let lo = col_min.max(bounds.min);
|
||||
let hi = col_max.min(bounds.max);
|
||||
if lo > hi {
|
||||
return None;
|
||||
}
|
||||
let base_pos = get_bucket_pos_f64(lo, interval, offset) as i64;
|
||||
let top_pos = get_bucket_pos_f64(hi, interval, offset) as i64;
|
||||
let len = usize::try_from(top_pos.checked_sub(base_pos)?.checked_add(1)?).ok()?;
|
||||
(len > 0).then_some(DenseRange { base_pos, len })
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_bucket_key_from_pos(bucket_pos: f64, interval: f64, offset: f64) -> f64 {
|
||||
bucket_pos * interval + offset
|
||||
@@ -1059,62 +776,6 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn histogram_dense_storage_test() -> crate::Result<()> {
|
||||
histogram_dense_storage_test_with_opt(false)?;
|
||||
histogram_dense_storage_test_with_opt(true)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Exercises the switch from sparse hash map to dense `Vec` storage. The switch happens at a
|
||||
/// block boundary (a block is `COLLECT_BLOCK_BUFFER_LEN` = 64 docs), so we need many docs in a
|
||||
/// single segment, densely covering the bucket range. `with_sub_agg` toggles the `iter_vals`
|
||||
/// fast path vs. the `iter_docid_vals` path used when there is a sub aggregation.
|
||||
fn histogram_dense_storage_test_with_opt(with_sub_agg: bool) -> crate::Result<()> {
|
||||
let num_buckets = 50usize;
|
||||
let docs_per_bucket = 10usize;
|
||||
// Value `k` repeated `docs_per_bucket` times for each bucket `k`, so every value in bucket
|
||||
// `k` equals `k` and the per-bucket average is exactly `k`.
|
||||
let values: Vec<f64> = (0..num_buckets * docs_per_bucket)
|
||||
.map(|i| (i % num_buckets) as f64)
|
||||
.collect();
|
||||
// `merge_segments = true` collapses the per-value segments into a single segment with all
|
||||
// the docs, which is collected in 64-doc blocks and therefore switches to dense storage.
|
||||
let index = get_test_index_from_values(true, &values)?;
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_value(if with_sub_agg {
|
||||
json!({
|
||||
"histogram": {
|
||||
"histogram": { "field": "score_f64", "interval": 1.0 },
|
||||
"aggs": { "avg": { "avg": { "field": "score_f64" } } }
|
||||
}
|
||||
})
|
||||
} else {
|
||||
json!({
|
||||
"histogram": {
|
||||
"histogram": { "field": "score_f64", "interval": 1.0 }
|
||||
}
|
||||
})
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let res = exec_request(agg_req, &index)?;
|
||||
|
||||
for k in 0..num_buckets {
|
||||
assert_eq!(res["histogram"]["buckets"][k]["key"], k as f64);
|
||||
assert_eq!(
|
||||
res["histogram"]["buckets"][k]["doc_count"],
|
||||
docs_per_bucket as u64
|
||||
);
|
||||
if with_sub_agg {
|
||||
assert_eq!(res["histogram"]["buckets"][k]["avg"]["value"], k as f64);
|
||||
}
|
||||
}
|
||||
assert_eq!(res["histogram"]["buckets"][num_buckets], Value::Null);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn histogram_memory_limit() -> crate::Result<()> {
|
||||
let index = get_test_index_with_num_docs(true, 100)?;
|
||||
|
||||
@@ -23,7 +23,6 @@ use crate::TantivyError;
|
||||
|
||||
/// Contains all information required by the SegmentRangeCollector to perform the
|
||||
/// range aggregation on a segment.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RangeAggReqData {
|
||||
/// The column accessor to access the fast field values.
|
||||
pub accessor: Column<u64>,
|
||||
@@ -162,7 +161,7 @@ pub struct SegmentRangeCollector<B: SubAggBuffer> {
|
||||
/// One for each ParentBucketId
|
||||
parent_buckets: Vec<Vec<SegmentRangeAndBucketEntry>>,
|
||||
column_type: ColumnType,
|
||||
pub(crate) req_data: RangeAggReqData,
|
||||
pub(crate) accessor_idx: usize,
|
||||
sub_agg: Option<BufferedSubAggs<B>>,
|
||||
/// Here things get a bit weird. We need to assign unique bucket ids across all
|
||||
/// parent buckets. So we keep track of the next available bucket id here.
|
||||
@@ -185,7 +184,7 @@ impl<B: SubAggBuffer> Debug for SegmentRangeCollector<B> {
|
||||
f.debug_struct("SegmentRangeCollector")
|
||||
.field("parent_buckets_len", &self.parent_buckets.len())
|
||||
.field("column_type", &self.column_type)
|
||||
.field("name", &self.req_data.name)
|
||||
.field("accessor_idx", &self.accessor_idx)
|
||||
.field("has_sub_agg", &self.sub_agg.is_some())
|
||||
.finish()
|
||||
}
|
||||
@@ -240,7 +239,10 @@ impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentRangeCollector<B> {
|
||||
) -> crate::Result<()> {
|
||||
self.prepare_max_bucket(parent_bucket_id, agg_data)?;
|
||||
let field_type = self.column_type;
|
||||
let name = self.req_data.name.to_string();
|
||||
let name = agg_data
|
||||
.get_range_req_data(self.accessor_idx)
|
||||
.name
|
||||
.to_string();
|
||||
|
||||
let buckets = std::mem::take(&mut self.parent_buckets[parent_bucket_id as usize]);
|
||||
|
||||
@@ -279,15 +281,17 @@ impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentRangeCollector<B> {
|
||||
docs: &[crate::DocId],
|
||||
agg_data: &mut AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
let req = agg_data.take_range_req_data(self.accessor_idx);
|
||||
|
||||
agg_data
|
||||
.column_block_accessor
|
||||
.fetch_block(docs, &self.req_data.accessor);
|
||||
.fetch_block(docs, &req.accessor);
|
||||
|
||||
let buckets = &mut self.parent_buckets[parent_bucket_id as usize];
|
||||
|
||||
for (doc, val) in agg_data
|
||||
.column_block_accessor
|
||||
.iter_docid_vals(docs, &self.req_data.accessor)
|
||||
.iter_docid_vals(docs, &req.accessor)
|
||||
{
|
||||
let bucket_pos = get_bucket_pos(val, buckets);
|
||||
let bucket = &mut buckets[bucket_pos];
|
||||
@@ -297,6 +301,7 @@ impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentRangeCollector<B> {
|
||||
}
|
||||
}
|
||||
|
||||
agg_data.put_back_range_req_data(self.accessor_idx, req);
|
||||
if let Some(sub_agg) = self.sub_agg.as_mut() {
|
||||
sub_agg.check_flush_local(agg_data)?;
|
||||
}
|
||||
@@ -314,10 +319,10 @@ impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentRangeCollector<B> {
|
||||
fn prepare_max_bucket(
|
||||
&mut self,
|
||||
max_bucket: BucketId,
|
||||
_agg_data: &AggregationsSegmentCtx,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
while self.parent_buckets.len() <= max_bucket as usize {
|
||||
let new_buckets = self.create_new_buckets()?;
|
||||
let new_buckets = self.create_new_buckets(agg_data)?;
|
||||
self.parent_buckets.push(new_buckets);
|
||||
}
|
||||
|
||||
@@ -341,11 +346,8 @@ pub(crate) fn build_segment_range_collector(
|
||||
agg_data: &mut AggregationsSegmentCtx,
|
||||
node: &AggRefNode,
|
||||
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
|
||||
let req_data = agg_data.per_request.range_req_data[node.idx_in_req_data].clone();
|
||||
agg_data
|
||||
.context
|
||||
.limits
|
||||
.add_memory_consumed(req_data.get_memory_consumption() as u64)?;
|
||||
let accessor_idx = node.idx_in_req_data;
|
||||
let req_data = agg_data.get_range_req_data(node.idx_in_req_data);
|
||||
let field_type = req_data.field_type;
|
||||
|
||||
// TODO: A better metric instead of is_top_level would be the number of buckets expected.
|
||||
@@ -363,7 +365,7 @@ pub(crate) fn build_segment_range_collector(
|
||||
Ok(Box::new(SegmentRangeCollector::<LowCardSubAggBuffer> {
|
||||
sub_agg: sub_agg.map(LowCardBufferedSubAggs::new),
|
||||
column_type: field_type,
|
||||
req_data,
|
||||
accessor_idx,
|
||||
parent_buckets: Vec::new(),
|
||||
bucket_id_provider: BucketIdProvider::default(),
|
||||
limits: agg_data.context.limits.clone(),
|
||||
@@ -372,7 +374,7 @@ pub(crate) fn build_segment_range_collector(
|
||||
Ok(Box::new(SegmentRangeCollector::<HighCardSubAggBuffer> {
|
||||
sub_agg: sub_agg.map(BufferedSubAggs::new),
|
||||
column_type: field_type,
|
||||
req_data,
|
||||
accessor_idx,
|
||||
parent_buckets: Vec::new(),
|
||||
bucket_id_provider: BucketIdProvider::default(),
|
||||
limits: agg_data.context.limits.clone(),
|
||||
@@ -381,9 +383,12 @@ pub(crate) fn build_segment_range_collector(
|
||||
}
|
||||
|
||||
impl<B: SubAggBuffer> SegmentRangeCollector<B> {
|
||||
pub(crate) fn create_new_buckets(&mut self) -> crate::Result<Vec<SegmentRangeAndBucketEntry>> {
|
||||
pub(crate) fn create_new_buckets(
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
) -> crate::Result<Vec<SegmentRangeAndBucketEntry>> {
|
||||
let field_type = self.column_type;
|
||||
let req_data = &self.req_data;
|
||||
let req_data = agg_data.get_range_req_data(self.accessor_idx);
|
||||
// The range input on the request is f64.
|
||||
// We need to convert to u64 ranges, because we read the values as u64.
|
||||
// The mapping from the conversion is monotonic so ordering is preserved.
|
||||
@@ -558,16 +563,17 @@ mod tests {
|
||||
get_test_index_with_num_docs,
|
||||
};
|
||||
|
||||
pub fn build_test_buckets(
|
||||
ranges: &[RangeAggregationRange],
|
||||
pub fn get_collector_from_ranges(
|
||||
ranges: Vec<RangeAggregationRange>,
|
||||
field_type: ColumnType,
|
||||
) -> Vec<SegmentRangeAndBucketEntry> {
|
||||
) -> SegmentRangeCollector<HighCardSubAggBuffer> {
|
||||
let req = RangeAggregation {
|
||||
field: "dummy".to_string(),
|
||||
ranges: ranges.to_vec(),
|
||||
ranges,
|
||||
..Default::default()
|
||||
};
|
||||
extend_validate_ranges(&req.ranges, &field_type)
|
||||
// Build buckets directly as in from_req_and_validate without AggregationsData
|
||||
let buckets: Vec<_> = extend_validate_ranges(&req.ranges, &field_type)
|
||||
.expect("unexpected error in extend_validate_ranges")
|
||||
.iter()
|
||||
.map(|range| {
|
||||
@@ -598,7 +604,16 @@ mod tests {
|
||||
},
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
.collect();
|
||||
|
||||
SegmentRangeCollector {
|
||||
parent_buckets: vec![buckets],
|
||||
column_type: field_type,
|
||||
accessor_idx: 0,
|
||||
sub_agg: None,
|
||||
bucket_id_provider: Default::default(),
|
||||
limits: AggregationLimitsGuard::default(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -841,10 +856,10 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn bucket_test_extend_range_hole() {
|
||||
let buckets = [(10f64..20f64).into(), (30f64..40f64).into()];
|
||||
let parent_buckets = [build_test_buckets(&buckets, ColumnType::F64)];
|
||||
let buckets = vec![(10f64..20f64).into(), (30f64..40f64).into()];
|
||||
let collector = get_collector_from_ranges(buckets, ColumnType::F64);
|
||||
|
||||
let buckets = parent_buckets[0].clone();
|
||||
let buckets = collector.parent_buckets[0].clone();
|
||||
assert_eq!(buckets[0].range.start, u64::MIN);
|
||||
assert_eq!(buckets[0].range.end, 10f64.to_u64());
|
||||
assert_eq!(buckets[1].range.start, 10f64.to_u64());
|
||||
@@ -860,14 +875,14 @@ mod tests {
|
||||
fn bucket_test_range_conversion_special_case() {
|
||||
// the monotonic conversion between f64 and u64, does not map f64::MIN.to_u64() ==
|
||||
// u64::MIN, but the into trait converts f64::MIN/MAX to None
|
||||
let buckets = [
|
||||
let buckets = vec![
|
||||
(f64::MIN..10f64).into(),
|
||||
(10f64..20f64).into(),
|
||||
(20f64..f64::MAX).into(),
|
||||
];
|
||||
let parent_buckets = [build_test_buckets(&buckets, ColumnType::F64)];
|
||||
let collector = get_collector_from_ranges(buckets, ColumnType::F64);
|
||||
|
||||
let buckets = parent_buckets[0].clone();
|
||||
let buckets = collector.parent_buckets[0].clone();
|
||||
assert_eq!(buckets[0].range.start, u64::MIN);
|
||||
assert_eq!(buckets[0].range.end, 10f64.to_u64());
|
||||
assert_eq!(buckets[1].range.start, 10f64.to_u64());
|
||||
@@ -879,28 +894,28 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn bucket_range_test_negative_vals() {
|
||||
let buckets = [(-10f64..-1f64).into()];
|
||||
let parent_buckets = [build_test_buckets(&buckets, ColumnType::F64)];
|
||||
let buckets = vec![(-10f64..-1f64).into()];
|
||||
let collector = get_collector_from_ranges(buckets, ColumnType::F64);
|
||||
|
||||
let buckets = parent_buckets[0].clone();
|
||||
let buckets = collector.parent_buckets[0].clone();
|
||||
assert_eq!(&buckets[0].bucket.key.to_string(), "*--10");
|
||||
assert_eq!(&buckets[buckets.len() - 1].bucket.key.to_string(), "-1-*");
|
||||
}
|
||||
#[test]
|
||||
fn bucket_range_test_positive_vals() {
|
||||
let buckets = [(0f64..10f64).into()];
|
||||
let parent_buckets = [build_test_buckets(&buckets, ColumnType::F64)];
|
||||
let buckets = vec![(0f64..10f64).into()];
|
||||
let collector = get_collector_from_ranges(buckets, ColumnType::F64);
|
||||
|
||||
let buckets = parent_buckets[0].clone();
|
||||
let buckets = collector.parent_buckets[0].clone();
|
||||
assert_eq!(&buckets[0].bucket.key.to_string(), "*-0");
|
||||
assert_eq!(&buckets[buckets.len() - 1].bucket.key.to_string(), "10-*");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn range_binary_search_test_u64() {
|
||||
let check_ranges = |ranges: &[RangeAggregationRange]| {
|
||||
let parent_buckets = [build_test_buckets(ranges, ColumnType::U64)];
|
||||
let search = |val: u64| get_bucket_pos(val, &parent_buckets[0]);
|
||||
let check_ranges = |ranges: Vec<RangeAggregationRange>| {
|
||||
let collector = get_collector_from_ranges(ranges, ColumnType::U64);
|
||||
let search = |val: u64| get_bucket_pos(val, &collector.parent_buckets[0]);
|
||||
|
||||
assert_eq!(search(u64::MIN), 0);
|
||||
assert_eq!(search(9), 0);
|
||||
@@ -913,7 +928,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let ranges = vec![(10.0..100.0).into()];
|
||||
check_ranges(&ranges);
|
||||
check_ranges(ranges);
|
||||
|
||||
let ranges = vec![
|
||||
RangeAggregationRange {
|
||||
@@ -923,7 +938,7 @@ mod tests {
|
||||
},
|
||||
(10.0..100.0).into(),
|
||||
];
|
||||
check_ranges(&ranges);
|
||||
check_ranges(ranges);
|
||||
|
||||
let ranges = vec![
|
||||
RangeAggregationRange {
|
||||
@@ -938,15 +953,15 @@ mod tests {
|
||||
from: Some(100.0),
|
||||
},
|
||||
];
|
||||
check_ranges(&ranges);
|
||||
check_ranges(ranges);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn range_binary_search_test_f64() {
|
||||
let ranges = [(10.0..100.0).into()];
|
||||
let ranges = vec![(10.0..100.0).into()];
|
||||
|
||||
let parent_buckets = [build_test_buckets(&ranges, ColumnType::F64)];
|
||||
let search = |val: u64| get_bucket_pos(val, &parent_buckets[0]);
|
||||
let collector = get_collector_from_ranges(ranges, ColumnType::F64);
|
||||
let search = |val: u64| get_bucket_pos(val, &collector.parent_buckets[0]);
|
||||
|
||||
assert_eq!(search(u64::MIN), 0);
|
||||
assert_eq!(search(9f64.to_u64()), 0);
|
||||
|
||||
@@ -29,8 +29,6 @@ use crate::aggregation::{format_date, BucketId, Key};
|
||||
use crate::error::DataCorruption;
|
||||
use crate::TantivyError;
|
||||
|
||||
mod term_histogram;
|
||||
|
||||
/// Contains all information required by the SegmentTermCollector to perform the
|
||||
/// terms aggregation on a segment.
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -379,18 +377,6 @@ pub(crate) fn build_segment_term_collector(
|
||||
let max_term_id: u64 =
|
||||
col_max_value.max(terms_req_data.missing_value_for_accessor.unwrap_or(0u64));
|
||||
|
||||
// Fused fast path: low-cardinality terms × a single `histogram`/`date_histogram` leaf over full
|
||||
// columns with a small enough bucket grid. Anything else falls through to the general path.
|
||||
if let Some(collector) = term_histogram::maybe_build_collector(
|
||||
req_data,
|
||||
node,
|
||||
&terms_req_data,
|
||||
max_term_id,
|
||||
is_top_level,
|
||||
)? {
|
||||
return Ok(collector);
|
||||
}
|
||||
|
||||
let sub_agg_collector = if has_sub_aggregations {
|
||||
Some(build_segment_agg_collectors(req_data, &node.children)?)
|
||||
} else {
|
||||
@@ -1,479 +0,0 @@
|
||||
//! Fused collector for the very common shape `terms` (low cardinality) × a single
|
||||
//! `histogram`/`date_histogram` sub-aggregation with nothing nested below it.
|
||||
//!
|
||||
//! See [`SegmentTermHistogramCollector`] for the approach and [`maybe_build_collector`] for the
|
||||
//! conditions under which it is used.
|
||||
|
||||
use columnar::ColumnBlockAccessor;
|
||||
|
||||
use super::{Bucket, SegmentTermCollector, TermsAggReqData, VecTermBuckets};
|
||||
use crate::aggregation::agg_data::{AggKind, AggRefNode, AggregationsSegmentCtx};
|
||||
use crate::aggregation::bucket::{
|
||||
get_bucket_pos_f64, prepare_histogram_dense_range, HistogramAggReqData,
|
||||
SegmentHistogramCollector,
|
||||
};
|
||||
use crate::aggregation::buffered_sub_aggs::LowCardSubAggBuffer;
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResult, IntermediateAggregationResults,
|
||||
};
|
||||
use crate::aggregation::segment_agg_result::{BucketIdProvider, SegmentAggregationCollector};
|
||||
use crate::aggregation::{f64_from_fastfield_u64, BucketId};
|
||||
|
||||
/// Maximum number of cells (`num_terms × num_time_buckets`) in the fused flat 2D grid. Above this
|
||||
/// the grid would be too large/cache-unfriendly, so we fall back to the general buffered path.
|
||||
/// `1 << 14` cells = 128 KB of `u64` counters, comfortably L2-resident.
|
||||
const MAX_FUSED_GRID_BUCKETS: usize = 1 << 14;
|
||||
|
||||
/// Fused collector for `terms` (low cardinality) × a single `histogram`/`date_histogram` leaf with
|
||||
/// nothing nested below it, when the resulting `num_terms × num_time_buckets` grid is small (see
|
||||
/// [`MAX_FUSED_GRID_BUCKETS`]).
|
||||
///
|
||||
/// It keeps a flat, fully dense 2D counter grid (`counts[term * num_time_buckets + bucket]`) and a
|
||||
/// per-term total. A single pass reads both the term and histogram columns in document order and
|
||||
/// bumps the counters directly — no doc-id buffering, no per-term scattered re-fetch, no dynamic
|
||||
/// dispatch on flush, no per-bucket key/id storage during collection (keys are derived from the
|
||||
/// index at the end).
|
||||
///
|
||||
/// At result time the flat grid is expanded back into the regular term map + histogram storage and
|
||||
/// handed to the shared intermediate-result builders, so cross-segment merging is identical to the
|
||||
/// general path.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SegmentTermHistogramCollector {
|
||||
/// `[num_terms]` total doc count per term bucket (independent of the histogram bounds).
|
||||
/// `u32` is enough: a per-segment count can't exceed the segment's doc count (`DocId` is
|
||||
/// `u32`); the fused path is only taken when `num_docs < u32::MAX` (see
|
||||
/// `maybe_build_collector`).
|
||||
term_counts: Vec<u32>,
|
||||
/// Flat row-major `[num_terms * num_time_buckets]` histogram counters (`u32`, see
|
||||
/// `term_counts`).
|
||||
counts: Vec<u32>,
|
||||
/// Histogram buckets per term (the dense time-range length).
|
||||
num_time_buckets: usize,
|
||||
/// `bucket_pos` mapped to time-bucket index 0.
|
||||
base_pos: i64,
|
||||
terms_req_data: TermsAggReqData,
|
||||
/// The (cloned, normalized) histogram request: its column + interval/offset/bounds.
|
||||
hist_req_data: HistogramAggReqData,
|
||||
/// Private block accessors for both columns. We read them together, so each needs its own
|
||||
/// (the shared `agg_data` scratch accessor only holds one block at a time). Owning them keeps
|
||||
/// `collect` independent of `agg_data`.
|
||||
term_block: ColumnBlockAccessor<u64>,
|
||||
hist_block: ColumnBlockAccessor<u64>,
|
||||
/// When true, `term_counts` is left untouched during `collect` and derived from the grid
|
||||
/// row-sums at result time. Valid only when there are no hard bounds: then every doc lands in
|
||||
/// the dense range, so the per-term total equals the sum over its histogram buckets, and we
|
||||
/// can drop both the per-doc `term_counts` increment and the (always-true) bounds check.
|
||||
///
|
||||
/// This is a real hot-loop win (~17% on `terms_status_with_(date_)histogram`): with low term
|
||||
/// cardinality, `term_counts[term_id] += 1` repeatedly hits the same few addresses, so the
|
||||
/// read-modify-write serializes across iterations on store-to-load forwarding latency. Summing
|
||||
/// the grid once at the end avoids that loop-carried dependency entirely.
|
||||
derive_term_counts_from_histogram: bool,
|
||||
}
|
||||
|
||||
impl SegmentAggregationCollector for SegmentTermHistogramCollector {
|
||||
fn add_intermediate_aggregation_result(
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
results: &mut IntermediateAggregationResults,
|
||||
parent_bucket_id: BucketId,
|
||||
) -> crate::Result<()> {
|
||||
debug_assert_eq!(
|
||||
parent_bucket_id, 0,
|
||||
"fused term-histogram collector is top-level only"
|
||||
);
|
||||
// Expand the flat grid back into the regular structures and reuse the shared builders, so
|
||||
// ordering/cut-off/dict handling and cross-segment merging match the general path exactly.
|
||||
if self.derive_term_counts_from_histogram {
|
||||
// `collect` skipped the per-doc total; recover it as the grid row-sum (every doc landed
|
||||
// in a bucket, since there are no hard bounds).
|
||||
let nb = self.num_time_buckets;
|
||||
let counts = &self.counts;
|
||||
for (t, slot) in self.term_counts.iter_mut().enumerate() {
|
||||
*slot = counts[t * nb..(t + 1) * nb].iter().sum();
|
||||
}
|
||||
}
|
||||
let mut bucket_id_provider = BucketIdProvider::default();
|
||||
let term_buckets = VecTermBuckets {
|
||||
buckets: self
|
||||
.term_counts
|
||||
.iter()
|
||||
.map(|&count| Bucket {
|
||||
count,
|
||||
bucket_id: bucket_id_provider.next_bucket_id(),
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
let mut histogram = SegmentHistogramCollector::<()>::from_dense_rows(
|
||||
self.hist_req_data.clone(),
|
||||
self.base_pos,
|
||||
self.num_time_buckets,
|
||||
&self.counts,
|
||||
);
|
||||
let name = self.terms_req_data.name.clone();
|
||||
let bucket = SegmentTermCollector::<VecTermBuckets, LowCardSubAggBuffer>::into_intermediate_bucket_result(
|
||||
&self.terms_req_data,
|
||||
Some(&mut histogram as &mut dyn SegmentAggregationCollector),
|
||||
term_buckets,
|
||||
agg_data,
|
||||
)?;
|
||||
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn collect(
|
||||
&mut self,
|
||||
parent_bucket_id: BucketId,
|
||||
docs: &[crate::DocId],
|
||||
_agg_data: &mut AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
debug_assert_eq!(
|
||||
parent_bucket_id, 0,
|
||||
"fused term-histogram collector is top-level only"
|
||||
);
|
||||
|
||||
// Fetch both columns into our own accessors (we read them together, so they can't share the
|
||||
// single `agg_data` scratch accessor). The collector owns all its inputs, so `collect`
|
||||
// doesn't touch `agg_data`.
|
||||
self.term_block
|
||||
.fetch_block(docs, &self.terms_req_data.accessor);
|
||||
self.hist_block
|
||||
.fetch_block(docs, &self.hist_req_data.accessor);
|
||||
|
||||
let field_type = self.hist_req_data.field_type;
|
||||
let bounds = self.hist_req_data.bounds;
|
||||
let interval = self.hist_req_data.req.interval;
|
||||
let offset = self.hist_req_data.offset;
|
||||
let base_pos = self.base_pos;
|
||||
let num_time_buckets = self.num_time_buckets;
|
||||
let term_counts = &mut self.term_counts;
|
||||
let counts = &mut self.counts;
|
||||
|
||||
// Both columns are full (checked at construction), so values align with `docs` positionally
|
||||
// and are read together in one pass.
|
||||
if self.derive_term_counts_from_histogram {
|
||||
// No hard bounds: every doc lands in a bucket, so we skip the per-doc `term_counts`
|
||||
// increment (derived from the grid at flush) and the always-true bounds check.
|
||||
for (term_id, hist_raw) in self.term_block.iter_vals().zip(self.hist_block.iter_vals())
|
||||
{
|
||||
let term_id = term_id as usize;
|
||||
let val = f64_from_fastfield_u64(hist_raw, field_type);
|
||||
let bucket = (get_bucket_pos_f64(val, interval, offset) as i64 - base_pos) as usize;
|
||||
debug_assert!(
|
||||
bucket < num_time_buckets,
|
||||
"histogram bucket outside dense range"
|
||||
);
|
||||
counts[term_id * num_time_buckets + bucket] += 1;
|
||||
}
|
||||
} else {
|
||||
for (term_id, hist_raw) in self.term_block.iter_vals().zip(self.hist_block.iter_vals())
|
||||
{
|
||||
let term_id = term_id as usize;
|
||||
term_counts[term_id] += 1;
|
||||
let val = f64_from_fastfield_u64(hist_raw, field_type);
|
||||
if bounds.contains(val) {
|
||||
let bucket =
|
||||
(get_bucket_pos_f64(val, interval, offset) as i64 - base_pos) as usize;
|
||||
debug_assert!(
|
||||
bucket < num_time_buckets,
|
||||
"histogram bucket outside dense range"
|
||||
);
|
||||
counts[term_id * num_time_buckets + bucket] += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn flush(&mut self, _agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
|
||||
// Nothing is buffered: `collect` writes the flat grid directly.
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare_max_bucket(
|
||||
&mut self,
|
||||
_max_bucket: BucketId,
|
||||
_agg_data: &AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
// Top-level: the flat grid is allocated up front.
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn compute_metric_value(
|
||||
&self,
|
||||
_bucket_id: BucketId,
|
||||
_sub_agg_name: &str,
|
||||
_sub_agg_property: &str,
|
||||
_agg_data: &AggregationsSegmentCtx,
|
||||
) -> Option<f64> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds the fused terms×histogram collector for a single top-level parent, when the shape is
|
||||
/// eligible. Returns `Ok(None)` to fall back to the general buffered terms path.
|
||||
///
|
||||
/// Eligibility: top-level, low-cardinality terms over a full column with no missing/include-exclude
|
||||
/// handling; a single `histogram`/`date_histogram` leaf (no nesting below it) over a full column;
|
||||
/// and a `num_terms × num_time_buckets` grid no larger than [`MAX_FUSED_GRID_BUCKETS`].
|
||||
pub(super) fn maybe_build_collector(
|
||||
agg_data: &mut AggregationsSegmentCtx,
|
||||
node: &AggRefNode,
|
||||
terms_req_data: &TermsAggReqData,
|
||||
max_term_id: u64,
|
||||
is_top_level: bool,
|
||||
) -> crate::Result<Option<Box<dyn SegmentAggregationCollector>>> {
|
||||
// Both columns must be full (one value per doc) so their values align positionally with `docs`
|
||||
// and we can zip them. Requiring full columns also makes the terms agg's `missing` config a
|
||||
// no-op (`fetch_block_with_missing` early-returns on full columns), so we needn't check for it.
|
||||
//
|
||||
// We don't cap the term cardinality here: the flat grid is bounded by the total cell count
|
||||
// (`num_terms * num_time_buckets <= MAX_FUSED_GRID_BUCKETS`) checked below, which subsumes it.
|
||||
let fuseable = is_top_level
|
||||
&& terms_req_data.allowed_term_ids.is_none()
|
||||
&& terms_req_data.accessor.get_cardinality().is_full()
|
||||
// The flat counters are `u32`; a per-segment count can't exceed the doc count, so this
|
||||
// guarantees no overflow (essentially always true, as `DocId` is `u32`).
|
||||
&& terms_req_data.accessor.num_docs() < u32::MAX
|
||||
&& node.children.len() == 1
|
||||
&& matches!(
|
||||
node.children[0].kind,
|
||||
AggKind::Histogram | AggKind::DateHistogram
|
||||
)
|
||||
&& node.children[0].children.is_empty()
|
||||
&& agg_data.per_request.histogram_req_data[node.children[0].idx_in_req_data]
|
||||
.accessor
|
||||
.get_cardinality()
|
||||
.is_full();
|
||||
if !fuseable {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Clone + normalize the histogram request and get its dense bucket range; only take the fused
|
||||
// path when the flat `num_terms × num_time_buckets` grid is small enough.
|
||||
let Some((hist_req_data, range)) = prepare_histogram_dense_range(agg_data, &node.children[0])?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
let num_terms = (max_term_id + 1) as usize;
|
||||
if num_terms.saturating_mul(range.len) > MAX_FUSED_GRID_BUCKETS {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let counts = vec![0u32; num_terms * range.len];
|
||||
agg_data
|
||||
.context
|
||||
.limits
|
||||
.add_memory_consumed((counts.len() * std::mem::size_of::<u32>()) as u64)?;
|
||||
// With no hard bounds every doc lands in the dense range, so the per-term totals can be derived
|
||||
// from the grid at result time instead of incremented per doc.
|
||||
let derive_term_counts_from_histogram =
|
||||
hist_req_data.bounds.min == f64::MIN && hist_req_data.bounds.max == f64::MAX;
|
||||
Ok(Some(Box::new(SegmentTermHistogramCollector {
|
||||
term_counts: vec![0u32; num_terms],
|
||||
counts,
|
||||
num_time_buckets: range.len,
|
||||
base_pos: range.base_pos,
|
||||
terms_req_data: terms_req_data.clone(),
|
||||
hist_req_data,
|
||||
term_block: ColumnBlockAccessor::default(),
|
||||
hist_block: ColumnBlockAccessor::default(),
|
||||
derive_term_counts_from_histogram,
|
||||
})))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::tests::{exec_request, get_test_index_from_values_and_terms};
|
||||
|
||||
/// Hand-computed correctness check for the fused terms×histogram fast path
|
||||
/// ([`super::SegmentTermHistogramCollector`]): low-cardinality terms × a histogram leaf over
|
||||
/// full columns, exercised single- and multi-segment.
|
||||
#[test]
|
||||
fn fused_term_histogram_test() -> crate::Result<()> {
|
||||
fused_term_histogram_with_opt(false)?;
|
||||
fused_term_histogram_with_opt(true)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn fused_term_histogram_with_opt(merge_segments: bool) -> crate::Result<()> {
|
||||
// 300 docs: term = {a, b, c} by i % 3, histogram value = i % 20 (interval 1 => buckets
|
||||
// 0..19). gcd(3, 20) = 1, so every (term, bucket) pair occurs exactly 300 / 60 = 5 times.
|
||||
let docs: Vec<(f64, String)> = (0..300u64)
|
||||
.map(|i| {
|
||||
(
|
||||
(i % 20) as f64,
|
||||
["a", "b", "c"][(i % 3) as usize].to_string(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
// Two segments, to also exercise cross-segment merging of the fused per-term histograms.
|
||||
let segments = vec![docs[..150].to_vec(), docs[150..].to_vec()];
|
||||
let index = get_test_index_from_values_and_terms(merge_segments, &segments)?;
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_value(serde_json::json!({
|
||||
"by_term": {
|
||||
"terms": { "field": "string_id", "order": { "_key": "asc" } },
|
||||
"aggs": {
|
||||
"histo": { "histogram": { "field": "score_f64", "interval": 1.0 } }
|
||||
}
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
let res = exec_request(agg_req, &index)?;
|
||||
|
||||
for (term_idx, term) in ["a", "b", "c"].iter().enumerate() {
|
||||
assert_eq!(res["by_term"]["buckets"][term_idx]["key"], *term);
|
||||
assert_eq!(res["by_term"]["buckets"][term_idx]["doc_count"], 100);
|
||||
let histo = &res["by_term"]["buckets"][term_idx]["histo"]["buckets"];
|
||||
for b in 0..20usize {
|
||||
assert_eq!(histo[b]["key"], b as f64, "term {term} bucket {b}");
|
||||
assert_eq!(histo[b]["doc_count"], 5, "term {term} bucket {b}");
|
||||
}
|
||||
assert_eq!(histo[20], serde_json::Value::Null);
|
||||
}
|
||||
assert_eq!(res["by_term"]["buckets"][3], serde_json::Value::Null);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// A `missing` config on a *full* term column still takes the fused path (the string sentinel
|
||||
/// is just `col_max + 1`, so the column stays low-cardinality). Since no doc is missing, the
|
||||
/// real term buckets must be exactly as without `missing`.
|
||||
#[test]
|
||||
fn fused_term_histogram_with_missing_on_full_column() -> crate::Result<()> {
|
||||
let docs: Vec<(f64, String)> = (0..300u64)
|
||||
.map(|i| {
|
||||
(
|
||||
(i % 20) as f64,
|
||||
["a", "b", "c"][(i % 3) as usize].to_string(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
let index = get_test_index_from_values_and_terms(true, &[docs])?;
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_value(serde_json::json!({
|
||||
"by_term": {
|
||||
"terms": { "field": "string_id", "missing": "MISSING", "order": { "_key": "asc" } },
|
||||
"aggs": {
|
||||
"histo": { "histogram": { "field": "score_f64", "interval": 1.0 } }
|
||||
}
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
let res = exec_request(agg_req, &index)?;
|
||||
|
||||
// Column is full, so "MISSING" never applies: a, b, c are unchanged (100 docs, 5 per
|
||||
// bucket).
|
||||
for (term_idx, term) in ["a", "b", "c"].iter().enumerate() {
|
||||
assert_eq!(res["by_term"]["buckets"][term_idx]["key"], *term);
|
||||
assert_eq!(res["by_term"]["buckets"][term_idx]["doc_count"], 100);
|
||||
let histo = &res["by_term"]["buckets"][term_idx]["histo"]["buckets"];
|
||||
for b in 0..20usize {
|
||||
assert_eq!(histo[b]["doc_count"], 5, "term {term} bucket {b}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Term cardinality above the general path's `MAX_NUM_TERMS_FOR_VEC` (100) still fuses: the
|
||||
/// flat grid is bounded by the total cell count (`num_terms * num_time_buckets`), not the
|
||||
/// term count.
|
||||
#[test]
|
||||
fn fused_term_histogram_many_terms() -> crate::Result<()> {
|
||||
let num_terms = 150usize;
|
||||
let docs_per_term = 2usize;
|
||||
// All docs share histogram value 0 (a single bucket), so the grid is 150 x 1 = 150 cells.
|
||||
let docs: Vec<(f64, String)> = (0..num_terms * docs_per_term)
|
||||
.map(|i| (0.0, format!("t{:03}", i % num_terms)))
|
||||
.collect();
|
||||
let index = get_test_index_from_values_and_terms(true, &[docs])?;
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_value(serde_json::json!({
|
||||
"by_term": {
|
||||
"terms": { "field": "string_id", "size": 1000, "order": { "_key": "asc" } },
|
||||
"aggs": {
|
||||
"histo": { "histogram": { "field": "score_f64", "interval": 1.0 } }
|
||||
}
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
let res = exec_request(agg_req, &index)?;
|
||||
|
||||
let buckets = res["by_term"]["buckets"].as_array().unwrap();
|
||||
assert_eq!(buckets.len(), num_terms);
|
||||
for (i, bucket) in buckets.iter().enumerate() {
|
||||
assert_eq!(bucket["key"], format!("t{i:03}"));
|
||||
assert_eq!(bucket["doc_count"], docs_per_term as u64);
|
||||
assert_eq!(bucket["histo"]["buckets"][0]["key"], 0.0);
|
||||
assert_eq!(
|
||||
bucket["histo"]["buckets"][0]["doc_count"],
|
||||
docs_per_term as u64
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// `hard_bounds` exercises the non-derived `term_counts` branch: a term's `doc_count` must
|
||||
/// count *every* doc with that term, including docs whose histogram value is outside the
|
||||
/// bounds (those are excluded from the histogram buckets but still counted for the term). This
|
||||
/// is the case where the per-doc `term_counts` increment cannot be replaced by the grid
|
||||
/// row-sum.
|
||||
#[test]
|
||||
fn fused_term_histogram_with_hard_bounds() -> crate::Result<()> {
|
||||
// 300 docs: term = {a, b, c} by i % 3, value = i % 20. Per term: 100 docs, each value in
|
||||
// 0..=19 occurring 5 times.
|
||||
let docs: Vec<(f64, String)> = (0..300u64)
|
||||
.map(|i| {
|
||||
(
|
||||
(i % 20) as f64,
|
||||
["a", "b", "c"][(i % 3) as usize].to_string(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
let index = get_test_index_from_values_and_terms(true, &[docs])?;
|
||||
|
||||
// hard_bounds [5, 14] (inclusive) keeps only values 5..=14 in the histogram (10 buckets);
|
||||
// values 0..=4 and 15..=19 are out of bounds.
|
||||
let agg_req: Aggregations = serde_json::from_value(serde_json::json!({
|
||||
"by_term": {
|
||||
"terms": { "field": "string_id", "order": { "_key": "asc" } },
|
||||
"aggs": {
|
||||
"histo": {
|
||||
"histogram": {
|
||||
"field": "score_f64",
|
||||
"interval": 1.0,
|
||||
"hard_bounds": { "min": 5.0, "max": 14.0 }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
let res = exec_request(agg_req, &index)?;
|
||||
|
||||
for (term_idx, term) in ["a", "b", "c"].iter().enumerate() {
|
||||
assert_eq!(res["by_term"]["buckets"][term_idx]["key"], *term);
|
||||
// doc_count includes the 50 per-term docs whose value is outside [5, 14].
|
||||
assert_eq!(res["by_term"]["buckets"][term_idx]["doc_count"], 100);
|
||||
let histo = &res["by_term"]["buckets"][term_idx]["histo"]["buckets"];
|
||||
for b in 0..10usize {
|
||||
let key = 5 + b;
|
||||
assert_eq!(histo[b]["key"], key as f64, "term {term} bucket key {key}");
|
||||
assert_eq!(histo[b]["doc_count"], 5, "term {term} bucket {key}");
|
||||
}
|
||||
// Only the 10 in-bounds buckets exist.
|
||||
assert_eq!(histo[10], serde_json::Value::Null);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -138,7 +138,6 @@ impl SubAggBuffer for HighCardSubAggBuffer {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn push(&mut self, bucket_id: BucketId, doc_id: DocId) {
|
||||
let idx = bucket_id % NUM_PARTITIONS as u32;
|
||||
let slot = &mut self.partitions[idx as usize];
|
||||
@@ -197,7 +196,6 @@ impl SubAggBuffer for LowCardSubAggBuffer {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn push(&mut self, bucket_id: BucketId, doc_id: DocId) {
|
||||
let idx = bucket_id as usize;
|
||||
if self.per_bucket_docs.len() <= idx {
|
||||
|
||||
@@ -171,7 +171,6 @@ impl CouponCache {
|
||||
let uninitialized_coupon = Coupon::from_hash(0);
|
||||
let mut coupon_map: Vec<Coupon> =
|
||||
vec![uninitialized_coupon; highest_term_ord as usize + 1];
|
||||
|
||||
for (term_ord, coupon) in term_ords.into_iter().zip(coupons) {
|
||||
coupon_map[term_ord as usize] = coupon;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user