mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-31 15:40:40 +00:00
Compare commits
12 Commits
0.26.1
...
paul.masur
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d5e2709b1b | ||
|
|
13d74c3c20 | ||
|
|
058afff8b7 | ||
|
|
58aa4b7074 | ||
|
|
04beab3b29 | ||
|
|
3cd9011f87 | ||
|
|
d2c1b8bc2c | ||
|
|
a65107135a | ||
|
|
5c344db1bf | ||
|
|
dc0f31554d | ||
|
|
a28ce3ee54 | ||
|
|
cf9800f981 |
4
.github/dependabot.yml
vendored
4
.github/dependabot.yml
vendored
@@ -6,6 +6,8 @@ updates:
|
||||
interval: daily
|
||||
time: "20:00"
|
||||
open-pull-requests-limit: 10
|
||||
cooldown:
|
||||
default-days: 2
|
||||
|
||||
- package-ecosystem: "github-actions"
|
||||
directory: "/"
|
||||
@@ -13,3 +15,5 @@ updates:
|
||||
interval: daily
|
||||
time: "20:00"
|
||||
open-pull-requests-limit: 10
|
||||
cooldown:
|
||||
default-days: 2
|
||||
|
||||
@@ -45,6 +45,7 @@ Tantivy 0.26 (Unreleased)
|
||||
- Add `seek_danger` on `DocSet` for more efficient intersections [#2538](https://github.com/quickwit-oss/tantivy/pull/2538) [#2810](https://github.com/quickwit-oss/tantivy/pull/2810)(@PSeitz @stuhood @fulmicoton)
|
||||
- Skip column traversal in `RangeDocSet` when query range does not overlap with column bounds [#2783](https://github.com/quickwit-oss/tantivy/pull/2783)(@ChangRui-Ryan)
|
||||
- Speed up exclude queries by supporting multiple excluded `DocSet`s without intermediate union [#2825](https://github.com/quickwit-oss/tantivy/pull/2825)(@PSeitz)
|
||||
- Improve union performance for non-score unions with `fill_buffer` and optimized `TinySet` [#2863](https://github.com/quickwit-oss/tantivy/pull/2863)(@PSeitz)
|
||||
|
||||
Tantivy 0.25
|
||||
================================
|
||||
|
||||
18
Cargo.toml
18
Cargo.toml
@@ -57,15 +57,15 @@ measure_time = "0.9.0"
|
||||
arc-swap = "1.5.0"
|
||||
bon = "3.3.1"
|
||||
|
||||
columnar = { version = "0.6", path = "./columnar", package = "tantivy-columnar" }
|
||||
sstable = { version = "0.6", path = "./sstable", package = "tantivy-sstable", optional = true }
|
||||
stacker = { version = "0.6", path = "./stacker", package = "tantivy-stacker" }
|
||||
query-grammar = { version = "0.25.0", path = "./query-grammar", package = "tantivy-query-grammar" }
|
||||
tantivy-bitpacker = { version = "0.9", path = "./bitpacker" }
|
||||
common = { version = "0.10", path = "./common/", package = "tantivy-common" }
|
||||
tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
|
||||
columnar = { version = "0.7", path = "./columnar", package = "tantivy-columnar" }
|
||||
sstable = { version = "0.7", path = "./sstable", package = "tantivy-sstable", optional = true }
|
||||
stacker = { version = "0.7", path = "./stacker", package = "tantivy-stacker" }
|
||||
query-grammar = { version = "0.26.0", path = "./query-grammar", package = "tantivy-query-grammar" }
|
||||
tantivy-bitpacker = { version = "0.10", path = "./bitpacker" }
|
||||
common = { version = "0.11", path = "./common/", package = "tantivy-common" }
|
||||
tokenizer-api = { version = "0.7", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
|
||||
sketches-ddsketch = { version = "0.4", features = ["use_serde"] }
|
||||
datasketches = "0.2.0"
|
||||
datasketches = { git = "https://github.com/fulmicoton-dd/datasketches-rust", rev = "7635fb8" }
|
||||
futures-util = { version = "0.3.28", optional = true }
|
||||
futures-channel = { version = "0.3.28", optional = true }
|
||||
fnv = "1.0.7"
|
||||
@@ -75,7 +75,7 @@ typetag = "0.2.21"
|
||||
winapi = "0.3.9"
|
||||
|
||||
[dev-dependencies]
|
||||
binggan = "0.15.3"
|
||||
binggan = "0.16.1"
|
||||
rand = "0.9"
|
||||
maplit = "1.0.2"
|
||||
matches = "0.1.9"
|
||||
|
||||
@@ -78,6 +78,7 @@ fn bench_agg(mut group: InputGroup<Index>) {
|
||||
|
||||
register!(group, cardinality_agg);
|
||||
register!(group, terms_status_with_cardinality_agg);
|
||||
register!(group, terms_100_buckets_with_cardinality_agg);
|
||||
|
||||
register!(group, range_agg);
|
||||
register!(group, range_agg_with_avg_sub_agg);
|
||||
@@ -169,6 +170,22 @@ fn terms_status_with_cardinality_agg(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"my_texts": {
|
||||
"terms": { "field": "text_few_terms_status" },
|
||||
"aggs": {
|
||||
"cardinality": {
|
||||
"cardinality": {
|
||||
"field": "text_few_terms_status"
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
});
|
||||
execute_agg(index, agg_req);
|
||||
}
|
||||
|
||||
fn terms_100_buckets_with_cardinality_agg(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"my_texts": {
|
||||
"terms": { "field": "text_1000_terms_zipf", "size": 100 },
|
||||
"aggs": {
|
||||
"cardinality": {
|
||||
"cardinality": {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tantivy-bitpacker"
|
||||
version = "0.9.0"
|
||||
version = "0.10.0"
|
||||
edition = "2024"
|
||||
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
|
||||
license = "MIT"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tantivy-columnar"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
edition = "2024"
|
||||
license = "MIT"
|
||||
homepage = "https://github.com/quickwit-oss/tantivy"
|
||||
@@ -12,10 +12,10 @@ categories = ["database-implementations", "data-structures", "compression"]
|
||||
itertools = "0.14.0"
|
||||
fastdivide = "0.4.0"
|
||||
|
||||
stacker = { version= "0.6", path = "../stacker", package="tantivy-stacker"}
|
||||
sstable = { version= "0.6", path = "../sstable", package = "tantivy-sstable" }
|
||||
common = { version= "0.10", path = "../common", package = "tantivy-common" }
|
||||
tantivy-bitpacker = { version= "0.9", path = "../bitpacker/" }
|
||||
stacker = { version= "0.7", path = "../stacker", package="tantivy-stacker"}
|
||||
sstable = { version= "0.7", path = "../sstable", package = "tantivy-sstable" }
|
||||
common = { version= "0.11", path = "../common", package = "tantivy-common" }
|
||||
tantivy-bitpacker = { version= "0.10", path = "../bitpacker/" }
|
||||
serde = "1.0.152"
|
||||
downcast-rs = "2.0.1"
|
||||
|
||||
@@ -23,7 +23,7 @@ downcast-rs = "2.0.1"
|
||||
proptest = "1"
|
||||
more-asserts = "0.3.1"
|
||||
rand = "0.9"
|
||||
binggan = "0.15.3"
|
||||
binggan = "0.16.1"
|
||||
|
||||
[[bench]]
|
||||
name = "bench_merge"
|
||||
|
||||
@@ -33,14 +33,14 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
|
||||
&mut self,
|
||||
docs: &[u32],
|
||||
accessor: &Column<T>,
|
||||
missing: Option<T>,
|
||||
missing_opt: Option<T>,
|
||||
) {
|
||||
self.fetch_block(docs, accessor);
|
||||
// no missing values
|
||||
if accessor.index.get_cardinality().is_full() {
|
||||
return;
|
||||
}
|
||||
let Some(missing) = missing else {
|
||||
let Some(missing) = missing_opt else {
|
||||
return;
|
||||
};
|
||||
|
||||
@@ -191,6 +191,7 @@ where F: FnMut(u32) {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::field_reassign_with_default)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tantivy-common"
|
||||
version = "0.10.0"
|
||||
version = "0.11.0"
|
||||
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
|
||||
license = "MIT"
|
||||
edition = "2024"
|
||||
@@ -19,6 +19,6 @@ time = { version = "0.3.47", features = ["serde-well-known"] }
|
||||
serde = { version = "1.0.136", features = ["derive"] }
|
||||
|
||||
[dev-dependencies]
|
||||
binggan = "0.15.3"
|
||||
binggan = "0.16.1"
|
||||
proptest = "1.0.0"
|
||||
rand = "0.9"
|
||||
|
||||
@@ -47,6 +47,9 @@ impl TinySet {
|
||||
TinySet(val)
|
||||
}
|
||||
|
||||
/// An empty `TinySet` constant.
|
||||
pub const EMPTY: TinySet = TinySet(0u64);
|
||||
|
||||
/// Returns an empty `TinySet`.
|
||||
#[inline]
|
||||
pub fn empty() -> TinySet {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tantivy-query-grammar"
|
||||
version = "0.25.0"
|
||||
version = "0.26.0"
|
||||
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
|
||||
license = "MIT"
|
||||
categories = ["database-implementations", "data-structures"]
|
||||
|
||||
@@ -208,7 +208,8 @@ pub enum BucketEntries<T> {
|
||||
}
|
||||
|
||||
impl<T> BucketEntries<T> {
|
||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = &'a T> + 'a> {
|
||||
/// Iterate over all bucket entries.
|
||||
pub fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = &'a T> + 'a> {
|
||||
match self {
|
||||
BucketEntries::Vec(vec) => Box::new(vec.iter()),
|
||||
BucketEntries::HashMap(map) => Box::new(map.values()),
|
||||
|
||||
@@ -21,7 +21,7 @@ use crate::aggregation::bucket::composite::map::{DynArrayHeapMap, MAX_DYN_ARRAY_
|
||||
use crate::aggregation::bucket::{
|
||||
CalendarInterval, CompositeAggregationSource, MissingOrder, Order,
|
||||
};
|
||||
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardSubAggCache};
|
||||
use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardSubAggBuffer};
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
CompositeIntermediateKey, IntermediateAggregationResult, IntermediateAggregationResults,
|
||||
IntermediateBucketResult, IntermediateCompositeBucketEntry, IntermediateCompositeBucketResult,
|
||||
@@ -119,7 +119,7 @@ pub struct SegmentCompositeCollector {
|
||||
/// One DynArrayHeapMap per parent bucket.
|
||||
parent_buckets: Vec<DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>>,
|
||||
accessor_idx: usize,
|
||||
sub_agg: Option<CachedSubAggs<HighCardSubAggCache>>,
|
||||
sub_agg: Option<BufferedSubAggs<HighCardSubAggBuffer>>,
|
||||
bucket_id_provider: BucketIdProvider,
|
||||
/// Number of sources, needed when creating new DynArrayHeapMaps.
|
||||
num_sources: usize,
|
||||
@@ -218,7 +218,7 @@ impl SegmentCompositeCollector {
|
||||
let has_sub_aggregations = !node.children.is_empty();
|
||||
let sub_agg = if has_sub_aggregations {
|
||||
let sub_agg_collector = build_segment_agg_collectors(req_data, &node.children)?;
|
||||
Some(CachedSubAggs::new(sub_agg_collector))
|
||||
Some(BufferedSubAggs::new(sub_agg_collector))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -332,7 +332,7 @@ fn collect_bucket_with_limit(
|
||||
limit_num_buckets: usize,
|
||||
buckets: &mut DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
|
||||
key: &[InternalValueRepr],
|
||||
sub_agg: &mut Option<CachedSubAggs<HighCardSubAggCache>>,
|
||||
sub_agg: &mut Option<BufferedSubAggs<HighCardSubAggBuffer>>,
|
||||
bucket_id_provider: &mut BucketIdProvider,
|
||||
) {
|
||||
let mut record_in_bucket = |bucket: &mut CompositeBucketCollector| {
|
||||
@@ -488,7 +488,7 @@ struct CompositeKeyVisitor<'a> {
|
||||
doc_id: crate::DocId,
|
||||
composite_agg_data: &'a CompositeAggReqData,
|
||||
buckets: &'a mut DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
|
||||
sub_agg: &'a mut Option<CachedSubAggs<HighCardSubAggCache>>,
|
||||
sub_agg: &'a mut Option<BufferedSubAggs<HighCardSubAggBuffer>>,
|
||||
bucket_id_provider: &'a mut BucketIdProvider,
|
||||
sub_level_values: SmallVec<[InternalValueRepr; MAX_DYN_ARRAY_SIZE]>,
|
||||
}
|
||||
|
||||
@@ -511,14 +511,14 @@ mod tests {
|
||||
|
||||
fn datetime_from_iso_str(date_str: &str) -> common::DateTime {
|
||||
let dt = OffsetDateTime::parse(date_str, &Rfc3339)
|
||||
.expect(&format!("Failed to parse date: {}", date_str));
|
||||
.unwrap_or_else(|_| panic!("Failed to parse date: {}", date_str));
|
||||
let timestamp_secs = dt.unix_timestamp_nanos();
|
||||
common::DateTime::from_timestamp_nanos(timestamp_secs as i64)
|
||||
}
|
||||
|
||||
fn ms_timestamp_from_iso_str(date_str: &str) -> i64 {
|
||||
let dt = OffsetDateTime::parse(date_str, &Rfc3339)
|
||||
.expect(&format!("Failed to parse date: {}", date_str));
|
||||
.unwrap_or_else(|_| panic!("Failed to parse date: {}", date_str));
|
||||
(dt.unix_timestamp_nanos() / 1_000_000) as i64
|
||||
}
|
||||
|
||||
@@ -548,7 +548,7 @@ mod tests {
|
||||
agg_req_json["my_composite"]["composite"]["after"] = after_key.take().unwrap();
|
||||
}
|
||||
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
|
||||
let res = exec_request(agg_req.clone(), &index).unwrap();
|
||||
let res = exec_request(agg_req.clone(), index).unwrap();
|
||||
let expected_page_buckets = &expected_buckets_vec[page_idx * page_size
|
||||
..std::cmp::min((page_idx + 1) * page_size, expected_buckets_vec.len())];
|
||||
assert_eq!(
|
||||
@@ -578,7 +578,7 @@ mod tests {
|
||||
}
|
||||
});
|
||||
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
|
||||
let res = exec_request(agg_req.clone(), &index).unwrap();
|
||||
let res = exec_request(agg_req.clone(), index).unwrap();
|
||||
assert_eq!(
|
||||
res["my_composite"]["buckets"],
|
||||
json!([]),
|
||||
|
||||
@@ -6,8 +6,8 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use crate::aggregation::agg_data::{
|
||||
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
|
||||
};
|
||||
use crate::aggregation::cached_sub_aggs::{
|
||||
CachedSubAggs, HighCardSubAggCache, LowCardSubAggCache, SubAggCache,
|
||||
use crate::aggregation::buffered_sub_aggs::{
|
||||
BufferedSubAggs, HighCardSubAggBuffer, LowCardSubAggBuffer, SubAggBuffer,
|
||||
};
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
|
||||
@@ -503,17 +503,17 @@ struct DocCount {
|
||||
}
|
||||
|
||||
/// Segment collector for filter aggregation
|
||||
pub struct SegmentFilterCollector<C: SubAggCache> {
|
||||
pub struct SegmentFilterCollector<B: SubAggBuffer> {
|
||||
/// Document counts per parent bucket
|
||||
parent_buckets: Vec<DocCount>,
|
||||
/// Sub-aggregation collectors
|
||||
sub_aggregations: Option<CachedSubAggs<C>>,
|
||||
sub_aggregations: Option<BufferedSubAggs<B>>,
|
||||
bucket_id_provider: BucketIdProvider,
|
||||
/// Accessor index for this filter aggregation (to access FilterAggReqData)
|
||||
accessor_idx: usize,
|
||||
}
|
||||
|
||||
impl<C: SubAggCache> SegmentFilterCollector<C> {
|
||||
impl<B: SubAggBuffer> SegmentFilterCollector<B> {
|
||||
/// Create a new filter segment collector following the new agg_data pattern
|
||||
pub(crate) fn from_req_and_validate(
|
||||
req: &mut AggregationsSegmentCtx,
|
||||
@@ -525,7 +525,7 @@ impl<C: SubAggCache> SegmentFilterCollector<C> {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let sub_agg_collector = sub_agg_collector.map(CachedSubAggs::new);
|
||||
let sub_agg_collector = sub_agg_collector.map(BufferedSubAggs::new);
|
||||
|
||||
Ok(SegmentFilterCollector {
|
||||
parent_buckets: Vec::new(),
|
||||
@@ -547,16 +547,16 @@ pub(crate) fn build_segment_filter_collector(
|
||||
|
||||
if is_top_level {
|
||||
Ok(Box::new(
|
||||
SegmentFilterCollector::<LowCardSubAggCache>::from_req_and_validate(req, node)?,
|
||||
SegmentFilterCollector::<LowCardSubAggBuffer>::from_req_and_validate(req, node)?,
|
||||
))
|
||||
} else {
|
||||
Ok(Box::new(
|
||||
SegmentFilterCollector::<HighCardSubAggCache>::from_req_and_validate(req, node)?,
|
||||
SegmentFilterCollector::<HighCardSubAggBuffer>::from_req_and_validate(req, node)?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: SubAggCache> Debug for SegmentFilterCollector<C> {
|
||||
impl<B: SubAggBuffer> Debug for SegmentFilterCollector<B> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("SegmentFilterCollector")
|
||||
.field("buckets", &self.parent_buckets)
|
||||
@@ -566,7 +566,7 @@ impl<C: SubAggCache> Debug for SegmentFilterCollector<C> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: SubAggCache> SegmentAggregationCollector for SegmentFilterCollector<C> {
|
||||
impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentFilterCollector<B> {
|
||||
fn add_intermediate_aggregation_result(
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
|
||||
@@ -10,7 +10,7 @@ use crate::aggregation::agg_data::{
|
||||
};
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::agg_result::BucketEntry;
|
||||
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardCachedSubAggs};
|
||||
use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardBufferedSubAggs};
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
|
||||
IntermediateHistogramBucketEntry,
|
||||
@@ -258,7 +258,7 @@ pub(crate) struct SegmentHistogramBucketEntry {
|
||||
impl SegmentHistogramBucketEntry {
|
||||
pub(crate) fn into_intermediate_bucket_entry(
|
||||
self,
|
||||
sub_aggregation: &mut Option<HighCardCachedSubAggs>,
|
||||
sub_aggregation: &mut Option<HighCardBufferedSubAggs>,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
) -> crate::Result<IntermediateHistogramBucketEntry> {
|
||||
let mut sub_aggregation_res = IntermediateAggregationResults::default();
|
||||
@@ -291,7 +291,7 @@ pub struct SegmentHistogramCollector {
|
||||
/// The buckets containing the aggregation data.
|
||||
/// One Histogram bucket per parent bucket id.
|
||||
parent_buckets: Vec<HistogramBuckets>,
|
||||
sub_agg: Option<HighCardCachedSubAggs>,
|
||||
sub_agg: Option<HighCardBufferedSubAggs>,
|
||||
accessor_idx: usize,
|
||||
bucket_id_provider: BucketIdProvider,
|
||||
}
|
||||
@@ -444,7 +444,7 @@ impl SegmentHistogramCollector {
|
||||
max: f64::MAX,
|
||||
});
|
||||
req_data.offset = req_data.req.offset.unwrap_or(0.0);
|
||||
let sub_agg = sub_agg.map(CachedSubAggs::new);
|
||||
let sub_agg = sub_agg.map(BufferedSubAggs::new);
|
||||
|
||||
Ok(Self {
|
||||
parent_buckets: Default::default(),
|
||||
|
||||
@@ -9,8 +9,9 @@ use crate::aggregation::agg_data::{
|
||||
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
|
||||
};
|
||||
use crate::aggregation::agg_limits::AggregationLimitsGuard;
|
||||
use crate::aggregation::cached_sub_aggs::{
|
||||
CachedSubAggs, HighCardSubAggCache, LowCardCachedSubAggs, LowCardSubAggCache, SubAggCache,
|
||||
use crate::aggregation::buffered_sub_aggs::{
|
||||
BufferedSubAggs, HighCardSubAggBuffer, LowCardBufferedSubAggs, LowCardSubAggBuffer,
|
||||
SubAggBuffer,
|
||||
};
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
|
||||
@@ -155,13 +156,13 @@ pub(crate) struct SegmentRangeAndBucketEntry {
|
||||
|
||||
/// The collector puts values from the fast field into the correct buckets and does a conversion to
|
||||
/// the correct datatype.
|
||||
pub struct SegmentRangeCollector<C: SubAggCache> {
|
||||
pub struct SegmentRangeCollector<B: SubAggBuffer> {
|
||||
/// The buckets containing the aggregation data.
|
||||
/// One for each ParentBucketId
|
||||
parent_buckets: Vec<Vec<SegmentRangeAndBucketEntry>>,
|
||||
column_type: ColumnType,
|
||||
pub(crate) accessor_idx: usize,
|
||||
sub_agg: Option<CachedSubAggs<C>>,
|
||||
sub_agg: Option<BufferedSubAggs<B>>,
|
||||
/// Here things get a bit weird. We need to assign unique bucket ids across all
|
||||
/// parent buckets. So we keep track of the next available bucket id here.
|
||||
/// This allows a kind of flattening of the bucket ids across all parent buckets.
|
||||
@@ -178,7 +179,7 @@ pub struct SegmentRangeCollector<C: SubAggCache> {
|
||||
limits: AggregationLimitsGuard,
|
||||
}
|
||||
|
||||
impl<C: SubAggCache> Debug for SegmentRangeCollector<C> {
|
||||
impl<B: SubAggBuffer> Debug for SegmentRangeCollector<B> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("SegmentRangeCollector")
|
||||
.field("parent_buckets_len", &self.parent_buckets.len())
|
||||
@@ -229,7 +230,7 @@ impl SegmentRangeBucketEntry {
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: SubAggCache> SegmentAggregationCollector for SegmentRangeCollector<C> {
|
||||
impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentRangeCollector<B> {
|
||||
fn add_intermediate_aggregation_result(
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
@@ -350,8 +351,8 @@ pub(crate) fn build_segment_range_collector(
|
||||
};
|
||||
|
||||
if is_low_card {
|
||||
Ok(Box::new(SegmentRangeCollector::<LowCardSubAggCache> {
|
||||
sub_agg: sub_agg.map(LowCardCachedSubAggs::new),
|
||||
Ok(Box::new(SegmentRangeCollector::<LowCardSubAggBuffer> {
|
||||
sub_agg: sub_agg.map(LowCardBufferedSubAggs::new),
|
||||
column_type: field_type,
|
||||
accessor_idx,
|
||||
parent_buckets: Vec::new(),
|
||||
@@ -359,8 +360,8 @@ pub(crate) fn build_segment_range_collector(
|
||||
limits: agg_data.context.limits.clone(),
|
||||
}))
|
||||
} else {
|
||||
Ok(Box::new(SegmentRangeCollector::<HighCardSubAggCache> {
|
||||
sub_agg: sub_agg.map(CachedSubAggs::new),
|
||||
Ok(Box::new(SegmentRangeCollector::<HighCardSubAggBuffer> {
|
||||
sub_agg: sub_agg.map(BufferedSubAggs::new),
|
||||
column_type: field_type,
|
||||
accessor_idx,
|
||||
parent_buckets: Vec::new(),
|
||||
@@ -370,7 +371,7 @@ pub(crate) fn build_segment_range_collector(
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: SubAggCache> SegmentRangeCollector<C> {
|
||||
impl<B: SubAggBuffer> SegmentRangeCollector<B> {
|
||||
pub(crate) fn create_new_buckets(
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
@@ -554,7 +555,7 @@ mod tests {
|
||||
pub fn get_collector_from_ranges(
|
||||
ranges: Vec<RangeAggregationRange>,
|
||||
field_type: ColumnType,
|
||||
) -> SegmentRangeCollector<HighCardSubAggCache> {
|
||||
) -> SegmentRangeCollector<HighCardSubAggBuffer> {
|
||||
let req = RangeAggregation {
|
||||
field: "dummy".to_string(),
|
||||
ranges,
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use std::fmt::Debug;
|
||||
use std::io;
|
||||
use std::net::Ipv6Addr;
|
||||
|
||||
use columnar::column_values::CompactSpaceU64Accessor;
|
||||
@@ -17,8 +16,9 @@ use crate::aggregation::agg_data::{
|
||||
};
|
||||
use crate::aggregation::agg_limits::MemoryConsumption;
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::cached_sub_aggs::{
|
||||
CachedSubAggs, HighCardSubAggCache, LowCardCachedSubAggs, LowCardSubAggCache, SubAggCache,
|
||||
use crate::aggregation::buffered_sub_aggs::{
|
||||
BufferedSubAggs, HighCardSubAggBuffer, LowCardBufferedSubAggs, LowCardSubAggBuffer,
|
||||
SubAggBuffer,
|
||||
};
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
|
||||
@@ -391,7 +391,7 @@ pub(crate) fn build_segment_term_collector(
|
||||
// Decide which bucket storage is best suited for this aggregation.
|
||||
if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC && !has_sub_aggregations {
|
||||
let term_buckets = VecTermBucketsNoAgg::new(max_term_id + 1, &mut bucket_id_provider);
|
||||
let collector: SegmentTermCollector<_, HighCardSubAggCache> = SegmentTermCollector {
|
||||
let collector: SegmentTermCollector<_, HighCardSubAggBuffer> = SegmentTermCollector {
|
||||
parent_buckets: vec![term_buckets],
|
||||
sub_agg: None,
|
||||
bucket_id_provider,
|
||||
@@ -401,8 +401,8 @@ pub(crate) fn build_segment_term_collector(
|
||||
Ok(Box::new(collector))
|
||||
} else if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC {
|
||||
let term_buckets = VecTermBuckets::new(max_term_id + 1, &mut bucket_id_provider);
|
||||
let sub_agg = sub_agg_collector.map(LowCardCachedSubAggs::new);
|
||||
let collector: SegmentTermCollector<_, LowCardSubAggCache> = SegmentTermCollector {
|
||||
let sub_agg = sub_agg_collector.map(LowCardBufferedSubAggs::new);
|
||||
let collector: SegmentTermCollector<_, LowCardSubAggBuffer> = SegmentTermCollector {
|
||||
parent_buckets: vec![term_buckets],
|
||||
sub_agg,
|
||||
bucket_id_provider,
|
||||
@@ -414,8 +414,8 @@ pub(crate) fn build_segment_term_collector(
|
||||
let term_buckets: PagedTermMap =
|
||||
PagedTermMap::new(max_term_id + 1, &mut bucket_id_provider);
|
||||
// Build sub-aggregation blueprint (flat pairs)
|
||||
let sub_agg = sub_agg_collector.map(CachedSubAggs::new);
|
||||
let collector: SegmentTermCollector<PagedTermMap, HighCardSubAggCache> =
|
||||
let sub_agg = sub_agg_collector.map(BufferedSubAggs::new);
|
||||
let collector: SegmentTermCollector<PagedTermMap, HighCardSubAggBuffer> =
|
||||
SegmentTermCollector {
|
||||
parent_buckets: vec![term_buckets],
|
||||
sub_agg,
|
||||
@@ -427,8 +427,8 @@ pub(crate) fn build_segment_term_collector(
|
||||
} else {
|
||||
let term_buckets: HashMapTermBuckets = HashMapTermBuckets::default();
|
||||
// Build sub-aggregation blueprint (flat pairs)
|
||||
let sub_agg = sub_agg_collector.map(CachedSubAggs::new);
|
||||
let collector: SegmentTermCollector<HashMapTermBuckets, HighCardSubAggCache> =
|
||||
let sub_agg = sub_agg_collector.map(BufferedSubAggs::new);
|
||||
let collector: SegmentTermCollector<HashMapTermBuckets, HighCardSubAggBuffer> =
|
||||
SegmentTermCollector {
|
||||
parent_buckets: vec![term_buckets],
|
||||
sub_agg,
|
||||
@@ -758,10 +758,10 @@ impl TermAggregationMap for VecTermBuckets {
|
||||
/// The collector puts values from the fast field into the correct buckets and does a conversion to
|
||||
/// the correct datatype.
|
||||
#[derive(Debug)]
|
||||
struct SegmentTermCollector<TermMap: TermAggregationMap, C: SubAggCache> {
|
||||
struct SegmentTermCollector<TermMap: TermAggregationMap, B: SubAggBuffer> {
|
||||
/// The buckets containing the aggregation data.
|
||||
parent_buckets: Vec<TermMap>,
|
||||
sub_agg: Option<CachedSubAggs<C>>,
|
||||
sub_agg: Option<BufferedSubAggs<B>>,
|
||||
bucket_id_provider: BucketIdProvider,
|
||||
max_term_id: u64,
|
||||
terms_req_data: TermsAggReqData,
|
||||
@@ -772,8 +772,8 @@ pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) {
|
||||
(agg_name, agg_property)
|
||||
}
|
||||
|
||||
impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
|
||||
for SegmentTermCollector<TermMap, C>
|
||||
impl<TermMap: TermAggregationMap, B: SubAggBuffer> SegmentAggregationCollector
|
||||
for SegmentTermCollector<TermMap, B>
|
||||
{
|
||||
fn add_intermediate_aggregation_result(
|
||||
&mut self,
|
||||
@@ -790,8 +790,14 @@ impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
|
||||
let term_req = &self.terms_req_data;
|
||||
let name = term_req.name.clone();
|
||||
|
||||
let bucket =
|
||||
Self::into_intermediate_bucket_result(term_req, &mut self.sub_agg, bucket, agg_data)?;
|
||||
let bucket = Self::into_intermediate_bucket_result(
|
||||
term_req,
|
||||
self.sub_agg
|
||||
.as_mut()
|
||||
.map(BufferedSubAggs::get_sub_agg_collector),
|
||||
bucket,
|
||||
agg_data,
|
||||
)?;
|
||||
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -803,7 +809,7 @@ impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
|
||||
docs: &[crate::DocId],
|
||||
agg_data: &mut AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
let mem_pre = self.get_memory_consumption();
|
||||
// let mem_pre = self.get_memory_consumption();
|
||||
|
||||
let req_data = &mut self.terms_req_data;
|
||||
|
||||
@@ -847,13 +853,16 @@ impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
|
||||
}
|
||||
}
|
||||
|
||||
let mem_delta = self.get_memory_consumption() - mem_pre;
|
||||
if mem_delta > 0 {
|
||||
agg_data
|
||||
.context
|
||||
.limits
|
||||
.add_memory_consumed(mem_delta as u64)?;
|
||||
}
|
||||
// let mem_delta = self.get_memory_consumption() - mem_pre;
|
||||
// if mem_delta > 0 {
|
||||
// agg_data
|
||||
// .context
|
||||
// .limits
|
||||
// .add_memory_consumed(mem_delta as u64)?;
|
||||
// }
|
||||
|
||||
// After commenting out -> 6000ms -> 36ms
|
||||
|
||||
if let Some(sub_agg) = &mut self.sub_agg {
|
||||
sub_agg.check_flush_local(agg_data)?;
|
||||
}
|
||||
@@ -907,10 +916,38 @@ fn extract_missing_value<T>(
|
||||
Some((key, bucket))
|
||||
}
|
||||
|
||||
impl<TermMap, C> SegmentTermCollector<TermMap, C>
|
||||
fn reborrow_opt_collector<'a>(
|
||||
opt: &'a mut Option<&mut dyn SegmentAggregationCollector>,
|
||||
) -> Option<&'a mut dyn SegmentAggregationCollector> {
|
||||
match opt {
|
||||
Some(inner) => Some(*inner),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn into_intermediate_bucket_entry(
|
||||
bucket: Bucket,
|
||||
sub_agg_collector: Option<&mut dyn SegmentAggregationCollector>,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
) -> crate::Result<IntermediateTermBucketEntry> {
|
||||
let mut sub_aggregation_res = IntermediateAggregationResults::default();
|
||||
if let Some(sub_agg_collector) = sub_agg_collector {
|
||||
sub_agg_collector.add_intermediate_aggregation_result(
|
||||
agg_data,
|
||||
&mut sub_aggregation_res,
|
||||
bucket.bucket_id,
|
||||
)?;
|
||||
}
|
||||
Ok(IntermediateTermBucketEntry {
|
||||
doc_count: bucket.count,
|
||||
sub_aggregation: sub_aggregation_res,
|
||||
})
|
||||
}
|
||||
|
||||
impl<TermMap, B> SegmentTermCollector<TermMap, B>
|
||||
where
|
||||
TermMap: TermAggregationMap,
|
||||
C: SubAggCache,
|
||||
B: SubAggBuffer,
|
||||
{
|
||||
fn get_memory_consumption(&self) -> usize {
|
||||
self.parent_buckets
|
||||
@@ -922,7 +959,7 @@ where
|
||||
#[inline]
|
||||
pub(crate) fn into_intermediate_bucket_result(
|
||||
term_req: &TermsAggReqData,
|
||||
sub_agg: &mut Option<CachedSubAggs<C>>,
|
||||
mut sub_agg_collector: Option<&mut dyn SegmentAggregationCollector>,
|
||||
term_buckets: TermMap,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
) -> crate::Result<IntermediateBucketResult> {
|
||||
@@ -965,31 +1002,6 @@ where
|
||||
let mut dict: FxHashMap<IntermediateKey, IntermediateTermBucketEntry> = Default::default();
|
||||
dict.reserve(entries.len());
|
||||
|
||||
let into_intermediate_bucket_entry =
|
||||
|bucket: Bucket,
|
||||
sub_agg: &mut Option<CachedSubAggs<C>>|
|
||||
-> crate::Result<IntermediateTermBucketEntry> {
|
||||
if let Some(sub_agg) = sub_agg {
|
||||
let mut sub_aggregation_res = IntermediateAggregationResults::default();
|
||||
sub_agg
|
||||
.get_sub_agg_collector()
|
||||
.add_intermediate_aggregation_result(
|
||||
agg_data,
|
||||
&mut sub_aggregation_res,
|
||||
bucket.bucket_id,
|
||||
)?;
|
||||
Ok(IntermediateTermBucketEntry {
|
||||
doc_count: bucket.count,
|
||||
sub_aggregation: sub_aggregation_res,
|
||||
})
|
||||
} else {
|
||||
Ok(IntermediateTermBucketEntry {
|
||||
doc_count: bucket.count,
|
||||
sub_aggregation: Default::default(),
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
if term_req.column_type == ColumnType::Str {
|
||||
let fallback_dict = Dictionary::empty();
|
||||
let term_dict = term_req
|
||||
@@ -1000,7 +1012,11 @@ where
|
||||
|
||||
if let Some((intermediate_key, bucket)) = extract_missing_value(&mut entries, term_req)
|
||||
{
|
||||
let intermediate_entry = into_intermediate_bucket_entry(bucket, sub_agg)?;
|
||||
let intermediate_entry = into_intermediate_bucket_entry(
|
||||
bucket,
|
||||
reborrow_opt_collector(&mut sub_agg_collector),
|
||||
agg_data,
|
||||
)?;
|
||||
dict.insert(intermediate_key, intermediate_entry);
|
||||
}
|
||||
|
||||
@@ -1008,19 +1024,28 @@ where
|
||||
entries.sort_unstable_by_key(|bucket| bucket.0);
|
||||
|
||||
let (term_ids, buckets): (Vec<u64>, Vec<Bucket>) = entries.into_iter().unzip();
|
||||
let mut buckets_it = buckets.into_iter();
|
||||
|
||||
term_dict.sorted_ords_to_term_cb(term_ids.into_iter(), |term| {
|
||||
let bucket = buckets_it.next().unwrap();
|
||||
let intermediate_entry =
|
||||
into_intermediate_bucket_entry(bucket, sub_agg).map_err(io::Error::other)?;
|
||||
let intermediate_entries: Vec<IntermediateTermBucketEntry> = buckets
|
||||
.into_iter()
|
||||
.map(|bucket| {
|
||||
into_intermediate_bucket_entry(
|
||||
bucket,
|
||||
reborrow_opt_collector(&mut sub_agg_collector),
|
||||
agg_data,
|
||||
)
|
||||
})
|
||||
.collect::<crate::Result<_>>()?;
|
||||
|
||||
let mut intermediate_entry_it = intermediate_entries.into_iter();
|
||||
|
||||
term_dict.sorted_ords_to_term_cb(&term_ids[..], |term| {
|
||||
let intermediate_entry = intermediate_entry_it.next().unwrap();
|
||||
dict.insert(
|
||||
IntermediateKey::Str(
|
||||
String::from_utf8(term.to_vec()).expect("could not convert to String"),
|
||||
),
|
||||
intermediate_entry,
|
||||
);
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
if term_req.req.min_doc_count == 0 {
|
||||
@@ -1055,14 +1080,22 @@ where
|
||||
}
|
||||
} else if term_req.column_type == ColumnType::DateTime {
|
||||
for (val, doc_count) in entries {
|
||||
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
|
||||
let intermediate_entry = into_intermediate_bucket_entry(
|
||||
doc_count,
|
||||
reborrow_opt_collector(&mut sub_agg_collector),
|
||||
agg_data,
|
||||
)?;
|
||||
let val = i64::from_u64(val);
|
||||
let date = format_date(val)?;
|
||||
dict.insert(IntermediateKey::Str(date), intermediate_entry);
|
||||
}
|
||||
} else if term_req.column_type == ColumnType::Bool {
|
||||
for (val, doc_count) in entries {
|
||||
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
|
||||
let intermediate_entry = into_intermediate_bucket_entry(
|
||||
doc_count,
|
||||
reborrow_opt_collector(&mut sub_agg_collector),
|
||||
agg_data,
|
||||
)?;
|
||||
let val = bool::from_u64(val);
|
||||
dict.insert(IntermediateKey::Bool(val), intermediate_entry);
|
||||
}
|
||||
@@ -1082,14 +1115,22 @@ where
|
||||
})?;
|
||||
|
||||
for (val, doc_count) in entries {
|
||||
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
|
||||
let intermediate_entry = into_intermediate_bucket_entry(
|
||||
doc_count,
|
||||
reborrow_opt_collector(&mut sub_agg_collector),
|
||||
agg_data,
|
||||
)?;
|
||||
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
|
||||
let val = Ipv6Addr::from_u128(val);
|
||||
dict.insert(IntermediateKey::IpAddr(val), intermediate_entry);
|
||||
}
|
||||
} else {
|
||||
for (val, doc_count) in entries {
|
||||
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
|
||||
let intermediate_entry = into_intermediate_bucket_entry(
|
||||
doc_count,
|
||||
reborrow_opt_collector(&mut sub_agg_collector),
|
||||
agg_data,
|
||||
)?;
|
||||
if term_req.column_type == ColumnType::U64 {
|
||||
dict.insert(IntermediateKey::U64(val), intermediate_entry);
|
||||
} else if term_req.column_type == ColumnType::I64 {
|
||||
@@ -1123,13 +1164,13 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentTermCollector<TermMap, C> {
|
||||
impl<TermMap: TermAggregationMap, B: SubAggBuffer> SegmentTermCollector<TermMap, B> {
|
||||
#[inline]
|
||||
fn collect_terms_with_docs(
|
||||
iter: impl Iterator<Item = (crate::DocId, u64)>,
|
||||
term_buckets: &mut TermMap,
|
||||
bucket_id_provider: &mut BucketIdProvider,
|
||||
sub_agg: &mut CachedSubAggs<C>,
|
||||
sub_agg: &mut BufferedSubAggs<B>,
|
||||
) {
|
||||
for (doc, term_id) in iter {
|
||||
let bucket_id = term_buckets.term_entry(term_id, bucket_id_provider);
|
||||
@@ -1187,6 +1228,7 @@ pub(crate) fn cut_off_buckets<T: GetDocCount + Debug>(
|
||||
mod tests {
|
||||
use std::net::IpAddr;
|
||||
use std::str::FromStr;
|
||||
use std::time::Instant;
|
||||
|
||||
use common::DateTime;
|
||||
use time::{Date, Month};
|
||||
@@ -1200,8 +1242,9 @@ mod tests {
|
||||
get_test_index_from_terms, get_test_index_from_values_and_terms,
|
||||
};
|
||||
use crate::aggregation::{AggregationLimitsGuard, DistributedAggregationCollector};
|
||||
use crate::collector::{Collector, default_collect_segment_impl};
|
||||
use crate::indexer::NoMergePolicy;
|
||||
use crate::query::AllQuery;
|
||||
use crate::query::{AllQuery, EnableScoring, Query};
|
||||
use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING};
|
||||
use crate::{Index, IndexWriter};
|
||||
|
||||
@@ -2896,4 +2939,103 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Indexes one million documents into a single segment and runs a `terms` aggregation
/// nested inside another `terms` aggregation over it, printing how long the segment
/// collection took.
///
/// The outer field cycles through 10_000 distinct terms, the inner field through 4.
#[test]
fn test_terms_double_nesting() {
    const NUM_DOCS: usize = 1_000_000;

    let mut builder = Schema::builder();
    let outer_field = builder.add_text_field("outer_term", STRING | FAST);
    let inner_field = builder.add_text_field("inner_term", STRING | FAST);
    let schema = builder.build();
    let index = Index::create_in_ram(schema.clone());

    let outer_values: Vec<String> = (0..10_000).map(|i| format!("outer_{i}")).collect();
    let inner_values = ["INFO", "ERROR", "WARN", "DEBUG"];

    {
        // Single indexing thread + single commit, so that exactly one segment is produced.
        let mut writer: IndexWriter = index.writer_with_num_threads(1, 200_000_000).unwrap();
        // Document number `n` gets `outer_values[n % 10_000]` and `inner_values[n % 4]`.
        let term_pairs = outer_values
            .iter()
            .cycle()
            .zip(inner_values.iter().copied().cycle())
            .take(NUM_DOCS);
        for (outer_val, inner_val) in term_pairs {
            writer
                .add_document(doc!(
                    outer_field => outer_val.as_str(),
                    inner_field => inner_val,
                ))
                .unwrap();
        }
        writer.commit().unwrap();
    }

    let agg_req: Aggregations = serde_json::from_value(json!({
        "outer": {
            "terms": { "field": "outer_term", "size": 10 },
            "aggs": {
                "inner": {
                    "terms": { "field": "inner_term" }
                }
            }
        }
    }))
    .unwrap();

    let searcher = index.reader().unwrap().searcher();
    let collector =
        crate::aggregation::AggregationCollector::from_aggs(agg_req, Default::default());

    assert_eq!(searcher.segment_readers().len(), 1);
    let segment_reader = searcher.segment_reader(0u32);
    let all_weight = AllQuery
        .weight(EnableScoring::disabled_from_schema(&schema))
        .unwrap();
    let mut segment_collector = collector.for_segment(0u32, segment_reader).unwrap();

    // Only the collection over the segment is timed.
    let start = Instant::now();
    default_collect_segment_impl(&mut segment_collector, &*all_weight, segment_reader, false)
        .unwrap();
    dbg!(start.elapsed());
}
|
||||
|
||||
/// Indexes one million documents into a single segment and runs a single, non-nested
/// `terms` aggregation over it, printing how long the segment collection took.
///
/// The data set is the same as in the nested variant: the outer field cycles through
/// 10_000 distinct terms, the inner field through 4.
#[test]
fn test_terms_simple_nesting() {
    const NUM_DOCS: usize = 1_000_000;

    let mut builder = Schema::builder();
    let outer_field = builder.add_text_field("outer_term", STRING | FAST);
    let inner_field = builder.add_text_field("inner_term", STRING | FAST);
    let schema = builder.build();
    let index = Index::create_in_ram(schema.clone());

    let outer_values: Vec<String> = (0..10_000).map(|i| format!("outer_{i}")).collect();
    let inner_values = ["INFO", "ERROR", "WARN", "DEBUG"];

    {
        // Single indexing thread + single commit, so that exactly one segment is produced.
        let mut writer: IndexWriter = index.writer_with_num_threads(1, 200_000_000).unwrap();
        // Document number `n` gets `outer_values[n % 10_000]` and `inner_values[n % 4]`.
        let term_pairs = outer_values
            .iter()
            .cycle()
            .zip(inner_values.iter().copied().cycle())
            .take(NUM_DOCS);
        for (outer_val, inner_val) in term_pairs {
            writer
                .add_document(doc!(
                    outer_field => outer_val.as_str(),
                    inner_field => inner_val,
                ))
                .unwrap();
        }
        writer.commit().unwrap();
    }

    let agg_req: Aggregations = serde_json::from_value(json!({
        "outer": {
            "terms": { "field": "outer_term", "size": 10 },
        }
    }))
    .unwrap();

    let searcher = index.reader().unwrap().searcher();
    let collector =
        crate::aggregation::AggregationCollector::from_aggs(agg_req, Default::default());

    assert_eq!(searcher.segment_readers().len(), 1);
    let segment_reader = searcher.segment_reader(0u32);
    let all_weight = AllQuery
        .weight(EnableScoring::disabled_from_schema(&schema))
        .unwrap();
    let mut segment_collector = collector.for_segment(0u32, segment_reader).unwrap();

    // Only the collection over the segment is timed.
    let start = Instant::now();
    default_collect_segment_impl(&mut segment_collector, &*all_weight, segment_reader, false)
        .unwrap();
    dbg!(start.elapsed());
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::aggregation::agg_data::{
|
||||
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
|
||||
};
|
||||
use crate::aggregation::bucket::term_agg::TermsAggregation;
|
||||
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardCachedSubAggs};
|
||||
use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardBufferedSubAggs};
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
|
||||
IntermediateKey, IntermediateTermBucketEntry, IntermediateTermBucketResult,
|
||||
@@ -47,7 +47,7 @@ struct MissingCount {
|
||||
#[derive(Default, Debug)]
|
||||
pub struct TermMissingAgg {
|
||||
accessor_idx: usize,
|
||||
sub_agg: Option<HighCardCachedSubAggs>,
|
||||
sub_agg: Option<HighCardBufferedSubAggs>,
|
||||
/// Idx = parent bucket id, Value = missing count for that bucket
|
||||
missing_count_per_bucket: Vec<MissingCount>,
|
||||
bucket_id_provider: BucketIdProvider,
|
||||
@@ -66,7 +66,7 @@ impl TermMissingAgg {
|
||||
None
|
||||
};
|
||||
|
||||
let sub_agg = sub_agg.map(CachedSubAggs::new);
|
||||
let sub_agg = sub_agg.map(BufferedSubAggs::new);
|
||||
let bucket_id_provider = BucketIdProvider::default();
|
||||
|
||||
Ok(Self {
|
||||
|
||||
@@ -6,7 +6,7 @@ use crate::aggregation::bucket::MAX_NUM_TERMS_FOR_VEC;
|
||||
use crate::aggregation::BucketId;
|
||||
use crate::DocId;
|
||||
|
||||
/// A cache for sub-aggregations, storing doc ids per bucket id.
|
||||
/// A buffer for sub-aggregations, storing doc ids per bucket id.
|
||||
/// Depending on the cardinality of the parent aggregation, we use different
|
||||
/// storage strategies.
|
||||
///
|
||||
@@ -24,21 +24,21 @@ use crate::DocId;
|
||||
/// aggregations.
|
||||
/// What this datastructure does in general is to group docs by bucket id.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct CachedSubAggs<C: SubAggCache> {
|
||||
cache: C,
|
||||
pub(crate) struct BufferedSubAggs<B: SubAggBuffer> {
|
||||
buffer: B,
|
||||
sub_agg_collector: Box<dyn SegmentAggregationCollector>,
|
||||
num_docs: usize,
|
||||
}
|
||||
|
||||
pub type LowCardCachedSubAggs = CachedSubAggs<LowCardSubAggCache>;
|
||||
pub type HighCardCachedSubAggs = CachedSubAggs<HighCardSubAggCache>;
|
||||
pub type LowCardBufferedSubAggs = BufferedSubAggs<LowCardSubAggBuffer>;
|
||||
pub type HighCardBufferedSubAggs = BufferedSubAggs<HighCardSubAggBuffer>;
|
||||
|
||||
const FLUSH_THRESHOLD: usize = 2048;
|
||||
|
||||
/// A trait for caching sub-aggregation doc ids per bucket id.
|
||||
/// A trait for buffering sub-aggregation doc ids per bucket id.
|
||||
/// Different implementations can be used depending on the cardinality
|
||||
/// of the parent aggregation.
|
||||
pub trait SubAggCache: Debug {
|
||||
pub trait SubAggBuffer: Debug {
|
||||
fn new() -> Self;
|
||||
fn push(&mut self, bucket_id: BucketId, doc_id: DocId);
|
||||
fn flush_local(
|
||||
@@ -49,22 +49,22 @@ pub trait SubAggCache: Debug {
|
||||
) -> crate::Result<()>;
|
||||
}
|
||||
|
||||
impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
|
||||
impl<Backend: SubAggBuffer + Debug> BufferedSubAggs<Backend> {
|
||||
pub fn new(sub_agg: Box<dyn SegmentAggregationCollector>) -> Self {
|
||||
Self {
|
||||
cache: Backend::new(),
|
||||
buffer: Backend::new(),
|
||||
sub_agg_collector: sub_agg,
|
||||
num_docs: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_sub_agg_collector(&mut self) -> &mut Box<dyn SegmentAggregationCollector> {
|
||||
&mut self.sub_agg_collector
|
||||
pub fn get_sub_agg_collector(&mut self) -> &mut dyn SegmentAggregationCollector {
|
||||
&mut *self.sub_agg_collector
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn push(&mut self, bucket_id: BucketId, doc_id: DocId) {
|
||||
self.cache.push(bucket_id, doc_id);
|
||||
self.buffer.push(bucket_id, doc_id);
|
||||
self.num_docs += 1;
|
||||
}
|
||||
|
||||
@@ -75,7 +75,7 @@ impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
|
||||
agg_data: &mut AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
if self.num_docs >= FLUSH_THRESHOLD {
|
||||
self.cache
|
||||
self.buffer
|
||||
.flush_local(&mut self.sub_agg_collector, agg_data, false)?;
|
||||
self.num_docs = 0;
|
||||
}
|
||||
@@ -85,7 +85,7 @@ impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
|
||||
/// Note: this _does_ flush the sub aggregations.
|
||||
pub fn flush(&mut self, agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
|
||||
if self.num_docs != 0 {
|
||||
self.cache
|
||||
self.buffer
|
||||
.flush_local(&mut self.sub_agg_collector, agg_data, true)?;
|
||||
self.num_docs = 0;
|
||||
}
|
||||
@@ -94,11 +94,11 @@ impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Number of partitions for high cardinality sub-aggregation cache.
|
||||
/// Number of partitions for high cardinality sub-aggregation buffer.
|
||||
const NUM_PARTITIONS: usize = 16;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct HighCardSubAggCache {
|
||||
pub(crate) struct HighCardSubAggBuffer {
|
||||
/// This weird partitioning is used to do some cheap grouping on the bucket ids.
|
||||
/// bucket ids are dense, e.g. when we don't detect the cardinality as low cardinality,
|
||||
/// but there are just 16 bucket ids, each bucket id will go to its own partition.
|
||||
@@ -108,7 +108,7 @@ pub(crate) struct HighCardSubAggCache {
|
||||
partitions: Box<[PartitionEntry; NUM_PARTITIONS]>,
|
||||
}
|
||||
|
||||
impl HighCardSubAggCache {
|
||||
impl HighCardSubAggBuffer {
|
||||
#[inline]
|
||||
fn clear(&mut self) {
|
||||
for partition in self.partitions.iter_mut() {
|
||||
@@ -131,7 +131,7 @@ impl PartitionEntry {
|
||||
}
|
||||
}
|
||||
|
||||
impl SubAggCache for HighCardSubAggCache {
|
||||
impl SubAggBuffer for HighCardSubAggBuffer {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
partitions: Box::new(core::array::from_fn(|_| PartitionEntry::default())),
|
||||
@@ -173,14 +173,14 @@ impl SubAggCache for HighCardSubAggCache {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct LowCardSubAggCache {
|
||||
/// Cache doc ids per bucket for sub-aggregations.
|
||||
pub(crate) struct LowCardSubAggBuffer {
|
||||
/// Buffer doc ids per bucket for sub-aggregations.
|
||||
///
|
||||
/// The outer Vec is indexed by BucketId.
|
||||
per_bucket_docs: Vec<Vec<DocId>>,
|
||||
}
|
||||
|
||||
impl LowCardSubAggCache {
|
||||
impl LowCardSubAggBuffer {
|
||||
#[inline]
|
||||
fn clear(&mut self) {
|
||||
for v in &mut self.per_bucket_docs {
|
||||
@@ -189,7 +189,7 @@ impl LowCardSubAggCache {
|
||||
}
|
||||
}
|
||||
|
||||
impl SubAggCache for LowCardSubAggCache {
|
||||
impl SubAggBuffer for LowCardSubAggBuffer {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
per_bucket_docs: Vec::new(),
|
||||
@@ -1,6 +1,6 @@
|
||||
use super::agg_req::Aggregations;
|
||||
use super::agg_result::AggregationResults;
|
||||
use super::cached_sub_aggs::LowCardCachedSubAggs;
|
||||
use super::buffered_sub_aggs::LowCardBufferedSubAggs;
|
||||
use super::intermediate_agg_result::IntermediateAggregationResults;
|
||||
use super::AggContextParams;
|
||||
// group buffering strategy is chosen explicitly by callers; no need to hash-group on the fly.
|
||||
@@ -136,7 +136,7 @@ fn merge_fruits(
|
||||
/// `AggregationSegmentCollector` does the aggregation collection on a segment.
|
||||
pub struct AggregationSegmentCollector {
|
||||
aggs_with_accessor: AggregationsSegmentCtx,
|
||||
agg_collector: LowCardCachedSubAggs,
|
||||
agg_collector: LowCardBufferedSubAggs,
|
||||
error: Option<TantivyError>,
|
||||
}
|
||||
|
||||
@@ -152,7 +152,7 @@ impl AggregationSegmentCollector {
|
||||
let mut agg_data =
|
||||
build_aggregations_data_from_req(agg, reader, segment_ordinal, context.clone())?;
|
||||
let mut result =
|
||||
LowCardCachedSubAggs::new(build_segment_agg_collectors_root(&mut agg_data)?);
|
||||
LowCardBufferedSubAggs::new(build_segment_agg_collectors_root(&mut agg_data)?);
|
||||
result
|
||||
.get_sub_agg_collector()
|
||||
.prepare_max_bucket(0, &agg_data)?; // prepare for bucket zero
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use std::fmt::Debug;
|
||||
use std::hash::Hash;
|
||||
use std::io;
|
||||
|
||||
use columnar::column_values::CompactSpaceU64Accessor;
|
||||
use columnar::{Column, ColumnType, Dictionary, StrColumn};
|
||||
use common::f64_to_u64;
|
||||
use datasketches::hll::{HllSketch, HllType, HllUnion};
|
||||
use rustc_hash::FxHashSet;
|
||||
use datasketches::hll::{Coupon, HllSketch, HllType, HllUnion};
|
||||
use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
|
||||
use crate::aggregation::agg_data::AggregationsSegmentCtx;
|
||||
@@ -120,9 +121,65 @@ impl CardinalityAggregationReq {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
/// Caches the mapping from term ordinal to coupon (the hash of the associated term).
|
||||
/// The goal is to avoid fetching the term behind a given term ordinal several times
|
||||
/// when that same ordinal shows up in several buckets.
|
||||
enum CouponCache {
|
||||
/// Coupons stored in a `Vec` indexed directly by term ordinal.
Dense {
|
||||
/// `coupon_map[term_ord]` is the coupon for `term_ord`; slots for ordinals that
/// were not supplied at construction hold `Coupon::EMPTY`.
coupon_map: Vec<Coupon>,
|
||||
/// Coupon standing in for the request's `missing` value, if one was provided.
missing_coupon_opt: Option<Coupon>,
|
||||
},
|
||||
/// Coupons stored in a hash map keyed by term ordinal.
Sparse {
|
||||
/// Term ordinal -> coupon, only for the ordinals supplied at construction.
coupon_map: FxHashMap<u64, Coupon>,
|
||||
/// Coupon standing in for the request's `missing` value, if one was provided.
missing_coupon_opt: Option<Coupon>,
|
||||
},
|
||||
}
|
||||
|
||||
impl CouponCache {
|
||||
fn new(
|
||||
term_ords: Vec<u64>,
|
||||
coupons: Vec<Coupon>,
|
||||
missing_coupon_opt: Option<Coupon>,
|
||||
) -> CouponCache {
|
||||
let num_terms = term_ords.len();
|
||||
assert_eq!(num_terms, coupons.len());
|
||||
if term_ords.is_empty() {
|
||||
return CouponCache::Dense {
|
||||
coupon_map: Vec::new(),
|
||||
missing_coupon_opt,
|
||||
};
|
||||
}
|
||||
let highest_term_ord = term_ords.last().copied().unwrap_or(0u64);
|
||||
// We prefer the dense implementation, if it is not too wasteful.
|
||||
// There are two cases for which we can use it.
|
||||
// 1- if the data is small.
|
||||
// 2- if the data is not necessarily small, but due to a high occupancy ratio, the RAM usage
|
||||
// is not that much bigger than if we had used a HashSet. (occupancy ratio + extra
|
||||
// metadata ~ x2.25)
|
||||
let should_use_dense =
|
||||
highest_term_ord < 1_000_000u64 || highest_term_ord < num_terms as u64 * 3u64;
|
||||
if should_use_dense {
|
||||
let mut coupon_map: Vec<Coupon> = vec![Coupon::EMPTY; highest_term_ord as usize + 1];
|
||||
for (term_ord, coupon) in term_ords.into_iter().zip(coupons.into_iter()) {
|
||||
coupon_map[term_ord as usize] = coupon;
|
||||
}
|
||||
CouponCache::Dense {
|
||||
coupon_map,
|
||||
missing_coupon_opt,
|
||||
}
|
||||
} else {
|
||||
let coupon_map: FxHashMap<u64, Coupon> = term_ords.into_iter().zip(coupons).collect();
|
||||
CouponCache::Sparse {
|
||||
coupon_map,
|
||||
missing_coupon_opt,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct SegmentCardinalityCollector {
|
||||
buckets: Vec<SegmentCardinalityCollectorBucket>,
|
||||
/// Buckets are Some(_) until they get consumed by into_intermediate_results().
|
||||
buckets: Vec<Option<SegmentCardinalityCollectorBucket>>,
|
||||
accessor_idx: usize,
|
||||
/// The column accessor to access the fast field values.
|
||||
accessor: Column<u64>,
|
||||
@@ -130,75 +187,133 @@ pub(crate) struct SegmentCardinalityCollector {
|
||||
column_type: ColumnType,
|
||||
/// The missing value normalized to the internal u64 representation of the field type.
|
||||
missing_value_for_accessor: Option<u64>,
|
||||
coupon_cache: Option<CouponCache>,
|
||||
}
|
||||
|
||||
impl Debug for SegmentCardinalityCollector {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
f.debug_struct("SegmentCardinalityCollector")
|
||||
.field("column_type", &self.column_type)
|
||||
.field(
|
||||
"missing_value_for_accessor",
|
||||
&self.missing_value_for_accessor,
|
||||
)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Default)]
|
||||
/// Per-bucket state of the segment-level cardinality collector.
pub(crate) struct SegmentCardinalityCollectorBucket {
|
||||
/// Sketch-backed collector that is handed out as this bucket's intermediate metric
/// result.
cardinality: CardinalityCollector,
|
||||
/// Set of raw `u64` values seen for this bucket. For `Str` columns these are term
/// ordinals that still have to be resolved before they reach the sketch.
entries: FxHashSet<u64>,
|
||||
}
|
||||
impl SegmentCardinalityCollectorBucket {
|
||||
#[inline(always)]
|
||||
pub fn new(column_type: ColumnType) -> Self {
|
||||
Self {
|
||||
cardinality: CardinalityCollector::new(column_type as u8),
|
||||
entries: FxHashSet::default(),
|
||||
}
|
||||
}
|
||||
|
||||
// Returns an intermediate metric result.
|
||||
//
|
||||
// If the column is not str, the values have been added to the
|
||||
// sketch during collection.
|
||||
//
|
||||
// If the column is str, then the values are dictionary encoded
|
||||
// and have not been added to the sketch yet.
|
||||
// We need to resolve the term ords accumulated in self.entries
|
||||
// with the coupon cache, and append the results to the sketch.
|
||||
fn into_intermediate_metric_result(
|
||||
mut self,
|
||||
req_data: &CardinalityAggReqData,
|
||||
coupon_cache_opt: Option<&CouponCache>,
|
||||
) -> crate::Result<IntermediateMetricResult> {
|
||||
if req_data.column_type == ColumnType::Str {
|
||||
let fallback_dict = Dictionary::empty();
|
||||
let dict = req_data
|
||||
.str_dict_column
|
||||
.as_ref()
|
||||
.map(|el| el.dictionary())
|
||||
.unwrap_or_else(|| &fallback_dict);
|
||||
let mut has_missing = false;
|
||||
if let Some(coupon_cache) = coupon_cache_opt {
|
||||
assert!(self.cardinality.sketch.is_empty());
|
||||
append_to_sketch(&self.entries, coupon_cache, &mut self.cardinality);
|
||||
}
|
||||
Ok(IntermediateMetricResult::Cardinality(self.cardinality))
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: replace FxHashSet with something that allows iterating in order
|
||||
// (e.g. sparse bitvec)
|
||||
let mut term_ids = Vec::new();
|
||||
for term_ord in self.entries.into_iter() {
|
||||
if term_ord == u64::MAX {
|
||||
has_missing = true;
|
||||
} else {
|
||||
// we can reasonably exclude values above u32::MAX
|
||||
term_ids.push(term_ord as u32);
|
||||
}
|
||||
}
|
||||
/// Builds a coupon cache from the given buckets, dictionary, and optional missing value.
|
||||
/// Returns a mapping from term_ord to the hash (coupon) of the associated term.
|
||||
fn build_coupon_cache(
|
||||
buckets: &[Option<SegmentCardinalityCollectorBucket>],
|
||||
dictionary: &Dictionary,
|
||||
missing_value_opt: Option<&Key>,
|
||||
) -> io::Result<CouponCache> {
|
||||
let term_ords_capacity: usize = buckets
|
||||
.iter()
|
||||
.flatten()
|
||||
.map(|bucket| bucket.entries.len())
|
||||
.max()
|
||||
.unwrap_or(0)
|
||||
* 2;
|
||||
let mut term_ords_set = FxHashSet::with_capacity_and_hasher(term_ords_capacity, FxBuildHasher);
|
||||
for bucket in buckets.iter().flatten() {
|
||||
term_ords_set.extend(bucket.entries.iter().copied());
|
||||
}
|
||||
let mut term_ords: Vec<u64> = term_ords_set.into_iter().collect();
|
||||
term_ords.sort_unstable();
|
||||
|
||||
term_ids.sort_unstable();
|
||||
dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| {
|
||||
self.cardinality.insert(term);
|
||||
Ok(())
|
||||
})?;
|
||||
if has_missing {
|
||||
// Replace missing with the actual value provided
|
||||
let missing_key =
|
||||
req_data.req.missing.as_ref().expect(
|
||||
"Found sentinel value u64::MAX for term_ord but `missing` is not set",
|
||||
);
|
||||
match missing_key {
|
||||
Key::Str(missing) => {
|
||||
self.cardinality.insert(missing.as_str());
|
||||
}
|
||||
Key::F64(val) => {
|
||||
let val = f64_to_u64(*val);
|
||||
self.cardinality.insert(val);
|
||||
}
|
||||
Key::U64(val) => {
|
||||
self.cardinality.insert(*val);
|
||||
}
|
||||
Key::I64(val) => {
|
||||
self.cardinality.insert(*val);
|
||||
}
|
||||
term_ords.pop_if(|highest_term_ord| *highest_term_ord >= dictionary.num_terms() as u64);
|
||||
|
||||
let mut coupons: Vec<Coupon> = Vec::with_capacity(term_ords.len());
|
||||
let all_term_ords_found: bool =
|
||||
dictionary.sorted_ords_to_term_cb(&term_ords, |term_bytes| {
|
||||
let coupon: Coupon = Coupon::from_hash(term_bytes);
|
||||
coupons.push(coupon);
|
||||
})?;
|
||||
assert!(all_term_ords_found);
|
||||
|
||||
// Regardless of whether or not there is effectively a missing value in one of the buckets,
|
||||
// we populate the cache with the missing key too (if any).
|
||||
let missing_coupon_opt: Option<Coupon> = missing_value_opt.map(|missing_key| {
|
||||
if let Key::Str(missing_value_str) = missing_key {
|
||||
Coupon::from_hash(missing_value_str.as_bytes())
|
||||
} else {
|
||||
// See https://github.com/quickwit-oss/tantivy/issues/2891
|
||||
// A missing key with a type different from Str will not work as intended
|
||||
// for the moment.
|
||||
//
|
||||
// Right now this is just a partial workaround.
|
||||
Coupon::from_hash("__tantivy_missing_non_str__".as_bytes())
|
||||
}
|
||||
});
|
||||
Ok(CouponCache::new(term_ords, coupons, missing_coupon_opt))
|
||||
}
|
||||
|
||||
fn append_to_sketch(
|
||||
term_ords: &FxHashSet<u64>,
|
||||
coupon_cache: &CouponCache,
|
||||
sketch: &mut CardinalityCollector,
|
||||
) {
|
||||
match coupon_cache {
|
||||
CouponCache::Dense {
|
||||
coupon_map,
|
||||
missing_coupon_opt,
|
||||
} => {
|
||||
for &term_ord in term_ords {
|
||||
if let Some(coupon) = coupon_map
|
||||
.get(term_ord as usize)
|
||||
.copied()
|
||||
.or(*missing_coupon_opt)
|
||||
{
|
||||
sketch.insert_coupon(coupon);
|
||||
}
|
||||
}
|
||||
}
|
||||
CouponCache::Sparse {
|
||||
coupon_map,
|
||||
missing_coupon_opt,
|
||||
} => {
|
||||
for term_ord in term_ords {
|
||||
if let Some(coupon) = coupon_map.get(term_ord).copied().or(*missing_coupon_opt) {
|
||||
sketch.insert_coupon(coupon);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(IntermediateMetricResult::Cardinality(self.cardinality))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -210,11 +325,12 @@ impl SegmentCardinalityCollector {
|
||||
missing_value_for_accessor: Option<u64>,
|
||||
) -> Self {
|
||||
Self {
|
||||
buckets: vec![SegmentCardinalityCollectorBucket::new(column_type); 1],
|
||||
buckets: Vec::new(),
|
||||
column_type,
|
||||
accessor_idx,
|
||||
accessor,
|
||||
missing_value_for_accessor,
|
||||
coupon_cache: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -236,15 +352,35 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
results: &mut IntermediateAggregationResults,
|
||||
parent_bucket_id: BucketId,
|
||||
bucket_id: BucketId,
|
||||
) -> crate::Result<()> {
|
||||
self.prepare_max_bucket(parent_bucket_id, agg_data)?;
|
||||
self.prepare_max_bucket(bucket_id, agg_data)?;
|
||||
let req_data = &agg_data.get_cardinality_req_data(self.accessor_idx);
|
||||
// Strings are dictionary encoded. Fetching the terms associated to strings
|
||||
// is expensive. For this reason, we do that once for all buckets and cache the results
|
||||
// here.
|
||||
if let Some(str_dict_column) = &req_data.str_dict_column {
|
||||
// Ensure the coupon cache is populated.
|
||||
// A mapping from term_ord to the hash of the associated term.
|
||||
// The missing value sentinel will be associated to the hash of the missing value if
|
||||
// any.
|
||||
if self.coupon_cache.is_none() {
|
||||
self.coupon_cache = Some(build_coupon_cache(
|
||||
&self.buckets,
|
||||
str_dict_column.dictionary(),
|
||||
req_data.req.missing.as_ref(),
|
||||
)?);
|
||||
}
|
||||
}
|
||||
let name = req_data.name.to_string();
|
||||
// take the bucket in buckets and replace it with a new empty one
|
||||
let bucket = std::mem::take(&mut self.buckets[parent_bucket_id as usize]);
|
||||
|
||||
let intermediate_result = bucket.into_intermediate_metric_result(req_data)?;
|
||||
let Some(bucket) = self.buckets[bucket_id as usize].take() else {
|
||||
return Err(crate::TantivyError::InternalError(
|
||||
"the same bucket should not be finalized twice.".to_string(),
|
||||
));
|
||||
};
|
||||
let intermediate_result =
|
||||
bucket.into_intermediate_metric_result(self.coupon_cache.as_ref())?;
|
||||
results.push(
|
||||
name,
|
||||
IntermediateAggregationResult::Metric(intermediate_result),
|
||||
@@ -260,8 +396,11 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
||||
agg_data: &mut AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
self.fetch_block_with_field(docs, agg_data);
|
||||
let bucket = &mut self.buckets[parent_bucket_id as usize];
|
||||
|
||||
let Some(bucket) = &mut self.buckets[parent_bucket_id as usize].as_mut() else {
|
||||
return Err(crate::TantivyError::InternalError(
|
||||
"collection should not happen after finalization".to_string(),
|
||||
));
|
||||
};
|
||||
let col_block_accessor = &agg_data.column_block_accessor;
|
||||
if self.column_type == ColumnType::Str {
|
||||
for term_ord in col_block_accessor.iter_vals() {
|
||||
@@ -301,7 +440,7 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
||||
) -> crate::Result<()> {
|
||||
if max_bucket as usize >= self.buckets.len() {
|
||||
self.buckets.resize_with(max_bucket as usize + 1, || {
|
||||
SegmentCardinalityCollectorBucket::new(self.column_type)
|
||||
Some(SegmentCardinalityCollectorBucket::new(self.column_type))
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
@@ -358,10 +497,14 @@ impl CardinalityCollector {
|
||||
/// Insert a value into the HLL sketch, salted by the column type.
|
||||
/// The salt ensures that identical u64 values from different column types
|
||||
/// (e.g. bool `false` vs i64 `0`) are counted as distinct.
|
||||
pub(crate) fn insert<T: Hash>(&mut self, value: T) {
|
||||
fn insert<T: Hash>(&mut self, value: T) {
|
||||
self.sketch.update((self.salt, value));
|
||||
}
|
||||
|
||||
/// Adds an already computed coupon to the HLL sketch through `update_with_coupon`.
/// The coupon is forwarded as is: this method does not combine it with `self.salt`.
fn insert_coupon(&mut self, coupon: Coupon) {
|
||||
self.sketch.update_with_coupon(coupon);
|
||||
}
|
||||
|
||||
/// Compute the final cardinality estimate.
|
||||
pub fn finalize(self) -> Option<f64> {
|
||||
Some(self.sketch.estimate().trunc())
|
||||
@@ -377,7 +520,7 @@ impl CardinalityCollector {
|
||||
let mut union = HllUnion::new(LG_K);
|
||||
union.update(&self.sketch);
|
||||
union.update(&right.sketch);
|
||||
self.sketch = union.get_result(HllType::Hll4);
|
||||
self.sketch = union.to_sketch(HllType::Hll4);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -392,7 +535,7 @@ mod tests {
|
||||
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::tests::{exec_request, get_test_index_from_terms};
|
||||
use crate::schema::{IntoIpv6Addr, Schema, FAST};
|
||||
use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING};
|
||||
use crate::Index;
|
||||
|
||||
#[test]
|
||||
@@ -575,6 +718,30 @@ mod tests {
|
||||
assert_eq!(estimate, 3.0);
|
||||
}
|
||||
|
||||
/// Checks that merging two small collectors yields an exact distinct count.
///
/// Both inputs are small enough to still be in List/Set coupon mode, so the `HllUnion`
/// is expected not to promote to the full HLL array when the combined cardinality is
/// small.
#[test]
fn cardinality_collector_merge_stays_exact_for_small_sets() {
    use super::CardinalityCollector;

    // The two ranges overlap on 30..50; together they cover 0..100.
    let mut merged = CardinalityCollector::default();
    (0u64..50).for_each(|value| merged.insert(value));

    let mut other = CardinalityCollector::default();
    (30u64..100).for_each(|value| other.insert(value));

    merged.merge_fruits(other).unwrap();

    // 100 distinct values (0..100). Both sketches are in Set mode (< 192 coupons),
    // so the union should stay in coupon mode and give an exact count.
    let estimate = merged.finalize().unwrap();
    assert_eq!(estimate, 100.0);
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_serialize_deserialize_binary() {
|
||||
use datasketches::hll::HllSketch;
|
||||
@@ -591,6 +758,98 @@ mod tests {
|
||||
assert!((deserialized.estimate() - 3.0).abs() < 0.01);
|
||||
}
|
||||
|
||||
/// Checks that the `missing` parameter of the cardinality aggregation counts a
/// single empty document, for both a u64 column and a str column.
///
/// The index holds two documents: one with `id = 1` and `name = "some_name"`,
/// and one with no fields at all.
#[test]
fn cardinality_aggregation_missing_value_single_empty_doc() {
    let mut schema_builder = Schema::builder();
    let id_field = schema_builder.add_u64_field("id", FAST);
    let name_field = schema_builder.add_text_field("name", STRING | FAST);
    let index = Index::create_in_ram(schema_builder.build());

    let mut writer = index.writer_for_tests().unwrap();
    writer
        .add_document(doc!(id_field=>1u64,name_field=>"some_name"))
        .unwrap();
    writer.add_document(doc!()).unwrap();
    writer.commit().unwrap();

    // Runs a cardinality aggregation on `field` with the given `missing`
    // replacement and returns the reported value.
    let cardinality_with_missing = |field: &str, missing: serde_json::Value| {
        let agg_req: Aggregations = serde_json::from_value(json!({
            "cardinality": {
                "cardinality": {
                    "field": field,
                    "missing": missing
                },
            }
        }))
        .unwrap();
        let res = exec_request(agg_req, &index).unwrap();
        res["cardinality"]["value"].clone()
    };

    // int column with missing value non redundant
    assert_eq!(cardinality_with_missing("id", json!(42u64)), 2.0);

    // int column with missing value redundant
    assert_eq!(cardinality_with_missing("id", json!(1u64)), 1.0);

    // str column with missing value non redundant
    // (original author's note: with more than one segment, this is not well handled.)
    assert_eq!(cardinality_with_missing("name", json!("other_name")), 2.0);

    // str column with missing value redundant
    assert_eq!(cardinality_with_missing("name", json!("some_name")), 1.0);

    // str column with missing value with a number type.
    assert_eq!(cardinality_with_missing("name", json!(3)), 2.0);
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_salt_differentiates_types() {
|
||||
use super::CardinalityCollector;
|
||||
|
||||
@@ -107,10 +107,9 @@ pub enum PercentileValues {
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
/// The entry when requesting percentiles with keyed: false
|
||||
pub struct PercentileValuesVecEntry {
|
||||
/// Percentile
|
||||
/// The percentile key (e.g. 1.0, 5.0, 25.0).
|
||||
pub key: f64,
|
||||
|
||||
/// Value at the percentile
|
||||
/// The percentile value. `NaN` when there are no values.
|
||||
pub value: f64,
|
||||
}
|
||||
|
||||
|
||||
@@ -133,7 +133,7 @@ mod agg_limits;
|
||||
pub mod agg_req;
|
||||
pub mod agg_result;
|
||||
pub mod bucket;
|
||||
pub(crate) mod cached_sub_aggs;
|
||||
pub(crate) mod buffered_sub_aggs;
|
||||
mod collector;
|
||||
mod date;
|
||||
mod error;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use super::Collector;
|
||||
use crate::collector::SegmentCollector;
|
||||
use crate::query::Weight;
|
||||
use crate::{DocId, Score, SegmentOrdinal, SegmentReader};
|
||||
|
||||
/// `CountCollector` collector only counts how many
|
||||
@@ -55,6 +56,15 @@ impl Collector for Count {
|
||||
fn merge_fruits(&self, segment_counts: Vec<usize>) -> crate::Result<usize> {
|
||||
Ok(segment_counts.into_iter().sum())
|
||||
}
|
||||
|
||||
fn collect_segment(
|
||||
&self,
|
||||
weight: &dyn Weight,
|
||||
_segment_ord: u32,
|
||||
reader: &SegmentReader,
|
||||
) -> crate::Result<usize> {
|
||||
Ok(weight.count(reader)? as usize)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
use std::cmp::{Ordering, Reverse};
|
||||
use std::collections::BinaryHeap;
|
||||
|
||||
use crate::collector::sort_key::NaturalComparator;
|
||||
use crate::collector::{SegmentSortKeyComputer, SortKeyComputer, TopNComputer};
|
||||
use crate::collector::{SegmentSortKeyComputer, SortKeyComputer};
|
||||
use crate::{DocAddress, DocId, Score};
|
||||
|
||||
/// Sort by similarity score.
|
||||
@@ -25,6 +28,10 @@ impl SortKeyComputer for SortBySimilarityScore {
|
||||
}
|
||||
|
||||
// Sorting by score is special in that it allows for the Block-Wand optimization.
|
||||
//
|
||||
// We use a BinaryHeap (TopNHeap) instead of TopNComputer here so that the
|
||||
// threshold is always the exact K-th best score. TopNComputer only updates its
|
||||
// threshold every K docs (at truncation), giving Block-WAND a stale bound.
|
||||
fn collect_segment_top_k(
|
||||
&self,
|
||||
k: usize,
|
||||
@@ -32,12 +39,10 @@ impl SortKeyComputer for SortBySimilarityScore {
|
||||
reader: &crate::SegmentReader,
|
||||
segment_ord: u32,
|
||||
) -> crate::Result<Vec<(Self::SortKey, DocAddress)>> {
|
||||
let mut top_n: TopNComputer<Score, DocId, Self::Comparator> =
|
||||
TopNComputer::new_with_comparator(k, self.comparator());
|
||||
let mut top_n = TopNHeap::new(k);
|
||||
|
||||
if let Some(alive_bitset) = reader.alive_bitset() {
|
||||
let mut threshold = Score::MIN;
|
||||
top_n.threshold = Some(threshold);
|
||||
weight.for_each_pruning(Score::MIN, reader, &mut |doc, score| {
|
||||
if alive_bitset.is_deleted(doc) {
|
||||
return threshold;
|
||||
@@ -56,7 +61,7 @@ impl SortKeyComputer for SortBySimilarityScore {
|
||||
Ok(top_n
|
||||
.into_vec()
|
||||
.into_iter()
|
||||
.map(|cid| (cid.sort_key, DocAddress::new(segment_ord, cid.doc)))
|
||||
.map(|(score, doc)| (score, DocAddress::new(segment_ord, doc)))
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
@@ -75,3 +80,204 @@ impl SegmentSortKeyComputer for SortBySimilarityScore {
|
||||
score
|
||||
}
|
||||
}
|
||||
|
||||
/// Min-heap entry: higher score = greater, lower doc wins ties.
|
||||
struct ScoreHeapEntry {
|
||||
score: Score,
|
||||
doc: DocId,
|
||||
}
|
||||
|
||||
impl Eq for ScoreHeapEntry {}
|
||||
|
||||
impl PartialEq for ScoreHeapEntry {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.cmp(other) == Ordering::Equal
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for ScoreHeapEntry {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for ScoreHeapEntry {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.score
|
||||
.partial_cmp(&other.score)
|
||||
.unwrap_or(Ordering::Equal)
|
||||
.then_with(|| other.doc.cmp(&self.doc))
|
||||
}
|
||||
}
|
||||
|
||||
/// Heap-based top-K for score collection. O(log K) per insert, but the threshold
|
||||
/// is always tight, so Block-WAND prunes better than with [`TopNComputer`]'s
|
||||
/// buffer/median approach.
|
||||
///
|
||||
/// Like [`TopNComputer`], items must arrive in ascending doc order, and equal
|
||||
/// scores are rejected (strict `>`) so that lower doc IDs win ties.
|
||||
///
|
||||
/// [`TopNComputer`]: crate::collector::TopNComputer
|
||||
struct TopNHeap {
|
||||
heap: BinaryHeap<Reverse<ScoreHeapEntry>>,
|
||||
top_n: usize,
|
||||
threshold: Option<Score>,
|
||||
}
|
||||
|
||||
impl TopNHeap {
|
||||
fn new(top_n: usize) -> Self {
|
||||
TopNHeap {
|
||||
heap: BinaryHeap::with_capacity(top_n),
|
||||
top_n,
|
||||
threshold: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn push(&mut self, score: Score, doc: DocId) {
|
||||
if self.heap.len() < self.top_n {
|
||||
self.heap.push(Reverse(ScoreHeapEntry { score, doc }));
|
||||
if self.heap.len() == self.top_n {
|
||||
self.threshold = self.heap.peek().map(|Reverse(entry)| entry.score);
|
||||
}
|
||||
} else if let Some(threshold) = self.threshold {
|
||||
if score > threshold {
|
||||
// peek_mut + assign is a single sift-down, vs pop + push = two sifts.
|
||||
if let Some(mut min) = self.heap.peek_mut() {
|
||||
*min = Reverse(ScoreHeapEntry { score, doc });
|
||||
}
|
||||
self.threshold = self.heap.peek().map(|Reverse(entry)| entry.score);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn into_vec(self) -> Vec<(Score, DocId)> {
|
||||
self.heap
|
||||
.into_vec()
|
||||
.into_iter()
|
||||
.map(|Reverse(entry)| (entry.score, entry.doc))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use proptest::prelude::*;

    use super::*;
    use crate::collector::sort_key::NaturalComparator;
    use crate::collector::TopNComputer;

    /// Sorts results best-first: descending score, ascending doc id on ties.
    fn best_first(mut results: Vec<(Score, DocId)>) -> Vec<(Score, DocId)> {
        results.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap().then_with(|| a.1.cmp(&b.1)));
        results
    }

    /// A heap with capacity 0 must retain nothing.
    #[test]
    fn test_top_n_heap_zero_capacity() {
        let mut heap = TopNHeap::new(0);
        for (score, doc) in [(1.0, 0), (2.0, 1)] {
            heap.push(score, doc);
        }
        assert!(heap.into_vec().is_empty());
    }

    /// The two best of three pushed entries are kept.
    #[test]
    fn test_top_n_heap_basic() {
        let mut heap = TopNHeap::new(2);
        for (score, doc) in [(1.0, 0), (3.0, 1), (2.0, 2)] {
            heap.push(score, doc);
        }

        let results = best_first(heap.into_vec());
        assert_eq!(results, vec![(3.0, 1), (2.0, 2)]);
    }

    /// The threshold is unset until the heap is full, then tracks the lowest
    /// retained score after every push.
    #[test]
    fn test_top_n_heap_threshold_always_accurate() {
        let mut heap = TopNHeap::new(2);
        assert_eq!(heap.threshold, None);

        let steps = [
            (1.0, 0, None),
            (3.0, 1, Some(1.0)),
            (2.0, 2, Some(2.0)), // evicts 1.0
            (4.0, 3, Some(3.0)), // evicts 2.0
        ];
        for (score, doc, expected_threshold) in steps {
            heap.push(score, doc);
            assert_eq!(heap.threshold, expected_threshold);
        }
    }

    /// With equal scores, the entries with the lowest doc ids are kept.
    #[test]
    fn test_top_n_heap_tiebreaking_lower_doc_wins() {
        let mut heap = TopNHeap::new(2);
        heap.push(5.0, 0);
        heap.push(5.0, 1);
        // Rejected: not strictly above the threshold.
        heap.push(5.0, 2);

        let mut results = heap.into_vec();
        results.sort_by_key(|&(_, doc)| doc);
        assert_eq!(results, vec![(5.0, 0), (5.0, 1)]);
    }

    /// Capacity 1: the threshold is set by the very first push.
    #[test]
    fn test_top_n_heap_single_element() {
        let mut heap = TopNHeap::new(1);
        heap.push(1.0, 0);
        assert_eq!(heap.threshold, Some(1.0));

        heap.push(0.5, 1); // rejected
        heap.push(2.0, 2); // accepted
        assert_eq!(heap.threshold, Some(2.0));

        assert_eq!(heap.into_vec(), vec![(2.0, 2)]);
    }

    /// Fewer entries than the capacity: everything is kept and no threshold is set.
    #[test]
    fn test_top_n_heap_under_capacity() {
        let mut heap = TopNHeap::new(5);
        for (score, doc) in [(3.0, 0), (1.0, 1), (2.0, 2)] {
            heap.push(score, doc);
        }
        // Only 3 elements for a capacity of 5.
        assert_eq!(heap.threshold, None);

        let results = best_first(heap.into_vec());
        assert_eq!(results, vec![(3.0, 0), (2.0, 2), (1.0, 1)]);
    }

    proptest! {
        /// `TopNHeap` and `TopNComputer` must retain the same entries for the
        /// same input stream.
        #[test]
        fn test_top_n_heap_matches_top_n_computer(
            limit in 0..20_usize,
            mut docs in proptest::collection::vec((0..1000_u32, 0..1000_u32), 0..200_usize),
        ) {
            // Both collectors require ascending, unique doc ids.
            docs.sort_by_key(|(_, doc_id)| *doc_id);
            docs.dedup_by_key(|(_, doc_id)| *doc_id);

            let mut heap = TopNHeap::new(limit);
            let mut computer: TopNComputer<Score, DocId, NaturalComparator> =
                TopNComputer::new_with_comparator(limit, NaturalComparator);

            for &(score_u32, doc) in &docs {
                let score = score_u32 as Score;
                heap.push(score, doc);
                computer.push(score, doc);
            }

            let heap_results = best_first(heap.into_vec());

            let mut computer_results: Vec<(Score, DocId)> = Vec::new();
            for comparable_doc in computer.into_sorted_vec() {
                computer_results.push((comparable_doc.sort_key, comparable_doc.doc));
            }

            prop_assert_eq!(heap_results, computer_results);
        }
    }
}
|
||||
|
||||
@@ -513,7 +513,9 @@ pub struct TopNComputer<Score, D, C> {
|
||||
/// The buffer reverses sort order to get top-semantics instead of bottom-semantics
|
||||
buffer: Vec<ComparableDoc<Score, D>>,
|
||||
top_n: usize,
|
||||
pub(crate) threshold: Option<Score>,
|
||||
/// The current threshold for pruning. Documents with scores at or below
|
||||
/// this value are skipped by `push()`. Updated when the buffer is truncated.
|
||||
pub threshold: Option<Score>,
|
||||
comparator: C,
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use std::borrow::{Borrow, BorrowMut};
|
||||
|
||||
use common::TinySet;
|
||||
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::DocId;
|
||||
|
||||
@@ -14,6 +16,12 @@ pub const TERMINATED: DocId = i32::MAX as u32;
|
||||
/// exactly this size as long as we can fill the buffer.
|
||||
pub const COLLECT_BLOCK_BUFFER_LEN: usize = 64;
|
||||
|
||||
/// Number of `TinySet` (64-bit) buckets in a block used by [`DocSet::fill_bitset_block`].
|
||||
pub const BLOCK_NUM_TINYBITSETS: usize = 16;
|
||||
|
||||
/// Number of doc IDs covered by one block: `BLOCK_NUM_TINYBITSETS * 64 = 1024`.
|
||||
pub const BLOCK_WINDOW: u32 = BLOCK_NUM_TINYBITSETS as u32 * 64;
|
||||
|
||||
/// Represents an iterable set of sorted doc ids.
|
||||
pub trait DocSet: Send {
|
||||
/// Goes to the next element.
|
||||
@@ -160,6 +168,31 @@ pub trait DocSet: Send {
|
||||
self.size_hint() as u64
|
||||
}
|
||||
|
||||
/// Fills a bitmask representing which documents in `[min_doc, min_doc + BLOCK_WINDOW)` are
|
||||
/// present in this docset.
|
||||
///
|
||||
/// The window is divided into `BLOCK_NUM_TINYBITSETS` buckets of 64 docs each.
|
||||
/// Returns the next doc `>= min_doc + BLOCK_WINDOW`, or `TERMINATED` if exhausted.
|
||||
fn fill_bitset_block(
|
||||
&mut self,
|
||||
min_doc: DocId,
|
||||
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
|
||||
) -> DocId {
|
||||
self.seek(min_doc);
|
||||
let horizon = min_doc + BLOCK_WINDOW;
|
||||
loop {
|
||||
let doc = self.doc();
|
||||
if doc >= horizon {
|
||||
return doc;
|
||||
}
|
||||
let delta = doc - min_doc;
|
||||
mask[(delta / 64) as usize].insert_mut(delta % 64);
|
||||
if self.advance() == TERMINATED {
|
||||
return TERMINATED;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of matching documents.
|
||||
/// Calling this method consumes the `DocSet`.
|
||||
fn count(&mut self, alive_bitset: &AliveBitSet) -> u32 {
|
||||
@@ -214,6 +247,18 @@ impl DocSet for &mut dyn DocSet {
|
||||
(**self).seek_danger(target)
|
||||
}
|
||||
|
||||
/// Forwards to the `fill_buffer` implementation of the referenced docset.
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
    let inner = &mut **self;
    inner.fill_buffer(buffer)
}
|
||||
|
||||
fn fill_bitset_block(
|
||||
&mut self,
|
||||
min_doc: DocId,
|
||||
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
|
||||
) -> DocId {
|
||||
(**self).fill_bitset_block(min_doc, mask)
|
||||
}
|
||||
|
||||
fn doc(&self) -> u32 {
|
||||
(**self).doc()
|
||||
}
|
||||
@@ -256,6 +301,15 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
|
||||
unboxed.fill_buffer(buffer)
|
||||
}
|
||||
|
||||
fn fill_bitset_block(
|
||||
&mut self,
|
||||
min_doc: DocId,
|
||||
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
|
||||
) -> DocId {
|
||||
let unboxed: &mut TDocSet = self.borrow_mut();
|
||||
unboxed.fill_bitset_block(min_doc, mask)
|
||||
}
|
||||
|
||||
fn doc(&self) -> DocId {
|
||||
let unboxed: &TDocSet = self.borrow();
|
||||
unboxed.doc()
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use common::TinySet;
|
||||
|
||||
use super::size_hint::estimate_intersection;
|
||||
use crate::docset::{DocSet, SeekDangerResult, TERMINATED};
|
||||
use crate::docset::{DocSet, SeekDangerResult, BLOCK_NUM_TINYBITSETS, TERMINATED};
|
||||
use crate::query::term_query::TermScorer;
|
||||
use crate::query::{EmptyScorer, Scorer};
|
||||
use crate::{DocId, Score};
|
||||
@@ -17,7 +19,7 @@ use crate::{DocId, Score};
|
||||
/// `size_hint` of the intersection.
|
||||
pub fn intersect_scorers(
|
||||
mut scorers: Vec<Box<dyn Scorer>>,
|
||||
num_docs_segment: u32,
|
||||
segment_num_docs: u32,
|
||||
) -> Box<dyn Scorer> {
|
||||
if scorers.is_empty() {
|
||||
return Box::new(EmptyScorer);
|
||||
@@ -42,14 +44,14 @@ pub fn intersect_scorers(
|
||||
left: *(left.downcast::<TermScorer>().map_err(|_| ()).unwrap()),
|
||||
right: *(right.downcast::<TermScorer>().map_err(|_| ()).unwrap()),
|
||||
others: scorers,
|
||||
num_docs: num_docs_segment,
|
||||
segment_num_docs,
|
||||
});
|
||||
}
|
||||
Box::new(Intersection {
|
||||
left,
|
||||
right,
|
||||
others: scorers,
|
||||
num_docs: num_docs_segment,
|
||||
segment_num_docs,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -58,7 +60,7 @@ pub struct Intersection<TDocSet: DocSet, TOtherDocSet: DocSet = Box<dyn Scorer>>
|
||||
left: TDocSet,
|
||||
right: TDocSet,
|
||||
others: Vec<TOtherDocSet>,
|
||||
num_docs: u32,
|
||||
segment_num_docs: u32,
|
||||
}
|
||||
|
||||
fn go_to_first_doc<TDocSet: DocSet>(docsets: &mut [TDocSet]) -> DocId {
|
||||
@@ -78,7 +80,10 @@ fn go_to_first_doc<TDocSet: DocSet>(docsets: &mut [TDocSet]) -> DocId {
|
||||
|
||||
impl<TDocSet: DocSet> Intersection<TDocSet, TDocSet> {
|
||||
/// `segment_num_docs` is the number of documents in the segment.
|
||||
pub(crate) fn new(mut docsets: Vec<TDocSet>, num_docs: u32) -> Intersection<TDocSet, TDocSet> {
|
||||
pub(crate) fn new(
|
||||
mut docsets: Vec<TDocSet>,
|
||||
segment_num_docs: u32,
|
||||
) -> Intersection<TDocSet, TDocSet> {
|
||||
let num_docsets = docsets.len();
|
||||
assert!(num_docsets >= 2);
|
||||
docsets.sort_by_key(|docset| docset.cost());
|
||||
@@ -97,7 +102,7 @@ impl<TDocSet: DocSet> Intersection<TDocSet, TDocSet> {
|
||||
left,
|
||||
right,
|
||||
others: docsets,
|
||||
num_docs,
|
||||
segment_num_docs,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -214,7 +219,7 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOt
|
||||
[self.left.size_hint(), self.right.size_hint()]
|
||||
.into_iter()
|
||||
.chain(self.others.iter().map(DocSet::size_hint)),
|
||||
self.num_docs,
|
||||
self.segment_num_docs,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -224,6 +229,91 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOt
|
||||
// If there are docsets that are bad at skipping, they should also influence the cost.
|
||||
self.left.cost()
|
||||
}
|
||||
|
||||
fn count_including_deleted(&mut self) -> u32 {
|
||||
const DENSITY_THRESHOLD_INVERSE: u32 = 32;
|
||||
if self
|
||||
.left
|
||||
.size_hint()
|
||||
.saturating_mul(DENSITY_THRESHOLD_INVERSE)
|
||||
< self.segment_num_docs
|
||||
{
|
||||
// Sparse path: if the lead iterator covers less than ~3% of docs,
|
||||
// the block approach wastes time on mostly-empty blocks.
|
||||
self.count_including_deleted_sparse()
|
||||
} else {
|
||||
// Dense approach. We push documents into a block bitset to then
|
||||
// perform count using popcount.
|
||||
self.count_including_deleted_dense()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const EMPTY_BLOCK: [TinySet; BLOCK_NUM_TINYBITSETS] = [TinySet::EMPTY; BLOCK_NUM_TINYBITSETS];
|
||||
|
||||
/// ANDs `other` into `mask` in-place. Returns `true` if the result is all zeros.
|
||||
#[inline]
|
||||
fn and_blocks_and_return_is_empty(
|
||||
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
|
||||
update: &[TinySet; BLOCK_NUM_TINYBITSETS],
|
||||
) -> bool {
|
||||
let mut all_empty = true;
|
||||
for (mask_tinyset, update_tinyset) in mask.iter_mut().zip(update.iter()) {
|
||||
*mask_tinyset = mask_tinyset.intersect(*update_tinyset);
|
||||
all_empty &= mask_tinyset.is_empty();
|
||||
}
|
||||
all_empty
|
||||
}
|
||||
|
||||
impl<TDocSet: DocSet, TOtherDocSet: DocSet> Intersection<TDocSet, TOtherDocSet> {
|
||||
fn count_including_deleted_sparse(&mut self) -> u32 {
|
||||
let mut count = 0u32;
|
||||
let mut doc = self.doc();
|
||||
while doc != TERMINATED {
|
||||
count += 1;
|
||||
doc = self.advance();
|
||||
}
|
||||
count
|
||||
}
|
||||
|
||||
/// Dense block-wise bitmask intersection count.
|
||||
///
|
||||
/// Fills a 1024-doc window from each iterator, ANDs the bitmasks together,
|
||||
/// and popcounts the result. `fill_bitset_block` handles seeking tails forward
|
||||
/// when they lag behind the current block.
|
||||
fn count_including_deleted_dense(&mut self) -> u32 {
|
||||
let mut count = 0u32;
|
||||
let mut next_base = self.left.doc();
|
||||
|
||||
while next_base < TERMINATED {
|
||||
let base = next_base;
|
||||
|
||||
// Fill lead bitmask.
|
||||
let mut mask = EMPTY_BLOCK;
|
||||
next_base = next_base.max(self.left.fill_bitset_block(base, &mut mask));
|
||||
|
||||
let mut tail_mask = EMPTY_BLOCK;
|
||||
next_base = next_base.max(self.right.fill_bitset_block(base, &mut tail_mask));
|
||||
|
||||
if and_blocks_and_return_is_empty(&mut mask, &tail_mask) {
|
||||
continue;
|
||||
}
|
||||
// AND with each additional tail.
|
||||
for other in &mut self.others {
|
||||
let mut other_mask = EMPTY_BLOCK;
|
||||
next_base = next_base.max(other.fill_bitset_block(base, &mut other_mask));
|
||||
if and_blocks_and_return_is_empty(&mut mask, &other_mask) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
for tinyset in &mask {
|
||||
count += tinyset.len();
|
||||
}
|
||||
}
|
||||
|
||||
count
|
||||
}
|
||||
}
|
||||
|
||||
impl<TScorer, TOtherScorer> Scorer for Intersection<TScorer, TOtherScorer>
|
||||
@@ -421,6 +511,82 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
proptest! {
    /// `count_including_deleted` on a three-way intersection must agree with a
    /// brute-force set intersection of the inputs.
    #[test]
    fn prop_test_count_including_deleted_matches_default(
        a in sorted_deduped_vec(1200, 400),
        b in sorted_deduped_vec(1200, 400),
        c in sorted_deduped_vec(1200, 400),
        num_docs in 1200u32..2000u32,
    ) {
        // Expected count: elements of `a` that also appear in `b` and `c`.
        let mut expected = 0u32;
        for doc in &a {
            if b.contains(doc) && c.contains(doc) {
                expected += 1;
            }
        }

        // NOTE(review): the original comment says this exercises the dense path;
        // which path runs depends on the generated sizes and `num_docs`.
        let docsets = vec![
            VecDocSet::from(a.clone()),
            VecDocSet::from(b.clone()),
            VecDocSet::from(c.clone()),
        ];
        let mut intersection = Intersection::new(docsets, num_docs);

        let count = intersection.count_including_deleted();
        prop_assert_eq!(count, expected,
            "count_including_deleted mismatch: a={:?}, b={:?}, c={:?}", a, b, c);
    }
}
|
||||
|
||||
/// Two docsets sharing exactly the docs 3 and 9.
#[test]
fn test_count_including_deleted_two_way() {
    let docsets = vec![
        VecDocSet::from(vec![1, 3, 9]),
        VecDocSet::from(vec![3, 4, 9, 18]),
    ];
    let mut intersection = Intersection::new(docsets, 100);
    assert_eq!(intersection.count_including_deleted(), 2);
}
|
||||
|
||||
/// Three docsets that overlap pairwise but have no doc common to all of them.
#[test]
fn test_count_including_deleted_empty() {
    let docsets = vec![
        VecDocSet::from(vec![1, 3]),
        VecDocSet::from(vec![1, 4]),
        VecDocSet::from(vec![3, 9]),
    ];
    let mut intersection = Intersection::new(docsets, 100);
    assert_eq!(intersection.count_including_deleted(), 0);
}
|
||||
|
||||
/// Intersects the even numbers with the multiples of 3 below 2000.
///
/// The docsets are large relative to the segment size (2000), which is meant to
/// exercise the dense path (lead size >= num_docs / 32).
#[test]
fn test_count_including_deleted_dense_path() {
    let evens: Vec<u32> = (0..2000).step_by(2).collect();
    let multiples_of_three: Vec<u32> = (0..2000).step_by(3).collect();

    // An even number is in the intersection iff it is also a multiple of 3.
    let expected = evens.iter().filter(|&&doc| doc % 3 == 0).count() as u32;

    let docsets = vec![
        VecDocSet::from(evens),
        VecDocSet::from(multiples_of_three),
    ];
    let mut intersection = Intersection::new(docsets, 2000);
    assert_eq!(intersection.count_including_deleted(), expected);
}
|
||||
|
||||
/// Intersection over 5000 docs, i.e. spanning several 1024-doc blocks.
#[test]
fn test_count_including_deleted_multi_block() {
    let all_docs: Vec<u32> = (0..5000).collect();
    let every_seventh: Vec<u32> = (0..5000).step_by(7).collect();

    // `every_seventh` is a subset of `all_docs`, so it is the whole intersection.
    let expected = every_seventh.len() as u32;

    let docsets = vec![VecDocSet::from(all_docs), VecDocSet::from(every_seventh)];
    let mut intersection = Intersection::new(docsets, 5000);
    assert_eq!(intersection.count_including_deleted(), expected);
}
|
||||
|
||||
#[test]
|
||||
fn test_bug_2811_intersection_candidate_should_increase() {
|
||||
let mut schema_builder = Schema::builder();
|
||||
|
||||
@@ -117,6 +117,12 @@ impl DocSet for TermScorer {
|
||||
fn size_hint(&self) -> u32 {
|
||||
self.postings.size_hint()
|
||||
}
|
||||
|
||||
// TODO
|
||||
// It is probably possible to optimize fill_bitset_block for TermScorer,
|
||||
// working directly with the blocks, enabling vectorization.
|
||||
// I did not manage to get a performance improvement on Mac ARM,
|
||||
// and do not have access to x86 to investigate.
|
||||
}
|
||||
|
||||
impl Scorer for TermScorer {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tantivy-sstable"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
edition = "2024"
|
||||
license = "MIT"
|
||||
homepage = "https://github.com/quickwit-oss/tantivy"
|
||||
@@ -10,10 +10,10 @@ categories = ["database-implementations", "data-structures", "compression"]
|
||||
description = "sstables for tantivy"
|
||||
|
||||
[dependencies]
|
||||
common = {version= "0.10", path="../common", package="tantivy-common"}
|
||||
common = {version= "0.11", path="../common", package="tantivy-common"}
|
||||
futures-util = "0.3.30"
|
||||
itertools = "0.14.0"
|
||||
tantivy-bitpacker = { version= "0.9", path="../bitpacker" }
|
||||
tantivy-bitpacker = { version= "0.10", path="../bitpacker" }
|
||||
tantivy-fst = "0.5"
|
||||
# experimental gives us access to Decompressor::upper_bound
|
||||
zstd = { version = "0.13", optional = true, features = ["experimental"] }
|
||||
|
||||
@@ -512,11 +512,13 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
/// Returns the terms for a _sorted_ list of term ordinals.
|
||||
///
|
||||
/// Returns true if and only if all terms have been found.
|
||||
pub fn sorted_ords_to_term_cb<F: FnMut(&[u8]) -> io::Result<()>>(
|
||||
pub fn sorted_ords_to_term_cb(
|
||||
&self,
|
||||
mut ords: impl Iterator<Item = TermOrdinal>,
|
||||
mut cb: F,
|
||||
ords: &[TermOrdinal],
|
||||
mut cb: impl FnMut(&[u8]),
|
||||
) -> io::Result<bool> {
|
||||
assert!(ords.is_sorted());
|
||||
let mut ords = ords.iter().copied();
|
||||
let Some(mut ord) = ords.next() else {
|
||||
return Ok(true);
|
||||
};
|
||||
@@ -538,33 +540,36 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
bytes.extend_from_slice(current_sstable_delta_reader.suffix());
|
||||
current_block_ordinal += 1;
|
||||
}
|
||||
cb(&bytes)?;
|
||||
cb(&bytes);
|
||||
|
||||
// fetch the next ordinal
|
||||
let Some(next_ord) = ords.next() else {
|
||||
return Ok(true);
|
||||
let next_ord = loop {
|
||||
let Some(next_ord) = ords.next() else {
|
||||
return Ok(true);
|
||||
};
|
||||
if next_ord == ord {
|
||||
// This is the same ordinal, let's just call the callback directly.
|
||||
cb(&bytes);
|
||||
} else {
|
||||
// We checked beforehand that the ordinals are sorted.
|
||||
debug_assert!(next_ord > ord);
|
||||
break next_ord;
|
||||
}
|
||||
};
|
||||
|
||||
// advance forward if the new ord is different than the one we just processed
|
||||
// TODO optimization: it is silly to do a binary search to get the block every single
|
||||
// time.
|
||||
//
|
||||
// this allows the input TermOrdinal iterator to contain duplicates, so long as it's
|
||||
// still sorted
|
||||
if next_ord < ord {
|
||||
panic!("Ordinals were not sorted: received {next_ord} after {ord}");
|
||||
} else if next_ord > ord {
|
||||
// check if block changed for new term_ord
|
||||
let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
|
||||
if new_block_addr != current_block_addr {
|
||||
current_block_addr = new_block_addr;
|
||||
current_block_ordinal = current_block_addr.first_ordinal;
|
||||
current_sstable_delta_reader =
|
||||
self.sstable_delta_reader_block(current_block_addr.clone())?;
|
||||
bytes.clear();
|
||||
}
|
||||
ord = next_ord;
|
||||
} else {
|
||||
// The next ord is equal to the previous ord: no need to seek or advance.
|
||||
// Check if block changed for new term_ord
|
||||
let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
|
||||
if new_block_addr != current_block_addr {
|
||||
current_block_addr = new_block_addr;
|
||||
current_block_ordinal = current_block_addr.first_ordinal;
|
||||
current_sstable_delta_reader =
|
||||
self.sstable_delta_reader_block(current_block_addr.clone())?;
|
||||
bytes.clear();
|
||||
}
|
||||
ord = next_ord;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -671,8 +676,8 @@ mod tests {
|
||||
use common::OwnedBytes;
|
||||
|
||||
use super::Dictionary;
|
||||
use crate::MonotonicU64SSTable;
|
||||
use crate::dictionary::TermOrdHit;
|
||||
use crate::{MonotonicU64SSTable, TermOrdinal};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PermissionedHandle {
|
||||
@@ -935,25 +940,24 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ords_term() {
|
||||
fn test_sorted_ords_to_term() {
|
||||
let (dic, _slice) = make_test_sstable();
|
||||
|
||||
// Single term
|
||||
let mut terms = Vec::new();
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(100_000..100_001, |term| {
|
||||
dic.sorted_ords_to_term_cb(&[100_000], |term| {
|
||||
terms.push(term.to_vec());
|
||||
Ok(())
|
||||
})
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(terms, vec![format!("{:05X}", 100_000).into_bytes(),]);
|
||||
// Single term
|
||||
let mut terms = Vec::new();
|
||||
let ords: Vec<TermOrdinal> = (100_001..100_002).collect();
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(100_001..100_002, |term| {
|
||||
dic.sorted_ords_to_term_cb(&ords, |term| {
|
||||
terms.push(term.to_vec());
|
||||
Ok(())
|
||||
})
|
||||
.unwrap()
|
||||
);
|
||||
@@ -961,9 +965,8 @@ mod tests {
|
||||
// both terms
|
||||
let mut terms = Vec::new();
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(100_000..100_002, |term| {
|
||||
dic.sorted_ords_to_term_cb(&[100_000, 100_001], |term| {
|
||||
terms.push(term.to_vec());
|
||||
Ok(())
|
||||
})
|
||||
.unwrap()
|
||||
);
|
||||
@@ -976,10 +979,10 @@ mod tests {
|
||||
);
|
||||
// Test cross block
|
||||
let mut terms = Vec::new();
|
||||
let ords: Vec<TermOrdinal> = (98653..=98655).collect();
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(98653..=98655, |term| {
|
||||
dic.sorted_ords_to_term_cb(&ords, |term| {
|
||||
terms.push(term.to_vec());
|
||||
Ok(())
|
||||
})
|
||||
.unwrap()
|
||||
);
|
||||
@@ -991,6 +994,43 @@ mod tests {
|
||||
format!("{:05X}", 98655).into_bytes(),
|
||||
]
|
||||
);
|
||||
// redundant
|
||||
let mut terms = Vec::new();
|
||||
let ords: Vec<TermOrdinal> = vec![1, 1, 2];
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(&ords, |term| {
|
||||
terms.push(term.to_vec());
|
||||
})
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
terms,
|
||||
vec![
|
||||
format!("{:05X}", 1).into_bytes(),
|
||||
format!("{:05X}", 1).into_bytes(),
|
||||
format!("{:05X}", 2).into_bytes(),
|
||||
]
|
||||
);
|
||||
// redundant cross block
|
||||
let mut terms = Vec::new();
|
||||
let ords: Vec<TermOrdinal> = vec![98653, 98653, 98654, 98654, 98655, 98655];
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(&ords, |term| {
|
||||
terms.push(term.to_vec());
|
||||
})
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
terms,
|
||||
vec![
|
||||
format!("{:05X}", 98_653).into_bytes(),
|
||||
format!("{:05X}", 98_653).into_bytes(),
|
||||
format!("{:05X}", 98_654).into_bytes(),
|
||||
format!("{:05X}", 98_654).into_bytes(),
|
||||
format!("{:05X}", 98_655).into_bytes(),
|
||||
format!("{:05X}", 98_655).into_bytes(),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tantivy-stacker"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
edition = "2024"
|
||||
license = "MIT"
|
||||
homepage = "https://github.com/quickwit-oss/tantivy"
|
||||
@@ -9,7 +9,7 @@ description = "term hashmap used for indexing"
|
||||
|
||||
[dependencies]
|
||||
murmurhash32 = "0.3"
|
||||
common = { version = "0.10", path = "../common/", package = "tantivy-common" }
|
||||
common = { version = "0.11", path = "../common/", package = "tantivy-common" }
|
||||
ahash = { version = "0.8.11", default-features = false, optional = true }
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ rand = "0.9"
|
||||
zipf = "7.0.0"
|
||||
rustc-hash = "2.1.0"
|
||||
proptest = "1.2.0"
|
||||
binggan = { version = "0.15.3" }
|
||||
binggan = { version = "0.16.1" }
|
||||
rand_distr = "0.5"
|
||||
|
||||
[features]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tantivy-tokenizer-api"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
license = "MIT"
|
||||
edition = "2021"
|
||||
description = "Tokenizer API of tantivy"
|
||||
|
||||
Reference in New Issue
Block a user