Compare commits

...

12 Commits

Author SHA1 Message Date
Paul Masurel
d5e2709b1b Test demonstrating that nested aggregation are insanely slow 2026-04-20 19:37:12 +02:00
Paul Masurel
13d74c3c20 Update binggan requirement from 0.16.0 to 0.16.1 (#2899) 2026-04-20 11:59:47 +02:00
dependabot[bot]
058afff8b7 Update binggan requirement from 0.15.3 to 0.16.0
Updates the requirements on [binggan](https://github.com/pseitz/binggan) to permit the latest version.
- [Changelog](https://github.com/PSeitz/binggan/blob/main/CHANGELOG.md)
- [Commits](https://github.com/pseitz/binggan/commits)

---
updated-dependencies:
- dependency-name: binggan
  dependency-version: 0.16.0
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-04-15 08:58:03 +02:00
Paul Masurel
58aa4b7074 Fix cardinality aggregation using invalid coupons (#2893)
Previously, coupons were computed via murmurhash32 and fed as raw u32
to the HLL sketch, bypassing the sketch's internal hashing and producing
invalid (slot, value) pairs. Switch to Coupon::from_hash from the
datasketches crate which correctly derives coupons, and drop the
now-unused murmurhash32 dependency.

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 19:14:30 +02:00
Paul Masurel
04beab3b29 Performance improvement for nested cardinality aggregation
When a string cardinality aggregation is nested it end up being applied to different buckets.
Dictionary encoding relies on a different dictionaries for each segment.

As a result, during segment collection, we only collect term ordinals in a HashSet, and decode them in the
term dictionary at the end of collection.

Before this PR, this decoding phase was done once for each bucket, causing the same work to be done over and over. This PR introduce a coupon cache. The HLL sketch relies on a hash of the string values.

We populate the cache before bucket collection, and get our values from it.

This PR also rename "caching" "buffering" in aggregation (it was never caching), and does several cleanups.
2026-04-10 14:51:00 +02:00
alexanderbianchi
3cd9011f87 Make BucketEntries::iter, PercentileValuesVecEntry fields, and TopNComputer::threshold public (#2890)
These items need to be accessible from the tantivy-datafusion crate:
- BucketEntries::iter() for iterating aggregation bucket results
- PercentileValuesVecEntry.key/.value for reading percentile results
- TopNComputer.threshold for Block-WAND score pruning in the inverted index provider

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Paul Masurel <paul@quickwit.io>
2026-04-09 13:32:31 +02:00
Paul Masurel
d2c1b8bc2c Optimized intersection count using a bitset when the first leg is dense 2026-04-06 12:01:52 -04:00
nuri
a65107135a Use BinaryHeap for score-based top-K collection (#2881)
* Use BinaryHeap for score-based top-K collection

* Use peek_mut and add proptest for TopNHeap

---------

Co-authored-by: nryoo <nryoo@nryooui-MacBookPro.local>
2026-04-04 19:49:05 +02:00
Pascal Seitz
5c344db1bf chore: Release 2026-03-31 17:15:34 +08:00
Pascal Seitz
dc0f31554d unbump for release and update Changelog.md 2026-03-31 17:15:34 +08:00
trinity-1686a
a28ce3ee54 Merge pull request #2869 from quickwit-oss/trinity.pointard/maint
add dependabot cooldown
2026-03-31 09:52:22 +02:00
trinity Pointard
cf9800f981 add dependabot cooldown 2026-03-30 11:36:04 +02:00
33 changed files with 1187 additions and 275 deletions

View File

@@ -6,6 +6,8 @@ updates:
interval: daily
time: "20:00"
open-pull-requests-limit: 10
cooldown:
default-days: 2
- package-ecosystem: "github-actions"
directory: "/"
@@ -13,3 +15,5 @@ updates:
interval: daily
time: "20:00"
open-pull-requests-limit: 10
cooldown:
default-days: 2

View File

@@ -45,6 +45,7 @@ Tantivy 0.26 (Unreleased)
- Add `seek_danger` on `DocSet` for more efficient intersections [#2538](https://github.com/quickwit-oss/tantivy/pull/2538) [#2810](https://github.com/quickwit-oss/tantivy/pull/2810)(@PSeitz @stuhood @fulmicoton)
- Skip column traversal in `RangeDocSet` when query range does not overlap with column bounds [#2783](https://github.com/quickwit-oss/tantivy/pull/2783)(@ChangRui-Ryan)
- Speed up exclude queries by supporting multiple excluded `DocSet`s without intermediate union [#2825](https://github.com/quickwit-oss/tantivy/pull/2825)(@PSeitz)
- Improve union performance for non-score unions with `fill_buffer` and optimized `TinySet` [#2863](https://github.com/quickwit-oss/tantivy/pull/2863)(@PSeitz)
Tantivy 0.25
================================

View File

@@ -57,15 +57,15 @@ measure_time = "0.9.0"
arc-swap = "1.5.0"
bon = "3.3.1"
columnar = { version = "0.6", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.6", path = "./sstable", package = "tantivy-sstable", optional = true }
stacker = { version = "0.6", path = "./stacker", package = "tantivy-stacker" }
query-grammar = { version = "0.25.0", path = "./query-grammar", package = "tantivy-query-grammar" }
tantivy-bitpacker = { version = "0.9", path = "./bitpacker" }
common = { version = "0.10", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
columnar = { version = "0.7", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.7", path = "./sstable", package = "tantivy-sstable", optional = true }
stacker = { version = "0.7", path = "./stacker", package = "tantivy-stacker" }
query-grammar = { version = "0.26.0", path = "./query-grammar", package = "tantivy-query-grammar" }
tantivy-bitpacker = { version = "0.10", path = "./bitpacker" }
common = { version = "0.11", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.7", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
sketches-ddsketch = { version = "0.4", features = ["use_serde"] }
datasketches = "0.2.0"
datasketches = { git = "https://github.com/fulmicoton-dd/datasketches-rust", rev = "7635fb8" }
futures-util = { version = "0.3.28", optional = true }
futures-channel = { version = "0.3.28", optional = true }
fnv = "1.0.7"
@@ -75,7 +75,7 @@ typetag = "0.2.21"
winapi = "0.3.9"
[dev-dependencies]
binggan = "0.15.3"
binggan = "0.16.1"
rand = "0.9"
maplit = "1.0.2"
matches = "0.1.9"

View File

@@ -78,6 +78,7 @@ fn bench_agg(mut group: InputGroup<Index>) {
register!(group, cardinality_agg);
register!(group, terms_status_with_cardinality_agg);
register!(group, terms_100_buckets_with_cardinality_agg);
register!(group, range_agg);
register!(group, range_agg_with_avg_sub_agg);
@@ -169,6 +170,22 @@ fn terms_status_with_cardinality_agg(index: &Index) {
let agg_req = json!({
"my_texts": {
"terms": { "field": "text_few_terms_status" },
"aggs": {
"cardinality": {
"cardinality": {
"field": "text_few_terms_status"
},
}
}
},
});
execute_agg(index, agg_req);
}
fn terms_100_buckets_with_cardinality_agg(index: &Index) {
let agg_req = json!({
"my_texts": {
"terms": { "field": "text_1000_terms_zipf", "size": 100 },
"aggs": {
"cardinality": {
"cardinality": {

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-bitpacker"
version = "0.9.0"
version = "0.10.0"
edition = "2024"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-columnar"
version = "0.6.0"
version = "0.7.0"
edition = "2024"
license = "MIT"
homepage = "https://github.com/quickwit-oss/tantivy"
@@ -12,10 +12,10 @@ categories = ["database-implementations", "data-structures", "compression"]
itertools = "0.14.0"
fastdivide = "0.4.0"
stacker = { version= "0.6", path = "../stacker", package="tantivy-stacker"}
sstable = { version= "0.6", path = "../sstable", package = "tantivy-sstable" }
common = { version= "0.10", path = "../common", package = "tantivy-common" }
tantivy-bitpacker = { version= "0.9", path = "../bitpacker/" }
stacker = { version= "0.7", path = "../stacker", package="tantivy-stacker"}
sstable = { version= "0.7", path = "../sstable", package = "tantivy-sstable" }
common = { version= "0.11", path = "../common", package = "tantivy-common" }
tantivy-bitpacker = { version= "0.10", path = "../bitpacker/" }
serde = "1.0.152"
downcast-rs = "2.0.1"
@@ -23,7 +23,7 @@ downcast-rs = "2.0.1"
proptest = "1"
more-asserts = "0.3.1"
rand = "0.9"
binggan = "0.15.3"
binggan = "0.16.1"
[[bench]]
name = "bench_merge"

View File

@@ -33,14 +33,14 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
&mut self,
docs: &[u32],
accessor: &Column<T>,
missing: Option<T>,
missing_opt: Option<T>,
) {
self.fetch_block(docs, accessor);
// no missing values
if accessor.index.get_cardinality().is_full() {
return;
}
let Some(missing) = missing else {
let Some(missing) = missing_opt else {
return;
};
@@ -191,6 +191,7 @@ where F: FnMut(u32) {
}
#[cfg(test)]
#[allow(clippy::field_reassign_with_default)]
mod tests {
use super::*;

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-common"
version = "0.10.0"
version = "0.11.0"
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
license = "MIT"
edition = "2024"
@@ -19,6 +19,6 @@ time = { version = "0.3.47", features = ["serde-well-known"] }
serde = { version = "1.0.136", features = ["derive"] }
[dev-dependencies]
binggan = "0.15.3"
binggan = "0.16.1"
proptest = "1.0.0"
rand = "0.9"

View File

@@ -47,6 +47,9 @@ impl TinySet {
TinySet(val)
}
/// An empty `TinySet` constant.
pub const EMPTY: TinySet = TinySet(0u64);
/// Returns an empty `TinySet`.
#[inline]
pub fn empty() -> TinySet {

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-query-grammar"
version = "0.25.0"
version = "0.26.0"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]

View File

@@ -208,7 +208,8 @@ pub enum BucketEntries<T> {
}
impl<T> BucketEntries<T> {
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = &'a T> + 'a> {
/// Iterate over all bucket entries.
pub fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = &'a T> + 'a> {
match self {
BucketEntries::Vec(vec) => Box::new(vec.iter()),
BucketEntries::HashMap(map) => Box::new(map.values()),

View File

@@ -21,7 +21,7 @@ use crate::aggregation::bucket::composite::map::{DynArrayHeapMap, MAX_DYN_ARRAY_
use crate::aggregation::bucket::{
CalendarInterval, CompositeAggregationSource, MissingOrder, Order,
};
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardSubAggCache};
use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardSubAggBuffer};
use crate::aggregation::intermediate_agg_result::{
CompositeIntermediateKey, IntermediateAggregationResult, IntermediateAggregationResults,
IntermediateBucketResult, IntermediateCompositeBucketEntry, IntermediateCompositeBucketResult,
@@ -119,7 +119,7 @@ pub struct SegmentCompositeCollector {
/// One DynArrayHeapMap per parent bucket.
parent_buckets: Vec<DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>>,
accessor_idx: usize,
sub_agg: Option<CachedSubAggs<HighCardSubAggCache>>,
sub_agg: Option<BufferedSubAggs<HighCardSubAggBuffer>>,
bucket_id_provider: BucketIdProvider,
/// Number of sources, needed when creating new DynArrayHeapMaps.
num_sources: usize,
@@ -218,7 +218,7 @@ impl SegmentCompositeCollector {
let has_sub_aggregations = !node.children.is_empty();
let sub_agg = if has_sub_aggregations {
let sub_agg_collector = build_segment_agg_collectors(req_data, &node.children)?;
Some(CachedSubAggs::new(sub_agg_collector))
Some(BufferedSubAggs::new(sub_agg_collector))
} else {
None
};
@@ -332,7 +332,7 @@ fn collect_bucket_with_limit(
limit_num_buckets: usize,
buckets: &mut DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
key: &[InternalValueRepr],
sub_agg: &mut Option<CachedSubAggs<HighCardSubAggCache>>,
sub_agg: &mut Option<BufferedSubAggs<HighCardSubAggBuffer>>,
bucket_id_provider: &mut BucketIdProvider,
) {
let mut record_in_bucket = |bucket: &mut CompositeBucketCollector| {
@@ -488,7 +488,7 @@ struct CompositeKeyVisitor<'a> {
doc_id: crate::DocId,
composite_agg_data: &'a CompositeAggReqData,
buckets: &'a mut DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
sub_agg: &'a mut Option<CachedSubAggs<HighCardSubAggCache>>,
sub_agg: &'a mut Option<BufferedSubAggs<HighCardSubAggBuffer>>,
bucket_id_provider: &'a mut BucketIdProvider,
sub_level_values: SmallVec<[InternalValueRepr; MAX_DYN_ARRAY_SIZE]>,
}

View File

@@ -511,14 +511,14 @@ mod tests {
fn datetime_from_iso_str(date_str: &str) -> common::DateTime {
let dt = OffsetDateTime::parse(date_str, &Rfc3339)
.expect(&format!("Failed to parse date: {}", date_str));
.unwrap_or_else(|_| panic!("Failed to parse date: {}", date_str));
let timestamp_secs = dt.unix_timestamp_nanos();
common::DateTime::from_timestamp_nanos(timestamp_secs as i64)
}
fn ms_timestamp_from_iso_str(date_str: &str) -> i64 {
let dt = OffsetDateTime::parse(date_str, &Rfc3339)
.expect(&format!("Failed to parse date: {}", date_str));
.unwrap_or_else(|_| panic!("Failed to parse date: {}", date_str));
(dt.unix_timestamp_nanos() / 1_000_000) as i64
}
@@ -548,7 +548,7 @@ mod tests {
agg_req_json["my_composite"]["composite"]["after"] = after_key.take().unwrap();
}
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
let res = exec_request(agg_req.clone(), &index).unwrap();
let res = exec_request(agg_req.clone(), index).unwrap();
let expected_page_buckets = &expected_buckets_vec[page_idx * page_size
..std::cmp::min((page_idx + 1) * page_size, expected_buckets_vec.len())];
assert_eq!(
@@ -578,7 +578,7 @@ mod tests {
}
});
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
let res = exec_request(agg_req.clone(), &index).unwrap();
let res = exec_request(agg_req.clone(), index).unwrap();
assert_eq!(
res["my_composite"]["buckets"],
json!([]),

View File

@@ -6,8 +6,8 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::aggregation::agg_data::{
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
};
use crate::aggregation::cached_sub_aggs::{
CachedSubAggs, HighCardSubAggCache, LowCardSubAggCache, SubAggCache,
use crate::aggregation::buffered_sub_aggs::{
BufferedSubAggs, HighCardSubAggBuffer, LowCardSubAggBuffer, SubAggBuffer,
};
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
@@ -503,17 +503,17 @@ struct DocCount {
}
/// Segment collector for filter aggregation
pub struct SegmentFilterCollector<C: SubAggCache> {
pub struct SegmentFilterCollector<B: SubAggBuffer> {
/// Document counts per parent bucket
parent_buckets: Vec<DocCount>,
/// Sub-aggregation collectors
sub_aggregations: Option<CachedSubAggs<C>>,
sub_aggregations: Option<BufferedSubAggs<B>>,
bucket_id_provider: BucketIdProvider,
/// Accessor index for this filter aggregation (to access FilterAggReqData)
accessor_idx: usize,
}
impl<C: SubAggCache> SegmentFilterCollector<C> {
impl<B: SubAggBuffer> SegmentFilterCollector<B> {
/// Create a new filter segment collector following the new agg_data pattern
pub(crate) fn from_req_and_validate(
req: &mut AggregationsSegmentCtx,
@@ -525,7 +525,7 @@ impl<C: SubAggCache> SegmentFilterCollector<C> {
} else {
None
};
let sub_agg_collector = sub_agg_collector.map(CachedSubAggs::new);
let sub_agg_collector = sub_agg_collector.map(BufferedSubAggs::new);
Ok(SegmentFilterCollector {
parent_buckets: Vec::new(),
@@ -547,16 +547,16 @@ pub(crate) fn build_segment_filter_collector(
if is_top_level {
Ok(Box::new(
SegmentFilterCollector::<LowCardSubAggCache>::from_req_and_validate(req, node)?,
SegmentFilterCollector::<LowCardSubAggBuffer>::from_req_and_validate(req, node)?,
))
} else {
Ok(Box::new(
SegmentFilterCollector::<HighCardSubAggCache>::from_req_and_validate(req, node)?,
SegmentFilterCollector::<HighCardSubAggBuffer>::from_req_and_validate(req, node)?,
))
}
}
impl<C: SubAggCache> Debug for SegmentFilterCollector<C> {
impl<B: SubAggBuffer> Debug for SegmentFilterCollector<B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SegmentFilterCollector")
.field("buckets", &self.parent_buckets)
@@ -566,7 +566,7 @@ impl<C: SubAggCache> Debug for SegmentFilterCollector<C> {
}
}
impl<C: SubAggCache> SegmentAggregationCollector for SegmentFilterCollector<C> {
impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentFilterCollector<B> {
fn add_intermediate_aggregation_result(
&mut self,
agg_data: &AggregationsSegmentCtx,

View File

@@ -10,7 +10,7 @@ use crate::aggregation::agg_data::{
};
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::agg_result::BucketEntry;
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardCachedSubAggs};
use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardBufferedSubAggs};
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
IntermediateHistogramBucketEntry,
@@ -258,7 +258,7 @@ pub(crate) struct SegmentHistogramBucketEntry {
impl SegmentHistogramBucketEntry {
pub(crate) fn into_intermediate_bucket_entry(
self,
sub_aggregation: &mut Option<HighCardCachedSubAggs>,
sub_aggregation: &mut Option<HighCardBufferedSubAggs>,
agg_data: &AggregationsSegmentCtx,
) -> crate::Result<IntermediateHistogramBucketEntry> {
let mut sub_aggregation_res = IntermediateAggregationResults::default();
@@ -291,7 +291,7 @@ pub struct SegmentHistogramCollector {
/// The buckets containing the aggregation data.
/// One Histogram bucket per parent bucket id.
parent_buckets: Vec<HistogramBuckets>,
sub_agg: Option<HighCardCachedSubAggs>,
sub_agg: Option<HighCardBufferedSubAggs>,
accessor_idx: usize,
bucket_id_provider: BucketIdProvider,
}
@@ -444,7 +444,7 @@ impl SegmentHistogramCollector {
max: f64::MAX,
});
req_data.offset = req_data.req.offset.unwrap_or(0.0);
let sub_agg = sub_agg.map(CachedSubAggs::new);
let sub_agg = sub_agg.map(BufferedSubAggs::new);
Ok(Self {
parent_buckets: Default::default(),

View File

@@ -9,8 +9,9 @@ use crate::aggregation::agg_data::{
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
};
use crate::aggregation::agg_limits::AggregationLimitsGuard;
use crate::aggregation::cached_sub_aggs::{
CachedSubAggs, HighCardSubAggCache, LowCardCachedSubAggs, LowCardSubAggCache, SubAggCache,
use crate::aggregation::buffered_sub_aggs::{
BufferedSubAggs, HighCardSubAggBuffer, LowCardBufferedSubAggs, LowCardSubAggBuffer,
SubAggBuffer,
};
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
@@ -155,13 +156,13 @@ pub(crate) struct SegmentRangeAndBucketEntry {
/// The collector puts values from the fast field into the correct buckets and does a conversion to
/// the correct datatype.
pub struct SegmentRangeCollector<C: SubAggCache> {
pub struct SegmentRangeCollector<B: SubAggBuffer> {
/// The buckets containing the aggregation data.
/// One for each ParentBucketId
parent_buckets: Vec<Vec<SegmentRangeAndBucketEntry>>,
column_type: ColumnType,
pub(crate) accessor_idx: usize,
sub_agg: Option<CachedSubAggs<C>>,
sub_agg: Option<BufferedSubAggs<B>>,
/// Here things get a bit weird. We need to assign unique bucket ids across all
/// parent buckets. So we keep track of the next available bucket id here.
/// This allows a kind of flattening of the bucket ids across all parent buckets.
@@ -178,7 +179,7 @@ pub struct SegmentRangeCollector<C: SubAggCache> {
limits: AggregationLimitsGuard,
}
impl<C: SubAggCache> Debug for SegmentRangeCollector<C> {
impl<B: SubAggBuffer> Debug for SegmentRangeCollector<B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SegmentRangeCollector")
.field("parent_buckets_len", &self.parent_buckets.len())
@@ -229,7 +230,7 @@ impl SegmentRangeBucketEntry {
}
}
impl<C: SubAggCache> SegmentAggregationCollector for SegmentRangeCollector<C> {
impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentRangeCollector<B> {
fn add_intermediate_aggregation_result(
&mut self,
agg_data: &AggregationsSegmentCtx,
@@ -350,8 +351,8 @@ pub(crate) fn build_segment_range_collector(
};
if is_low_card {
Ok(Box::new(SegmentRangeCollector::<LowCardSubAggCache> {
sub_agg: sub_agg.map(LowCardCachedSubAggs::new),
Ok(Box::new(SegmentRangeCollector::<LowCardSubAggBuffer> {
sub_agg: sub_agg.map(LowCardBufferedSubAggs::new),
column_type: field_type,
accessor_idx,
parent_buckets: Vec::new(),
@@ -359,8 +360,8 @@ pub(crate) fn build_segment_range_collector(
limits: agg_data.context.limits.clone(),
}))
} else {
Ok(Box::new(SegmentRangeCollector::<HighCardSubAggCache> {
sub_agg: sub_agg.map(CachedSubAggs::new),
Ok(Box::new(SegmentRangeCollector::<HighCardSubAggBuffer> {
sub_agg: sub_agg.map(BufferedSubAggs::new),
column_type: field_type,
accessor_idx,
parent_buckets: Vec::new(),
@@ -370,7 +371,7 @@ pub(crate) fn build_segment_range_collector(
}
}
impl<C: SubAggCache> SegmentRangeCollector<C> {
impl<B: SubAggBuffer> SegmentRangeCollector<B> {
pub(crate) fn create_new_buckets(
&mut self,
agg_data: &AggregationsSegmentCtx,
@@ -554,7 +555,7 @@ mod tests {
pub fn get_collector_from_ranges(
ranges: Vec<RangeAggregationRange>,
field_type: ColumnType,
) -> SegmentRangeCollector<HighCardSubAggCache> {
) -> SegmentRangeCollector<HighCardSubAggBuffer> {
let req = RangeAggregation {
field: "dummy".to_string(),
ranges,

View File

@@ -1,5 +1,4 @@
use std::fmt::Debug;
use std::io;
use std::net::Ipv6Addr;
use columnar::column_values::CompactSpaceU64Accessor;
@@ -17,8 +16,9 @@ use crate::aggregation::agg_data::{
};
use crate::aggregation::agg_limits::MemoryConsumption;
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::cached_sub_aggs::{
CachedSubAggs, HighCardSubAggCache, LowCardCachedSubAggs, LowCardSubAggCache, SubAggCache,
use crate::aggregation::buffered_sub_aggs::{
BufferedSubAggs, HighCardSubAggBuffer, LowCardBufferedSubAggs, LowCardSubAggBuffer,
SubAggBuffer,
};
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
@@ -391,7 +391,7 @@ pub(crate) fn build_segment_term_collector(
// Decide which bucket storage is best suited for this aggregation.
if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC && !has_sub_aggregations {
let term_buckets = VecTermBucketsNoAgg::new(max_term_id + 1, &mut bucket_id_provider);
let collector: SegmentTermCollector<_, HighCardSubAggCache> = SegmentTermCollector {
let collector: SegmentTermCollector<_, HighCardSubAggBuffer> = SegmentTermCollector {
parent_buckets: vec![term_buckets],
sub_agg: None,
bucket_id_provider,
@@ -401,8 +401,8 @@ pub(crate) fn build_segment_term_collector(
Ok(Box::new(collector))
} else if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC {
let term_buckets = VecTermBuckets::new(max_term_id + 1, &mut bucket_id_provider);
let sub_agg = sub_agg_collector.map(LowCardCachedSubAggs::new);
let collector: SegmentTermCollector<_, LowCardSubAggCache> = SegmentTermCollector {
let sub_agg = sub_agg_collector.map(LowCardBufferedSubAggs::new);
let collector: SegmentTermCollector<_, LowCardSubAggBuffer> = SegmentTermCollector {
parent_buckets: vec![term_buckets],
sub_agg,
bucket_id_provider,
@@ -414,8 +414,8 @@ pub(crate) fn build_segment_term_collector(
let term_buckets: PagedTermMap =
PagedTermMap::new(max_term_id + 1, &mut bucket_id_provider);
// Build sub-aggregation blueprint (flat pairs)
let sub_agg = sub_agg_collector.map(CachedSubAggs::new);
let collector: SegmentTermCollector<PagedTermMap, HighCardSubAggCache> =
let sub_agg = sub_agg_collector.map(BufferedSubAggs::new);
let collector: SegmentTermCollector<PagedTermMap, HighCardSubAggBuffer> =
SegmentTermCollector {
parent_buckets: vec![term_buckets],
sub_agg,
@@ -427,8 +427,8 @@ pub(crate) fn build_segment_term_collector(
} else {
let term_buckets: HashMapTermBuckets = HashMapTermBuckets::default();
// Build sub-aggregation blueprint (flat pairs)
let sub_agg = sub_agg_collector.map(CachedSubAggs::new);
let collector: SegmentTermCollector<HashMapTermBuckets, HighCardSubAggCache> =
let sub_agg = sub_agg_collector.map(BufferedSubAggs::new);
let collector: SegmentTermCollector<HashMapTermBuckets, HighCardSubAggBuffer> =
SegmentTermCollector {
parent_buckets: vec![term_buckets],
sub_agg,
@@ -758,10 +758,10 @@ impl TermAggregationMap for VecTermBuckets {
/// The collector puts values from the fast field into the correct buckets and does a conversion to
/// the correct datatype.
#[derive(Debug)]
struct SegmentTermCollector<TermMap: TermAggregationMap, C: SubAggCache> {
struct SegmentTermCollector<TermMap: TermAggregationMap, B: SubAggBuffer> {
/// The buckets containing the aggregation data.
parent_buckets: Vec<TermMap>,
sub_agg: Option<CachedSubAggs<C>>,
sub_agg: Option<BufferedSubAggs<B>>,
bucket_id_provider: BucketIdProvider,
max_term_id: u64,
terms_req_data: TermsAggReqData,
@@ -772,8 +772,8 @@ pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) {
(agg_name, agg_property)
}
impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
for SegmentTermCollector<TermMap, C>
impl<TermMap: TermAggregationMap, B: SubAggBuffer> SegmentAggregationCollector
for SegmentTermCollector<TermMap, B>
{
fn add_intermediate_aggregation_result(
&mut self,
@@ -790,8 +790,14 @@ impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
let term_req = &self.terms_req_data;
let name = term_req.name.clone();
let bucket =
Self::into_intermediate_bucket_result(term_req, &mut self.sub_agg, bucket, agg_data)?;
let bucket = Self::into_intermediate_bucket_result(
term_req,
self.sub_agg
.as_mut()
.map(BufferedSubAggs::get_sub_agg_collector),
bucket,
agg_data,
)?;
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
Ok(())
}
@@ -803,7 +809,7 @@ impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
docs: &[crate::DocId],
agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
let mem_pre = self.get_memory_consumption();
// let mem_pre = self.get_memory_consumption();
let req_data = &mut self.terms_req_data;
@@ -847,13 +853,16 @@ impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
}
}
let mem_delta = self.get_memory_consumption() - mem_pre;
if mem_delta > 0 {
agg_data
.context
.limits
.add_memory_consumed(mem_delta as u64)?;
}
// let mem_delta = self.get_memory_consumption() - mem_pre;
// if mem_delta > 0 {
// agg_data
// .context
// .limits
// .add_memory_consumed(mem_delta as u64)?;
// }
// After commenting out -> 6000ms -> 36ms
if let Some(sub_agg) = &mut self.sub_agg {
sub_agg.check_flush_local(agg_data)?;
}
@@ -907,10 +916,38 @@ fn extract_missing_value<T>(
Some((key, bucket))
}
impl<TermMap, C> SegmentTermCollector<TermMap, C>
fn reborrow_opt_collector<'a>(
opt: &'a mut Option<&mut dyn SegmentAggregationCollector>,
) -> Option<&'a mut dyn SegmentAggregationCollector> {
match opt {
Some(inner) => Some(*inner),
None => None,
}
}
fn into_intermediate_bucket_entry(
bucket: Bucket,
sub_agg_collector: Option<&mut dyn SegmentAggregationCollector>,
agg_data: &AggregationsSegmentCtx,
) -> crate::Result<IntermediateTermBucketEntry> {
let mut sub_aggregation_res = IntermediateAggregationResults::default();
if let Some(sub_agg_collector) = sub_agg_collector {
sub_agg_collector.add_intermediate_aggregation_result(
agg_data,
&mut sub_aggregation_res,
bucket.bucket_id,
)?;
}
Ok(IntermediateTermBucketEntry {
doc_count: bucket.count,
sub_aggregation: sub_aggregation_res,
})
}
impl<TermMap, B> SegmentTermCollector<TermMap, B>
where
TermMap: TermAggregationMap,
C: SubAggCache,
B: SubAggBuffer,
{
fn get_memory_consumption(&self) -> usize {
self.parent_buckets
@@ -922,7 +959,7 @@ where
#[inline]
pub(crate) fn into_intermediate_bucket_result(
term_req: &TermsAggReqData,
sub_agg: &mut Option<CachedSubAggs<C>>,
mut sub_agg_collector: Option<&mut dyn SegmentAggregationCollector>,
term_buckets: TermMap,
agg_data: &AggregationsSegmentCtx,
) -> crate::Result<IntermediateBucketResult> {
@@ -965,31 +1002,6 @@ where
let mut dict: FxHashMap<IntermediateKey, IntermediateTermBucketEntry> = Default::default();
dict.reserve(entries.len());
let into_intermediate_bucket_entry =
|bucket: Bucket,
sub_agg: &mut Option<CachedSubAggs<C>>|
-> crate::Result<IntermediateTermBucketEntry> {
if let Some(sub_agg) = sub_agg {
let mut sub_aggregation_res = IntermediateAggregationResults::default();
sub_agg
.get_sub_agg_collector()
.add_intermediate_aggregation_result(
agg_data,
&mut sub_aggregation_res,
bucket.bucket_id,
)?;
Ok(IntermediateTermBucketEntry {
doc_count: bucket.count,
sub_aggregation: sub_aggregation_res,
})
} else {
Ok(IntermediateTermBucketEntry {
doc_count: bucket.count,
sub_aggregation: Default::default(),
})
}
};
if term_req.column_type == ColumnType::Str {
let fallback_dict = Dictionary::empty();
let term_dict = term_req
@@ -1000,7 +1012,11 @@ where
if let Some((intermediate_key, bucket)) = extract_missing_value(&mut entries, term_req)
{
let intermediate_entry = into_intermediate_bucket_entry(bucket, sub_agg)?;
let intermediate_entry = into_intermediate_bucket_entry(
bucket,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)?;
dict.insert(intermediate_key, intermediate_entry);
}
@@ -1008,19 +1024,28 @@ where
entries.sort_unstable_by_key(|bucket| bucket.0);
let (term_ids, buckets): (Vec<u64>, Vec<Bucket>) = entries.into_iter().unzip();
let mut buckets_it = buckets.into_iter();
term_dict.sorted_ords_to_term_cb(term_ids.into_iter(), |term| {
let bucket = buckets_it.next().unwrap();
let intermediate_entry =
into_intermediate_bucket_entry(bucket, sub_agg).map_err(io::Error::other)?;
let intermediate_entries: Vec<IntermediateTermBucketEntry> = buckets
.into_iter()
.map(|bucket| {
into_intermediate_bucket_entry(
bucket,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)
})
.collect::<crate::Result<_>>()?;
let mut intermediate_entry_it = intermediate_entries.into_iter();
term_dict.sorted_ords_to_term_cb(&term_ids[..], |term| {
let intermediate_entry = intermediate_entry_it.next().unwrap();
dict.insert(
IntermediateKey::Str(
String::from_utf8(term.to_vec()).expect("could not convert to String"),
),
intermediate_entry,
);
Ok(())
})?;
if term_req.req.min_doc_count == 0 {
@@ -1055,14 +1080,22 @@ where
}
} else if term_req.column_type == ColumnType::DateTime {
for (val, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
let intermediate_entry = into_intermediate_bucket_entry(
doc_count,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)?;
let val = i64::from_u64(val);
let date = format_date(val)?;
dict.insert(IntermediateKey::Str(date), intermediate_entry);
}
} else if term_req.column_type == ColumnType::Bool {
for (val, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
let intermediate_entry = into_intermediate_bucket_entry(
doc_count,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)?;
let val = bool::from_u64(val);
dict.insert(IntermediateKey::Bool(val), intermediate_entry);
}
@@ -1082,14 +1115,22 @@ where
})?;
for (val, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
let intermediate_entry = into_intermediate_bucket_entry(
doc_count,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)?;
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
let val = Ipv6Addr::from_u128(val);
dict.insert(IntermediateKey::IpAddr(val), intermediate_entry);
}
} else {
for (val, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
let intermediate_entry = into_intermediate_bucket_entry(
doc_count,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)?;
if term_req.column_type == ColumnType::U64 {
dict.insert(IntermediateKey::U64(val), intermediate_entry);
} else if term_req.column_type == ColumnType::I64 {
@@ -1123,13 +1164,13 @@ where
}
}
impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentTermCollector<TermMap, C> {
impl<TermMap: TermAggregationMap, B: SubAggBuffer> SegmentTermCollector<TermMap, B> {
#[inline]
fn collect_terms_with_docs(
iter: impl Iterator<Item = (crate::DocId, u64)>,
term_buckets: &mut TermMap,
bucket_id_provider: &mut BucketIdProvider,
sub_agg: &mut CachedSubAggs<C>,
sub_agg: &mut BufferedSubAggs<B>,
) {
for (doc, term_id) in iter {
let bucket_id = term_buckets.term_entry(term_id, bucket_id_provider);
@@ -1187,6 +1228,7 @@ pub(crate) fn cut_off_buckets<T: GetDocCount + Debug>(
mod tests {
use std::net::IpAddr;
use std::str::FromStr;
use std::time::Instant;
use common::DateTime;
use time::{Date, Month};
@@ -1200,8 +1242,9 @@ mod tests {
get_test_index_from_terms, get_test_index_from_values_and_terms,
};
use crate::aggregation::{AggregationLimitsGuard, DistributedAggregationCollector};
use crate::collector::{Collector, default_collect_segment_impl};
use crate::indexer::NoMergePolicy;
use crate::query::AllQuery;
use crate::query::{AllQuery, EnableScoring, Query};
use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING};
use crate::{Index, IndexWriter};
@@ -2896,4 +2939,103 @@ mod tests {
Ok(())
}
#[test]
fn test_terms_double_nesting() {
let mut schema_builder = Schema::builder();
let outer_field = schema_builder.add_text_field("outer_term", STRING | FAST);
let inner_field = schema_builder.add_text_field("inner_term", STRING | FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
let outer_values = (0..10_000)
.map(|i| format!("outer_{i}"))
.collect::<Vec<_>>();
let inner_values = ["INFO", "ERROR", "WARN", "DEBUG"];
{
let mut index_writer: IndexWriter = index.writer_with_num_threads(1, 200_000_000).unwrap();
for doc_id in 0..1_000_000u64 {
let outer_val = &outer_values[doc_id as usize % outer_values.len()];
let inner_val = inner_values[doc_id as usize % inner_values.len()];
index_writer.add_document(doc!(
outer_field => outer_val.as_str(),
inner_field => inner_val,
)).unwrap();
}
index_writer.commit().unwrap();
}
let agg_req: Aggregations = serde_json::from_value(json!({
"outer": {
"terms": { "field": "outer_term", "size": 10 },
"aggs": {
"inner": {
"terms": { "field": "inner_term" }
}
}
}
}))
.unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let collector =
crate::aggregation::AggregationCollector::from_aggs(agg_req, Default::default());
assert_eq!(searcher.segment_readers().len(), 1);
let segment_reader = searcher.segment_reader(0u32);
let all_weight = AllQuery.weight(EnableScoring::disabled_from_schema(&schema)).unwrap();
let mut segment_collector = collector.for_segment(0u32, segment_reader).unwrap();
let start = Instant::now();
default_collect_segment_impl(&mut segment_collector, &*all_weight, segment_reader, false).unwrap();
dbg!(start.elapsed());
}
#[test]
fn test_terms_simple_nesting() {
let mut schema_builder = Schema::builder();
let outer_field = schema_builder.add_text_field("outer_term", STRING | FAST);
let inner_field = schema_builder.add_text_field("inner_term", STRING | FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
let outer_values = (0..10_000)
.map(|i| format!("outer_{i}"))
.collect::<Vec<_>>();
let inner_values = ["INFO", "ERROR", "WARN", "DEBUG"];
{
let mut index_writer: IndexWriter = index.writer_with_num_threads(1, 200_000_000).unwrap();
for doc_id in 0..1_000_000u64 {
let outer_val = &outer_values[doc_id as usize % outer_values.len()];
let inner_val = inner_values[doc_id as usize % inner_values.len()];
index_writer.add_document(doc!(
outer_field => outer_val.as_str(),
inner_field => inner_val,
)).unwrap();
}
index_writer.commit().unwrap();
}
let agg_req: Aggregations = serde_json::from_value(json!({
"outer": {
"terms": { "field": "outer_term", "size": 10 },
}
}))
.unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let collector =
crate::aggregation::AggregationCollector::from_aggs(agg_req, Default::default());
assert_eq!(searcher.segment_readers().len(), 1);
let segment_reader = searcher.segment_reader(0u32);
let all_weight = AllQuery.weight(EnableScoring::disabled_from_schema(&schema)).unwrap();
let mut segment_collector = collector.for_segment(0u32, segment_reader).unwrap();
let start = Instant::now();
default_collect_segment_impl(&mut segment_collector, &*all_weight, segment_reader, false).unwrap();
dbg!(start.elapsed());
}
}

View File

@@ -5,7 +5,7 @@ use crate::aggregation::agg_data::{
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
};
use crate::aggregation::bucket::term_agg::TermsAggregation;
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardCachedSubAggs};
use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardBufferedSubAggs};
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
IntermediateKey, IntermediateTermBucketEntry, IntermediateTermBucketResult,
@@ -47,7 +47,7 @@ struct MissingCount {
#[derive(Default, Debug)]
pub struct TermMissingAgg {
accessor_idx: usize,
sub_agg: Option<HighCardCachedSubAggs>,
sub_agg: Option<HighCardBufferedSubAggs>,
/// Idx = parent bucket id, Value = missing count for that bucket
missing_count_per_bucket: Vec<MissingCount>,
bucket_id_provider: BucketIdProvider,
@@ -66,7 +66,7 @@ impl TermMissingAgg {
None
};
let sub_agg = sub_agg.map(CachedSubAggs::new);
let sub_agg = sub_agg.map(BufferedSubAggs::new);
let bucket_id_provider = BucketIdProvider::default();
Ok(Self {

View File

@@ -6,7 +6,7 @@ use crate::aggregation::bucket::MAX_NUM_TERMS_FOR_VEC;
use crate::aggregation::BucketId;
use crate::DocId;
/// A cache for sub-aggregations, storing doc ids per bucket id.
/// A buffer for sub-aggregations, storing doc ids per bucket id.
/// Depending on the cardinality of the parent aggregation, we use different
/// storage strategies.
///
@@ -24,21 +24,21 @@ use crate::DocId;
/// aggregations.
/// What this datastructure does in general is to group docs by bucket id.
#[derive(Debug)]
pub(crate) struct CachedSubAggs<C: SubAggCache> {
cache: C,
pub(crate) struct BufferedSubAggs<B: SubAggBuffer> {
buffer: B,
sub_agg_collector: Box<dyn SegmentAggregationCollector>,
num_docs: usize,
}
pub type LowCardCachedSubAggs = CachedSubAggs<LowCardSubAggCache>;
pub type HighCardCachedSubAggs = CachedSubAggs<HighCardSubAggCache>;
pub type LowCardBufferedSubAggs = BufferedSubAggs<LowCardSubAggBuffer>;
pub type HighCardBufferedSubAggs = BufferedSubAggs<HighCardSubAggBuffer>;
const FLUSH_THRESHOLD: usize = 2048;
/// A trait for caching sub-aggregation doc ids per bucket id.
/// A trait for buffering sub-aggregation doc ids per bucket id.
/// Different implementations can be used depending on the cardinality
/// of the parent aggregation.
pub trait SubAggCache: Debug {
pub trait SubAggBuffer: Debug {
fn new() -> Self;
fn push(&mut self, bucket_id: BucketId, doc_id: DocId);
fn flush_local(
@@ -49,22 +49,22 @@ pub trait SubAggCache: Debug {
) -> crate::Result<()>;
}
impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
impl<Backend: SubAggBuffer + Debug> BufferedSubAggs<Backend> {
pub fn new(sub_agg: Box<dyn SegmentAggregationCollector>) -> Self {
Self {
cache: Backend::new(),
buffer: Backend::new(),
sub_agg_collector: sub_agg,
num_docs: 0,
}
}
pub fn get_sub_agg_collector(&mut self) -> &mut Box<dyn SegmentAggregationCollector> {
&mut self.sub_agg_collector
pub fn get_sub_agg_collector(&mut self) -> &mut dyn SegmentAggregationCollector {
&mut *self.sub_agg_collector
}
#[inline]
pub fn push(&mut self, bucket_id: BucketId, doc_id: DocId) {
self.cache.push(bucket_id, doc_id);
self.buffer.push(bucket_id, doc_id);
self.num_docs += 1;
}
@@ -75,7 +75,7 @@ impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
if self.num_docs >= FLUSH_THRESHOLD {
self.cache
self.buffer
.flush_local(&mut self.sub_agg_collector, agg_data, false)?;
self.num_docs = 0;
}
@@ -85,7 +85,7 @@ impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
/// Note: this _does_ flush the sub aggregations.
pub fn flush(&mut self, agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
if self.num_docs != 0 {
self.cache
self.buffer
.flush_local(&mut self.sub_agg_collector, agg_data, true)?;
self.num_docs = 0;
}
@@ -94,11 +94,11 @@ impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
}
}
/// Number of partitions for high cardinality sub-aggregation cache.
/// Number of partitions for high cardinality sub-aggregation buffer.
const NUM_PARTITIONS: usize = 16;
#[derive(Debug)]
pub(crate) struct HighCardSubAggCache {
pub(crate) struct HighCardSubAggBuffer {
/// This weird partitioning is used to do some cheap grouping on the bucket ids.
/// bucket ids are dense, e.g. when we don't detect the cardinality as low cardinality,
/// but there are just 16 bucket ids, each bucket id will go to its own partition.
@@ -108,7 +108,7 @@ pub(crate) struct HighCardSubAggCache {
partitions: Box<[PartitionEntry; NUM_PARTITIONS]>,
}
impl HighCardSubAggCache {
impl HighCardSubAggBuffer {
#[inline]
fn clear(&mut self) {
for partition in self.partitions.iter_mut() {
@@ -131,7 +131,7 @@ impl PartitionEntry {
}
}
impl SubAggCache for HighCardSubAggCache {
impl SubAggBuffer for HighCardSubAggBuffer {
fn new() -> Self {
Self {
partitions: Box::new(core::array::from_fn(|_| PartitionEntry::default())),
@@ -173,14 +173,14 @@ impl SubAggCache for HighCardSubAggCache {
}
#[derive(Debug)]
pub(crate) struct LowCardSubAggCache {
/// Cache doc ids per bucket for sub-aggregations.
pub(crate) struct LowCardSubAggBuffer {
/// Buffer doc ids per bucket for sub-aggregations.
///
/// The outer Vec is indexed by BucketId.
per_bucket_docs: Vec<Vec<DocId>>,
}
impl LowCardSubAggCache {
impl LowCardSubAggBuffer {
#[inline]
fn clear(&mut self) {
for v in &mut self.per_bucket_docs {
@@ -189,7 +189,7 @@ impl LowCardSubAggCache {
}
}
impl SubAggCache for LowCardSubAggCache {
impl SubAggBuffer for LowCardSubAggBuffer {
fn new() -> Self {
Self {
per_bucket_docs: Vec::new(),

View File

@@ -1,6 +1,6 @@
use super::agg_req::Aggregations;
use super::agg_result::AggregationResults;
use super::cached_sub_aggs::LowCardCachedSubAggs;
use super::buffered_sub_aggs::LowCardBufferedSubAggs;
use super::intermediate_agg_result::IntermediateAggregationResults;
use super::AggContextParams;
// group buffering strategy is chosen explicitly by callers; no need to hash-group on the fly.
@@ -136,7 +136,7 @@ fn merge_fruits(
/// `AggregationSegmentCollector` does the aggregation collection on a segment.
pub struct AggregationSegmentCollector {
aggs_with_accessor: AggregationsSegmentCtx,
agg_collector: LowCardCachedSubAggs,
agg_collector: LowCardBufferedSubAggs,
error: Option<TantivyError>,
}
@@ -152,7 +152,7 @@ impl AggregationSegmentCollector {
let mut agg_data =
build_aggregations_data_from_req(agg, reader, segment_ordinal, context.clone())?;
let mut result =
LowCardCachedSubAggs::new(build_segment_agg_collectors_root(&mut agg_data)?);
LowCardBufferedSubAggs::new(build_segment_agg_collectors_root(&mut agg_data)?);
result
.get_sub_agg_collector()
.prepare_max_bucket(0, &agg_data)?; // prepare for bucket zero

View File

@@ -1,10 +1,11 @@
use std::fmt::Debug;
use std::hash::Hash;
use std::io;
use columnar::column_values::CompactSpaceU64Accessor;
use columnar::{Column, ColumnType, Dictionary, StrColumn};
use common::f64_to_u64;
use datasketches::hll::{HllSketch, HllType, HllUnion};
use rustc_hash::FxHashSet;
use datasketches::hll::{Coupon, HllSketch, HllType, HllUnion};
use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::aggregation::agg_data::AggregationsSegmentCtx;
@@ -120,9 +121,65 @@ impl CardinalityAggregationReq {
}
}
#[derive(Clone, Debug)]
/// A CouponCache is here to cache the mapping term ordinal -> coupon (see above).
/// The idea is that we do not want to fetch terms associated to several term ordinals,
/// several times due to the fact that we have several buckets.
enum CouponCache {
Dense {
coupon_map: Vec<Coupon>,
missing_coupon_opt: Option<Coupon>,
},
Sparse {
coupon_map: FxHashMap<u64, Coupon>,
missing_coupon_opt: Option<Coupon>,
},
}
impl CouponCache {
fn new(
term_ords: Vec<u64>,
coupons: Vec<Coupon>,
missing_coupon_opt: Option<Coupon>,
) -> CouponCache {
let num_terms = term_ords.len();
assert_eq!(num_terms, coupons.len());
if term_ords.is_empty() {
return CouponCache::Dense {
coupon_map: Vec::new(),
missing_coupon_opt,
};
}
let highest_term_ord = term_ords.last().copied().unwrap_or(0u64);
// We prefer the dense implementation, if it is not too wasteful.
// There are two cases for which we can use it.
// 1- if the data is small.
// 2- if the data is not necessarily small, but due to a high occupancy ratio, the RAM usage
// is not that much bigger than if we had used a HashSet. (occupancy ratio + extra
// metadata ~ x2.25)
let should_use_dense =
highest_term_ord < 1_000_000u64 || highest_term_ord < num_terms as u64 * 3u64;
if should_use_dense {
let mut coupon_map: Vec<Coupon> = vec![Coupon::EMPTY; highest_term_ord as usize + 1];
for (term_ord, coupon) in term_ords.into_iter().zip(coupons.into_iter()) {
coupon_map[term_ord as usize] = coupon;
}
CouponCache::Dense {
coupon_map,
missing_coupon_opt,
}
} else {
let coupon_map: FxHashMap<u64, Coupon> = term_ords.into_iter().zip(coupons).collect();
CouponCache::Sparse {
coupon_map,
missing_coupon_opt,
}
}
}
}
pub(crate) struct SegmentCardinalityCollector {
buckets: Vec<SegmentCardinalityCollectorBucket>,
/// Buckets are Some(_) until they get consumed by into_intermediate_results().
buckets: Vec<Option<SegmentCardinalityCollectorBucket>>,
accessor_idx: usize,
/// The column accessor to access the fast field values.
accessor: Column<u64>,
@@ -130,75 +187,133 @@ pub(crate) struct SegmentCardinalityCollector {
column_type: ColumnType,
/// The missing value normalized to the internal u64 representation of the field type.
missing_value_for_accessor: Option<u64>,
coupon_cache: Option<CouponCache>,
}
impl Debug for SegmentCardinalityCollector {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("SegmentCardinalityCollector")
.field("column_type", &self.column_type)
.field(
"missing_value_for_accessor",
&self.missing_value_for_accessor,
)
.finish()
}
}
#[derive(Clone, Debug, PartialEq, Default)]
pub(crate) struct SegmentCardinalityCollectorBucket {
cardinality: CardinalityCollector,
entries: FxHashSet<u64>,
}
impl SegmentCardinalityCollectorBucket {
#[inline(always)]
pub fn new(column_type: ColumnType) -> Self {
Self {
cardinality: CardinalityCollector::new(column_type as u8),
entries: FxHashSet::default(),
}
}
// Returns a intermediate metric result.
//
// If the column is not str, the values have been added to the
// sketch during collection.
//
// If the column is str, then the values are dictionary encoded
// and have not been added to the sketch yet.
// We need to resolves the term ords accumulated in self.entries
// with the coupon cache, and append the results to the sketch.
fn into_intermediate_metric_result(
mut self,
req_data: &CardinalityAggReqData,
coupon_cache_opt: Option<&CouponCache>,
) -> crate::Result<IntermediateMetricResult> {
if req_data.column_type == ColumnType::Str {
let fallback_dict = Dictionary::empty();
let dict = req_data
.str_dict_column
.as_ref()
.map(|el| el.dictionary())
.unwrap_or_else(|| &fallback_dict);
let mut has_missing = false;
if let Some(coupon_cache) = coupon_cache_opt {
assert!(self.cardinality.sketch.is_empty());
append_to_sketch(&self.entries, coupon_cache, &mut self.cardinality);
}
Ok(IntermediateMetricResult::Cardinality(self.cardinality))
}
}
// TODO: replace FxHashSet with something that allows iterating in order
// (e.g. sparse bitvec)
let mut term_ids = Vec::new();
for term_ord in self.entries.into_iter() {
if term_ord == u64::MAX {
has_missing = true;
} else {
// we can reasonably exclude values above u32::MAX
term_ids.push(term_ord as u32);
}
}
/// Builds a coupon cache from the given buckets, dictionary, and optional missing value.
/// Returns a mapping from term_ord to the hash (coupon) of the associated term.
fn build_coupon_cache(
buckets: &[Option<SegmentCardinalityCollectorBucket>],
dictionary: &Dictionary,
missing_value_opt: Option<&Key>,
) -> io::Result<CouponCache> {
let term_ords_capacity: usize = buckets
.iter()
.flatten()
.map(|bucket| bucket.entries.len())
.max()
.unwrap_or(0)
* 2;
let mut term_ords_set = FxHashSet::with_capacity_and_hasher(term_ords_capacity, FxBuildHasher);
for bucket in buckets.iter().flatten() {
term_ords_set.extend(bucket.entries.iter().copied());
}
let mut term_ords: Vec<u64> = term_ords_set.into_iter().collect();
term_ords.sort_unstable();
term_ids.sort_unstable();
dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| {
self.cardinality.insert(term);
Ok(())
})?;
if has_missing {
// Replace missing with the actual value provided
let missing_key =
req_data.req.missing.as_ref().expect(
"Found sentinel value u64::MAX for term_ord but `missing` is not set",
);
match missing_key {
Key::Str(missing) => {
self.cardinality.insert(missing.as_str());
}
Key::F64(val) => {
let val = f64_to_u64(*val);
self.cardinality.insert(val);
}
Key::U64(val) => {
self.cardinality.insert(*val);
}
Key::I64(val) => {
self.cardinality.insert(*val);
}
term_ords.pop_if(|highest_term_ord| *highest_term_ord >= dictionary.num_terms() as u64);
let mut coupons: Vec<Coupon> = Vec::with_capacity(term_ords.len());
let all_term_ords_found: bool =
dictionary.sorted_ords_to_term_cb(&term_ords, |term_bytes| {
let coupon: Coupon = Coupon::from_hash(term_bytes);
coupons.push(coupon);
})?;
assert!(all_term_ords_found);
// Regardless of whether or not there is effectively a missing value in one of the buckets,
// we populate the cache with the missing key too (if any).
let missing_coupon_opt: Option<Coupon> = missing_value_opt.map(|missing_key| {
if let Key::Str(missing_value_str) = missing_key {
Coupon::from_hash(missing_value_str.as_bytes())
} else {
// See https://github.com/quickwit-oss/tantivy/issues/2891
// A missing key with a type different from Str will not work as intended
// for the moment.
//
// Right now this is just a partial workaround.
Coupon::from_hash("__tantivy_missing_non_str__".as_bytes())
}
});
Ok(CouponCache::new(term_ords, coupons, missing_coupon_opt))
}
fn append_to_sketch(
term_ords: &FxHashSet<u64>,
coupon_cache: &CouponCache,
sketch: &mut CardinalityCollector,
) {
match coupon_cache {
CouponCache::Dense {
coupon_map,
missing_coupon_opt,
} => {
for &term_ord in term_ords {
if let Some(coupon) = coupon_map
.get(term_ord as usize)
.copied()
.or(*missing_coupon_opt)
{
sketch.insert_coupon(coupon);
}
}
}
CouponCache::Sparse {
coupon_map,
missing_coupon_opt,
} => {
for term_ord in term_ords {
if let Some(coupon) = coupon_map.get(term_ord).copied().or(*missing_coupon_opt) {
sketch.insert_coupon(coupon);
}
}
}
Ok(IntermediateMetricResult::Cardinality(self.cardinality))
}
}
@@ -210,11 +325,12 @@ impl SegmentCardinalityCollector {
missing_value_for_accessor: Option<u64>,
) -> Self {
Self {
buckets: vec![SegmentCardinalityCollectorBucket::new(column_type); 1],
buckets: Vec::new(),
column_type,
accessor_idx,
accessor,
missing_value_for_accessor,
coupon_cache: None,
}
}
@@ -236,15 +352,35 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
&mut self,
agg_data: &AggregationsSegmentCtx,
results: &mut IntermediateAggregationResults,
parent_bucket_id: BucketId,
bucket_id: BucketId,
) -> crate::Result<()> {
self.prepare_max_bucket(parent_bucket_id, agg_data)?;
self.prepare_max_bucket(bucket_id, agg_data)?;
let req_data = &agg_data.get_cardinality_req_data(self.accessor_idx);
// Strings are dictionary encoded. Fetching the terms associated to strings
// is expensive. For this reason, we do that once for all buckets and cache the results
// here.
if let Some(str_dict_column) = &req_data.str_dict_column {
// Ensure the coupon cache is populated.
// A mapping from term_ord to the hash of the associated term.
// The missing value sentinel will be associated to the hash of the missing value if
// any.
if self.coupon_cache.is_none() {
self.coupon_cache = Some(build_coupon_cache(
&self.buckets,
str_dict_column.dictionary(),
req_data.req.missing.as_ref(),
)?);
}
}
let name = req_data.name.to_string();
// take the bucket in buckets and replace it with a new empty one
let bucket = std::mem::take(&mut self.buckets[parent_bucket_id as usize]);
let intermediate_result = bucket.into_intermediate_metric_result(req_data)?;
let Some(bucket) = self.buckets[bucket_id as usize].take() else {
return Err(crate::TantivyError::InternalError(
"the same bucket should not be finalized twice.".to_string(),
));
};
let intermediate_result =
bucket.into_intermediate_metric_result(self.coupon_cache.as_ref())?;
results.push(
name,
IntermediateAggregationResult::Metric(intermediate_result),
@@ -260,8 +396,11 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
self.fetch_block_with_field(docs, agg_data);
let bucket = &mut self.buckets[parent_bucket_id as usize];
let Some(bucket) = &mut self.buckets[parent_bucket_id as usize].as_mut() else {
return Err(crate::TantivyError::InternalError(
"collection should not happen after finalization".to_string(),
));
};
let col_block_accessor = &agg_data.column_block_accessor;
if self.column_type == ColumnType::Str {
for term_ord in col_block_accessor.iter_vals() {
@@ -301,7 +440,7 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
) -> crate::Result<()> {
if max_bucket as usize >= self.buckets.len() {
self.buckets.resize_with(max_bucket as usize + 1, || {
SegmentCardinalityCollectorBucket::new(self.column_type)
Some(SegmentCardinalityCollectorBucket::new(self.column_type))
});
}
Ok(())
@@ -358,10 +497,14 @@ impl CardinalityCollector {
/// Insert a value into the HLL sketch, salted by the column type.
/// The salt ensures that identical u64 values from different column types
/// (e.g. bool `false` vs i64 `0`) are counted as distinct.
pub(crate) fn insert<T: Hash>(&mut self, value: T) {
fn insert<T: Hash>(&mut self, value: T) {
self.sketch.update((self.salt, value));
}
fn insert_coupon(&mut self, coupon: Coupon) {
self.sketch.update_with_coupon(coupon);
}
/// Compute the final cardinality estimate.
pub fn finalize(self) -> Option<f64> {
Some(self.sketch.estimate().trunc())
@@ -377,7 +520,7 @@ impl CardinalityCollector {
let mut union = HllUnion::new(LG_K);
union.update(&self.sketch);
union.update(&right.sketch);
self.sketch = union.get_result(HllType::Hll4);
self.sketch = union.to_sketch(HllType::Hll4);
Ok(())
}
}
@@ -392,7 +535,7 @@ mod tests {
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::tests::{exec_request, get_test_index_from_terms};
use crate::schema::{IntoIpv6Addr, Schema, FAST};
use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING};
use crate::Index;
#[test]
@@ -575,6 +718,30 @@ mod tests {
assert_eq!(estimate, 3.0);
}
/// Verifies that merging two small sketches (both in List/Set coupon mode)
/// produces an exact result — i.e. the HllUnion does not unnecessarily
/// promote to the full HLL array when the combined cardinality is small.
#[test]
fn cardinality_collector_merge_stays_exact_for_small_sets() {
use super::CardinalityCollector;
let mut left = CardinalityCollector::default();
for i in 0u64..50 {
left.insert(i);
}
let mut right = CardinalityCollector::default();
for i in 30u64..100 {
right.insert(i);
}
left.merge_fruits(right).unwrap();
let estimate = left.finalize().unwrap();
// 100 distinct values (0..100). Both sketches are in Set mode (< 192 coupons),
// so the union should stay in coupon mode and give an exact count.
assert_eq!(estimate, 100.0);
}
#[test]
fn cardinality_collector_serialize_deserialize_binary() {
use datasketches::hll::HllSketch;
@@ -591,6 +758,98 @@ mod tests {
assert!((deserialized.estimate() - 3.0).abs() < 0.01);
}
/// Tests that the `missing` parameter correctly counts a single empty document
/// for both u64 and str columns.
#[test]
fn cardinality_aggregation_missing_value_single_empty_doc() {
let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", FAST);
let name_field = schema_builder.add_text_field("name", STRING | FAST);
let index = Index::create_in_ram(schema_builder.build());
let mut writer = index.writer_for_tests().unwrap();
writer
.add_document(doc!(id_field=>1u64,name_field=>"some_name"))
.unwrap();
writer.add_document(doc!()).unwrap();
writer.commit().unwrap();
{
// int colum with missing value non redundant
let agg_req: Aggregations = serde_json::from_value(json!({
"cardinality": {
"cardinality": {
"field": "id",
"missing": 42u64
},
}
}))
.unwrap();
let res = exec_request(agg_req, &index).unwrap();
assert_eq!(res["cardinality"]["value"], 2.0);
}
{
// int colum with missing value redundant
let agg_req: Aggregations = serde_json::from_value(json!({
"cardinality": {
"cardinality": {
"field": "id",
"missing": 1u64
},
}
}))
.unwrap();
let res = exec_request(agg_req, &index).unwrap();
assert_eq!(res["cardinality"]["value"], 1.0);
}
{
// str colum with missing value non redundant
// With more than one segment, this is not well handled.
let agg_req: Aggregations = serde_json::from_value(json!({
"cardinality": {
"cardinality": {
"field": "name",
"missing": "other_name"
},
}
}))
.unwrap();
let res = exec_request(agg_req, &index).unwrap();
assert_eq!(res["cardinality"]["value"], 2.0);
}
{
// str colum with missing value redundant
let agg_req: Aggregations = serde_json::from_value(json!({
"cardinality": {
"cardinality": {
"field": "name",
"missing": "some_name"
},
}
}))
.unwrap();
let res = exec_request(agg_req, &index).unwrap();
assert_eq!(res["cardinality"]["value"], 1.0);
}
{
// str column with missing value with a number type.
let agg_req: Aggregations = serde_json::from_value(json!({
"cardinality": {
"cardinality": {
"field": "name",
"missing": 3,
},
}
}))
.unwrap();
let res = exec_request(agg_req, &index).unwrap();
assert_eq!(res["cardinality"]["value"], 2.0);
}
}
#[test]
fn cardinality_collector_salt_differentiates_types() {
use super::CardinalityCollector;

View File

@@ -107,10 +107,9 @@ pub enum PercentileValues {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// The entry when requesting percentiles with keyed: false
pub struct PercentileValuesVecEntry {
/// Percentile
/// The percentile key (e.g. 1.0, 5.0, 25.0).
pub key: f64,
/// Value at the percentile
/// The percentile value. `NaN` when there are no values.
pub value: f64,
}

View File

@@ -133,7 +133,7 @@ mod agg_limits;
pub mod agg_req;
pub mod agg_result;
pub mod bucket;
pub(crate) mod cached_sub_aggs;
pub(crate) mod buffered_sub_aggs;
mod collector;
mod date;
mod error;

View File

@@ -1,5 +1,6 @@
use super::Collector;
use crate::collector::SegmentCollector;
use crate::query::Weight;
use crate::{DocId, Score, SegmentOrdinal, SegmentReader};
/// `CountCollector` collector only counts how many
@@ -55,6 +56,15 @@ impl Collector for Count {
fn merge_fruits(&self, segment_counts: Vec<usize>) -> crate::Result<usize> {
Ok(segment_counts.into_iter().sum())
}
fn collect_segment(
&self,
weight: &dyn Weight,
_segment_ord: u32,
reader: &SegmentReader,
) -> crate::Result<usize> {
Ok(weight.count(reader)? as usize)
}
}
#[derive(Default)]

View File

@@ -1,5 +1,8 @@
use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;
use crate::collector::sort_key::NaturalComparator;
use crate::collector::{SegmentSortKeyComputer, SortKeyComputer, TopNComputer};
use crate::collector::{SegmentSortKeyComputer, SortKeyComputer};
use crate::{DocAddress, DocId, Score};
/// Sort by similarity score.
@@ -25,6 +28,10 @@ impl SortKeyComputer for SortBySimilarityScore {
}
// Sorting by score is special in that it allows for the Block-Wand optimization.
//
// We use a BinaryHeap (TopNHeap) instead of TopNComputer here so that the
// threshold is always the exact K-th best score. TopNComputer only updates its
// threshold every K docs (at truncation), giving Block-WAND a stale bound.
fn collect_segment_top_k(
&self,
k: usize,
@@ -32,12 +39,10 @@ impl SortKeyComputer for SortBySimilarityScore {
reader: &crate::SegmentReader,
segment_ord: u32,
) -> crate::Result<Vec<(Self::SortKey, DocAddress)>> {
let mut top_n: TopNComputer<Score, DocId, Self::Comparator> =
TopNComputer::new_with_comparator(k, self.comparator());
let mut top_n = TopNHeap::new(k);
if let Some(alive_bitset) = reader.alive_bitset() {
let mut threshold = Score::MIN;
top_n.threshold = Some(threshold);
weight.for_each_pruning(Score::MIN, reader, &mut |doc, score| {
if alive_bitset.is_deleted(doc) {
return threshold;
@@ -56,7 +61,7 @@ impl SortKeyComputer for SortBySimilarityScore {
Ok(top_n
.into_vec()
.into_iter()
.map(|cid| (cid.sort_key, DocAddress::new(segment_ord, cid.doc)))
.map(|(score, doc)| (score, DocAddress::new(segment_ord, doc)))
.collect())
}
}
@@ -75,3 +80,204 @@ impl SegmentSortKeyComputer for SortBySimilarityScore {
score
}
}
/// Min-heap entry: higher score = greater, lower doc wins ties.
struct ScoreHeapEntry {
score: Score,
doc: DocId,
}
impl Eq for ScoreHeapEntry {}
impl PartialEq for ScoreHeapEntry {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl PartialOrd for ScoreHeapEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for ScoreHeapEntry {
fn cmp(&self, other: &Self) -> Ordering {
self.score
.partial_cmp(&other.score)
.unwrap_or(Ordering::Equal)
.then_with(|| other.doc.cmp(&self.doc))
}
}
/// Heap-based top-K for score collection. O(log K) per insert, but the threshold
/// is always tight, so Block-WAND prunes better than with [`TopNComputer`]'s
/// buffer/median approach.
///
/// Like [`TopNComputer`], items must arrive in ascending doc order, and equal
/// scores are rejected (strict `>`) so that lower doc IDs win ties.
///
/// [`TopNComputer`]: crate::collector::TopNComputer
struct TopNHeap {
heap: BinaryHeap<Reverse<ScoreHeapEntry>>,
top_n: usize,
threshold: Option<Score>,
}
impl TopNHeap {
fn new(top_n: usize) -> Self {
TopNHeap {
heap: BinaryHeap::with_capacity(top_n),
top_n,
threshold: None,
}
}
#[inline]
fn push(&mut self, score: Score, doc: DocId) {
if self.heap.len() < self.top_n {
self.heap.push(Reverse(ScoreHeapEntry { score, doc }));
if self.heap.len() == self.top_n {
self.threshold = self.heap.peek().map(|Reverse(entry)| entry.score);
}
} else if let Some(threshold) = self.threshold {
if score > threshold {
// peek_mut + assign is a single sift-down, vs pop + push = two sifts.
if let Some(mut min) = self.heap.peek_mut() {
*min = Reverse(ScoreHeapEntry { score, doc });
}
self.threshold = self.heap.peek().map(|Reverse(entry)| entry.score);
}
}
}
fn into_vec(self) -> Vec<(Score, DocId)> {
self.heap
.into_vec()
.into_iter()
.map(|Reverse(entry)| (entry.score, entry.doc))
.collect()
}
}
#[cfg(test)]
mod tests {
use proptest::prelude::*;
use super::*;
use crate::collector::sort_key::NaturalComparator;
use crate::collector::TopNComputer;
#[test]
fn test_top_n_heap_zero_capacity() {
let mut heap = TopNHeap::new(0);
heap.push(1.0, 0);
heap.push(2.0, 1);
assert!(heap.into_vec().is_empty());
}
#[test]
fn test_top_n_heap_basic() {
let mut heap = TopNHeap::new(2);
heap.push(1.0, 0);
heap.push(3.0, 1);
heap.push(2.0, 2);
let mut results = heap.into_vec();
results.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap().then_with(|| a.1.cmp(&b.1)));
assert_eq!(results, vec![(3.0, 1), (2.0, 2)]);
}
#[test]
fn test_top_n_heap_threshold_always_accurate() {
let mut heap = TopNHeap::new(2);
assert_eq!(heap.threshold, None);
heap.push(1.0, 0);
assert_eq!(heap.threshold, None);
heap.push(3.0, 1);
assert_eq!(heap.threshold, Some(1.0));
heap.push(2.0, 2); // evicts 1.0
assert_eq!(heap.threshold, Some(2.0));
heap.push(4.0, 3); // evicts 2.0
assert_eq!(heap.threshold, Some(3.0));
}
#[test]
fn test_top_n_heap_tiebreaking_lower_doc_wins() {
let mut heap = TopNHeap::new(2);
heap.push(5.0, 0);
heap.push(5.0, 1);
heap.push(5.0, 2); // rejected: not strictly > threshold
let mut results = heap.into_vec();
results.sort_by_key(|&(_, doc)| doc);
assert_eq!(results, vec![(5.0, 0), (5.0, 1)]);
}
#[test]
fn test_top_n_heap_single_element() {
let mut heap = TopNHeap::new(1);
heap.push(1.0, 0);
assert_eq!(heap.threshold, Some(1.0));
heap.push(0.5, 1); // rejected
heap.push(2.0, 2); // accepted
assert_eq!(heap.threshold, Some(2.0));
let results = heap.into_vec();
assert_eq!(results, vec![(2.0, 2)]);
}
#[test]
fn test_top_n_heap_under_capacity() {
let mut heap = TopNHeap::new(5);
heap.push(3.0, 0);
heap.push(1.0, 1);
heap.push(2.0, 2);
// Only 3 elements, capacity is 5 — all should be kept
assert_eq!(heap.threshold, None);
let mut results = heap.into_vec();
results.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap().then_with(|| a.1.cmp(&b.1)));
assert_eq!(results, vec![(3.0, 0), (2.0, 2), (1.0, 1)]);
}
proptest! {
#[test]
fn test_top_n_heap_matches_top_n_computer(
limit in 0..20_usize,
mut docs in proptest::collection::vec((0..1000_u32, 0..1000_u32), 0..200_usize),
) {
// Both require ascending doc order.
docs.sort_by_key(|(_, doc_id)| *doc_id);
docs.dedup_by_key(|(_, doc_id)| *doc_id);
let mut heap = TopNHeap::new(limit);
let mut computer: TopNComputer<Score, DocId, NaturalComparator> =
TopNComputer::new_with_comparator(limit, NaturalComparator);
for &(score_u32, doc) in &docs {
let score = score_u32 as Score;
heap.push(score, doc);
computer.push(score, doc);
}
let mut heap_results = heap.into_vec();
heap_results.sort_by(|a, b| {
b.0.partial_cmp(&a.0).unwrap().then_with(|| a.1.cmp(&b.1))
});
let computer_results: Vec<(Score, DocId)> = computer
.into_sorted_vec()
.into_iter()
.map(|cd| (cd.sort_key, cd.doc))
.collect();
prop_assert_eq!(heap_results, computer_results);
}
}
}

View File

@@ -513,7 +513,9 @@ pub struct TopNComputer<Score, D, C> {
/// The buffer reverses sort order to get top-semantics instead of bottom-semantics
buffer: Vec<ComparableDoc<Score, D>>,
top_n: usize,
pub(crate) threshold: Option<Score>,
/// The current threshold for pruning. Documents with scores at or below
/// this value are skipped by `push()`. Updated when the buffer is truncated.
pub threshold: Option<Score>,
comparator: C,
}

View File

@@ -1,5 +1,7 @@
use std::borrow::{Borrow, BorrowMut};
use common::TinySet;
use crate::fastfield::AliveBitSet;
use crate::DocId;
@@ -14,6 +16,12 @@ pub const TERMINATED: DocId = i32::MAX as u32;
/// exactly this size as long as we can fill the buffer.
pub const COLLECT_BLOCK_BUFFER_LEN: usize = 64;
/// Number of `TinySet` (64-bit) buckets in a block used by [`DocSet::fill_bitset_block`].
pub const BLOCK_NUM_TINYBITSETS: usize = 16;
/// Number of doc IDs covered by one block: `BLOCK_NUM_TINYBITSETS * 64 = 1024`.
pub const BLOCK_WINDOW: u32 = BLOCK_NUM_TINYBITSETS as u32 * 64;
/// Represents an iterable set of sorted doc ids.
pub trait DocSet: Send {
/// Goes to the next element.
@@ -160,6 +168,31 @@ pub trait DocSet: Send {
self.size_hint() as u64
}
/// Fills a bitmask representing which documents in `[min_doc, min_doc + BLOCK_WINDOW)` are
/// present in this docset.
///
/// The window is divided into `BLOCK_NUM_TINYBITSETS` buckets of 64 docs each.
/// Returns the next doc `>= min_doc + BLOCK_WINDOW`, or `TERMINATED` if exhausted.
fn fill_bitset_block(
&mut self,
min_doc: DocId,
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
) -> DocId {
self.seek(min_doc);
let horizon = min_doc + BLOCK_WINDOW;
loop {
let doc = self.doc();
if doc >= horizon {
return doc;
}
let delta = doc - min_doc;
mask[(delta / 64) as usize].insert_mut(delta % 64);
if self.advance() == TERMINATED {
return TERMINATED;
}
}
}
/// Returns the number documents matching.
/// Calling this method consumes the `DocSet`.
fn count(&mut self, alive_bitset: &AliveBitSet) -> u32 {
@@ -214,6 +247,18 @@ impl DocSet for &mut dyn DocSet {
(**self).seek_danger(target)
}
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
(**self).fill_buffer(buffer)
}
fn fill_bitset_block(
&mut self,
min_doc: DocId,
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
) -> DocId {
(**self).fill_bitset_block(min_doc, mask)
}
fn doc(&self) -> u32 {
(**self).doc()
}
@@ -256,6 +301,15 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
unboxed.fill_buffer(buffer)
}
fn fill_bitset_block(
&mut self,
min_doc: DocId,
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
) -> DocId {
let unboxed: &mut TDocSet = self.borrow_mut();
unboxed.fill_bitset_block(min_doc, mask)
}
fn doc(&self) -> DocId {
let unboxed: &TDocSet = self.borrow();
unboxed.doc()

View File

@@ -1,5 +1,7 @@
use common::TinySet;
use super::size_hint::estimate_intersection;
use crate::docset::{DocSet, SeekDangerResult, TERMINATED};
use crate::docset::{DocSet, SeekDangerResult, BLOCK_NUM_TINYBITSETS, TERMINATED};
use crate::query::term_query::TermScorer;
use crate::query::{EmptyScorer, Scorer};
use crate::{DocId, Score};
@@ -17,7 +19,7 @@ use crate::{DocId, Score};
/// `size_hint` of the intersection.
pub fn intersect_scorers(
mut scorers: Vec<Box<dyn Scorer>>,
num_docs_segment: u32,
segment_num_docs: u32,
) -> Box<dyn Scorer> {
if scorers.is_empty() {
return Box::new(EmptyScorer);
@@ -42,14 +44,14 @@ pub fn intersect_scorers(
left: *(left.downcast::<TermScorer>().map_err(|_| ()).unwrap()),
right: *(right.downcast::<TermScorer>().map_err(|_| ()).unwrap()),
others: scorers,
num_docs: num_docs_segment,
segment_num_docs,
});
}
Box::new(Intersection {
left,
right,
others: scorers,
num_docs: num_docs_segment,
segment_num_docs,
})
}
@@ -58,7 +60,7 @@ pub struct Intersection<TDocSet: DocSet, TOtherDocSet: DocSet = Box<dyn Scorer>>
left: TDocSet,
right: TDocSet,
others: Vec<TOtherDocSet>,
num_docs: u32,
segment_num_docs: u32,
}
fn go_to_first_doc<TDocSet: DocSet>(docsets: &mut [TDocSet]) -> DocId {
@@ -78,7 +80,10 @@ fn go_to_first_doc<TDocSet: DocSet>(docsets: &mut [TDocSet]) -> DocId {
impl<TDocSet: DocSet> Intersection<TDocSet, TDocSet> {
/// num_docs is the number of documents in the segment.
pub(crate) fn new(mut docsets: Vec<TDocSet>, num_docs: u32) -> Intersection<TDocSet, TDocSet> {
pub(crate) fn new(
mut docsets: Vec<TDocSet>,
segment_num_docs: u32,
) -> Intersection<TDocSet, TDocSet> {
let num_docsets = docsets.len();
assert!(num_docsets >= 2);
docsets.sort_by_key(|docset| docset.cost());
@@ -97,7 +102,7 @@ impl<TDocSet: DocSet> Intersection<TDocSet, TDocSet> {
left,
right,
others: docsets,
num_docs,
segment_num_docs,
}
}
}
@@ -214,7 +219,7 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOt
[self.left.size_hint(), self.right.size_hint()]
.into_iter()
.chain(self.others.iter().map(DocSet::size_hint)),
self.num_docs,
self.segment_num_docs,
)
}
@@ -224,6 +229,91 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOt
// If there are docsets that are bad at skipping, they should also influence the cost.
self.left.cost()
}
fn count_including_deleted(&mut self) -> u32 {
const DENSITY_THRESHOLD_INVERSE: u32 = 32;
if self
.left
.size_hint()
.saturating_mul(DENSITY_THRESHOLD_INVERSE)
< self.segment_num_docs
{
// Sparse path: if the lead iterator covers less than ~3% of docs,
// the block approach wastes time on mostly-empty blocks.
self.count_including_deleted_sparse()
} else {
// Dense approach. We push documents into a block bitset to then
// perform count using popcount.
self.count_including_deleted_dense()
}
}
}
const EMPTY_BLOCK: [TinySet; BLOCK_NUM_TINYBITSETS] = [TinySet::EMPTY; BLOCK_NUM_TINYBITSETS];
/// ANDs `other` into `mask` in-place. Returns `true` if the result is all zeros.
#[inline]
fn and_blocks_and_return_is_empty(
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
update: &[TinySet; BLOCK_NUM_TINYBITSETS],
) -> bool {
let mut all_empty = true;
for (mask_tinyset, update_tinyset) in mask.iter_mut().zip(update.iter()) {
*mask_tinyset = mask_tinyset.intersect(*update_tinyset);
all_empty &= mask_tinyset.is_empty();
}
all_empty
}
impl<TDocSet: DocSet, TOtherDocSet: DocSet> Intersection<TDocSet, TOtherDocSet> {
fn count_including_deleted_sparse(&mut self) -> u32 {
let mut count = 0u32;
let mut doc = self.doc();
while doc != TERMINATED {
count += 1;
doc = self.advance();
}
count
}
/// Dense block-wise bitmask intersection count.
///
/// Fills a 1024-doc window from each iterator, ANDs the bitmasks together,
/// and popcounts the result. `fill_bitset_block` handles seeking tails forward
/// when they lag behind the current block.
fn count_including_deleted_dense(&mut self) -> u32 {
let mut count = 0u32;
let mut next_base = self.left.doc();
while next_base < TERMINATED {
let base = next_base;
// Fill lead bitmask.
let mut mask = EMPTY_BLOCK;
next_base = next_base.max(self.left.fill_bitset_block(base, &mut mask));
let mut tail_mask = EMPTY_BLOCK;
next_base = next_base.max(self.right.fill_bitset_block(base, &mut tail_mask));
if and_blocks_and_return_is_empty(&mut mask, &tail_mask) {
continue;
}
// AND with each additional tail.
for other in &mut self.others {
let mut other_mask = EMPTY_BLOCK;
next_base = next_base.max(other.fill_bitset_block(base, &mut other_mask));
if and_blocks_and_return_is_empty(&mut mask, &other_mask) {
continue;
}
}
for tinyset in &mask {
count += tinyset.len();
}
}
count
}
}
impl<TScorer, TOtherScorer> Scorer for Intersection<TScorer, TOtherScorer>
@@ -421,6 +511,82 @@ mod tests {
}
}
proptest! {
#[test]
fn prop_test_count_including_deleted_matches_default(
a in sorted_deduped_vec(1200, 400),
b in sorted_deduped_vec(1200, 400),
c in sorted_deduped_vec(1200, 400),
num_docs in 1200u32..2000u32,
) {
// Compute expected count via set intersection.
let expected: u32 = a.iter()
.filter(|doc| b.contains(doc) && c.contains(doc))
.count() as u32;
// Test count_including_deleted (dense path).
let make_intersection = || {
Intersection::new(
vec![
VecDocSet::from(a.clone()),
VecDocSet::from(b.clone()),
VecDocSet::from(c.clone()),
],
num_docs,
)
};
let mut intersection = make_intersection();
let count = intersection.count_including_deleted();
prop_assert_eq!(count, expected,
"count_including_deleted mismatch: a={:?}, b={:?}, c={:?}", a, b, c);
}
}
#[test]
fn test_count_including_deleted_two_way() {
let left = VecDocSet::from(vec![1, 3, 9]);
let right = VecDocSet::from(vec![3, 4, 9, 18]);
let mut intersection = Intersection::new(vec![left, right], 100);
assert_eq!(intersection.count_including_deleted(), 2);
}
#[test]
fn test_count_including_deleted_empty() {
let a = VecDocSet::from(vec![1, 3]);
let b = VecDocSet::from(vec![1, 4]);
let c = VecDocSet::from(vec![3, 9]);
let mut intersection = Intersection::new(vec![a, b, c], 100);
assert_eq!(intersection.count_including_deleted(), 0);
}
/// Test with enough documents to exercise the dense path (>= num_docs/32).
#[test]
fn test_count_including_deleted_dense_path() {
// Create dense docsets: many docs relative to segment size.
let docs_a: Vec<u32> = (0..2000).step_by(2).collect(); // even numbers 0..2000
let docs_b: Vec<u32> = (0..2000).step_by(3).collect(); // multiples of 3
let expected = docs_a.iter().filter(|d| *d % 3 == 0).count() as u32;
let a = VecDocSet::from(docs_a);
let b = VecDocSet::from(docs_b);
let mut intersection = Intersection::new(vec![a, b], 2000);
assert_eq!(intersection.count_including_deleted(), expected);
}
/// Test that spans multiple blocks (>1024 docs).
#[test]
fn test_count_including_deleted_multi_block() {
let docs_a: Vec<u32> = (0..5000).collect();
let docs_b: Vec<u32> = (0..5000).step_by(7).collect();
let expected = docs_b.len() as u32; // all of b is in a
let a = VecDocSet::from(docs_a);
let b = VecDocSet::from(docs_b);
let mut intersection = Intersection::new(vec![a, b], 5000);
assert_eq!(intersection.count_including_deleted(), expected);
}
#[test]
fn test_bug_2811_intersection_candidate_should_increase() {
let mut schema_builder = Schema::builder();

View File

@@ -117,6 +117,12 @@ impl DocSet for TermScorer {
fn size_hint(&self) -> u32 {
self.postings.size_hint()
}
// TODO
// It is probably possible to optimize fill_bitset_block for TermScorer,
// working directly with the blocks, enabling vectorization.
// I did not manage to get a performance improvement on Mac ARM,
// and do not have access to x86 to investigate.
}
impl Scorer for TermScorer {

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-sstable"
version = "0.6.0"
version = "0.7.0"
edition = "2024"
license = "MIT"
homepage = "https://github.com/quickwit-oss/tantivy"
@@ -10,10 +10,10 @@ categories = ["database-implementations", "data-structures", "compression"]
description = "sstables for tantivy"
[dependencies]
common = {version= "0.10", path="../common", package="tantivy-common"}
common = {version= "0.11", path="../common", package="tantivy-common"}
futures-util = "0.3.30"
itertools = "0.14.0"
tantivy-bitpacker = { version= "0.9", path="../bitpacker" }
tantivy-bitpacker = { version= "0.10", path="../bitpacker" }
tantivy-fst = "0.5"
# experimental gives us access to Decompressor::upper_bound
zstd = { version = "0.13", optional = true, features = ["experimental"] }

View File

@@ -512,11 +512,13 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Returns the terms for a _sorted_ list of term ordinals.
///
/// Returns true if and only if all terms have been found.
pub fn sorted_ords_to_term_cb<F: FnMut(&[u8]) -> io::Result<()>>(
pub fn sorted_ords_to_term_cb(
&self,
mut ords: impl Iterator<Item = TermOrdinal>,
mut cb: F,
ords: &[TermOrdinal],
mut cb: impl FnMut(&[u8]),
) -> io::Result<bool> {
assert!(ords.is_sorted());
let mut ords = ords.iter().copied();
let Some(mut ord) = ords.next() else {
return Ok(true);
};
@@ -538,33 +540,36 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
bytes.extend_from_slice(current_sstable_delta_reader.suffix());
current_block_ordinal += 1;
}
cb(&bytes)?;
cb(&bytes);
// fetch the next ordinal
let Some(next_ord) = ords.next() else {
return Ok(true);
let next_ord = loop {
let Some(next_ord) = ords.next() else {
return Ok(true);
};
if next_ord == ord {
// This is the same ordinal, let's just call the callback directly.
cb(&bytes);
} else {
// we checked it was sorted beforehands
debug_assert!(next_ord > ord);
break next_ord;
}
};
// advance forward if the new ord is different than the one we just processed
// TODO optimization: it is silly to do a binary search to get the block every single
// time.
//
// this allows the input TermOrdinal iterator to contain duplicates, so long as it's
// still sorted
if next_ord < ord {
panic!("Ordinals were not sorted: received {next_ord} after {ord}");
} else if next_ord > ord {
// check if block changed for new term_ord
let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
if new_block_addr != current_block_addr {
current_block_addr = new_block_addr;
current_block_ordinal = current_block_addr.first_ordinal;
current_sstable_delta_reader =
self.sstable_delta_reader_block(current_block_addr.clone())?;
bytes.clear();
}
ord = next_ord;
} else {
// The next ord is equal to the previous ord: no need to seek or advance.
// Check if block changed for new term_ord
let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
if new_block_addr != current_block_addr {
current_block_addr = new_block_addr;
current_block_ordinal = current_block_addr.first_ordinal;
current_sstable_delta_reader =
self.sstable_delta_reader_block(current_block_addr.clone())?;
bytes.clear();
}
ord = next_ord;
}
}
@@ -671,8 +676,8 @@ mod tests {
use common::OwnedBytes;
use super::Dictionary;
use crate::MonotonicU64SSTable;
use crate::dictionary::TermOrdHit;
use crate::{MonotonicU64SSTable, TermOrdinal};
#[derive(Debug)]
struct PermissionedHandle {
@@ -935,25 +940,24 @@ mod tests {
}
#[test]
fn test_ords_term() {
fn test_sorted_ords_to_term() {
let (dic, _slice) = make_test_sstable();
// Single term
let mut terms = Vec::new();
assert!(
dic.sorted_ords_to_term_cb(100_000..100_001, |term| {
dic.sorted_ords_to_term_cb(&[100_000], |term| {
terms.push(term.to_vec());
Ok(())
})
.unwrap()
);
assert_eq!(terms, vec![format!("{:05X}", 100_000).into_bytes(),]);
// Single term
let mut terms = Vec::new();
let ords: Vec<TermOrdinal> = (100_001..100_002).collect();
assert!(
dic.sorted_ords_to_term_cb(100_001..100_002, |term| {
dic.sorted_ords_to_term_cb(&ords, |term| {
terms.push(term.to_vec());
Ok(())
})
.unwrap()
);
@@ -961,9 +965,8 @@ mod tests {
// both terms
let mut terms = Vec::new();
assert!(
dic.sorted_ords_to_term_cb(100_000..100_002, |term| {
dic.sorted_ords_to_term_cb(&[100_000, 100_001], |term| {
terms.push(term.to_vec());
Ok(())
})
.unwrap()
);
@@ -976,10 +979,10 @@ mod tests {
);
// Test cross block
let mut terms = Vec::new();
let ords: Vec<TermOrdinal> = (98653..=98655).collect();
assert!(
dic.sorted_ords_to_term_cb(98653..=98655, |term| {
dic.sorted_ords_to_term_cb(&ords, |term| {
terms.push(term.to_vec());
Ok(())
})
.unwrap()
);
@@ -991,6 +994,43 @@ mod tests {
format!("{:05X}", 98655).into_bytes(),
]
);
// redundant
let mut terms = Vec::new();
let ords: Vec<TermOrdinal> = vec![1, 1, 2];
assert!(
dic.sorted_ords_to_term_cb(&ords, |term| {
terms.push(term.to_vec());
})
.unwrap()
);
assert_eq!(
terms,
vec![
format!("{:05X}", 1).into_bytes(),
format!("{:05X}", 1).into_bytes(),
format!("{:05X}", 2).into_bytes(),
]
);
// redundant cross block
let mut terms = Vec::new();
let ords: Vec<TermOrdinal> = vec![98653, 98653, 98654, 98654, 98655, 98655];
assert!(
dic.sorted_ords_to_term_cb(&ords, |term| {
terms.push(term.to_vec());
})
.unwrap()
);
assert_eq!(
terms,
vec![
format!("{:05X}", 98_653).into_bytes(),
format!("{:05X}", 98_653).into_bytes(),
format!("{:05X}", 98_654).into_bytes(),
format!("{:05X}", 98_654).into_bytes(),
format!("{:05X}", 98_655).into_bytes(),
format!("{:05X}", 98_655).into_bytes(),
]
);
}
#[test]

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-stacker"
version = "0.6.0"
version = "0.7.0"
edition = "2024"
license = "MIT"
homepage = "https://github.com/quickwit-oss/tantivy"
@@ -9,7 +9,7 @@ description = "term hashmap used for indexing"
[dependencies]
murmurhash32 = "0.3"
common = { version = "0.10", path = "../common/", package = "tantivy-common" }
common = { version = "0.11", path = "../common/", package = "tantivy-common" }
ahash = { version = "0.8.11", default-features = false, optional = true }
@@ -27,7 +27,7 @@ rand = "0.9"
zipf = "7.0.0"
rustc-hash = "2.1.0"
proptest = "1.2.0"
binggan = { version = "0.15.3" }
binggan = { version = "0.16.1" }
rand_distr = "0.5"
[features]

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-tokenizer-api"
version = "0.6.0"
version = "0.7.0"
license = "MIT"
edition = "2021"
description = "Tokenizer API of tantivy"