mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-27 21:50:41 +00:00
Performance improvement for nested cardinality aggregation
When a string cardinality aggregation is nested it end up being applied to different buckets. Dictionary encoding relies on a different dictionaries for each segment. As a result, during segment collection, we only collect term ordinals in a HashSet, and decode them in the term dictionary at the end of collection. Before this PR, this decoding phase was done once for each bucket, causing the same work to be done over and over. This PR introduce a coupon cache. The HLL sketch relies on a hash of the string values. We populate the cache before bucket collection, and get our values from it. This PR also rename "caching" "buffering" in aggregation (it was never caching), and does several cleanups.
This commit is contained in:
@@ -24,6 +24,7 @@ regex = { version = "1.5.5", default-features = false, features = [
|
||||
"std",
|
||||
"unicode",
|
||||
] }
|
||||
murmurhash32 = "0.3"
|
||||
aho-corasick = "1.0"
|
||||
tantivy-fst = "0.5"
|
||||
memmap2 = { version = "0.9.0", optional = true }
|
||||
@@ -65,7 +66,7 @@ tantivy-bitpacker = { version = "0.10", path = "./bitpacker" }
|
||||
common = { version = "0.11", path = "./common/", package = "tantivy-common" }
|
||||
tokenizer-api = { version = "0.7", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
|
||||
sketches-ddsketch = { version = "0.4", features = ["use_serde"] }
|
||||
datasketches = "0.2.0"
|
||||
datasketches = { git = "https://github.com/fulmicoton-dd/datasketches-rust", rev = "eb4ad64" }
|
||||
futures-util = { version = "0.3.28", optional = true }
|
||||
futures-channel = { version = "0.3.28", optional = true }
|
||||
fnv = "1.0.7"
|
||||
|
||||
@@ -78,6 +78,7 @@ fn bench_agg(mut group: InputGroup<Index>) {
|
||||
|
||||
register!(group, cardinality_agg);
|
||||
register!(group, terms_status_with_cardinality_agg);
|
||||
register!(group, terms_100_buckets_with_cardinality_agg);
|
||||
|
||||
register!(group, range_agg);
|
||||
register!(group, range_agg_with_avg_sub_agg);
|
||||
@@ -169,6 +170,22 @@ fn terms_status_with_cardinality_agg(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"my_texts": {
|
||||
"terms": { "field": "text_few_terms_status" },
|
||||
"aggs": {
|
||||
"cardinality": {
|
||||
"cardinality": {
|
||||
"field": "text_few_terms_status"
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
});
|
||||
execute_agg(index, agg_req);
|
||||
}
|
||||
|
||||
fn terms_100_buckets_with_cardinality_agg(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"my_texts": {
|
||||
"terms": { "field": "text_1000_terms_zipf", "size": 100 },
|
||||
"aggs": {
|
||||
"cardinality": {
|
||||
"cardinality": {
|
||||
|
||||
@@ -33,14 +33,14 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
|
||||
&mut self,
|
||||
docs: &[u32],
|
||||
accessor: &Column<T>,
|
||||
missing: Option<T>,
|
||||
missing_opt: Option<T>,
|
||||
) {
|
||||
self.fetch_block(docs, accessor);
|
||||
// no missing values
|
||||
if accessor.index.get_cardinality().is_full() {
|
||||
return;
|
||||
}
|
||||
let Some(missing) = missing else {
|
||||
let Some(missing) = missing_opt else {
|
||||
return;
|
||||
};
|
||||
|
||||
@@ -191,6 +191,7 @@ where F: FnMut(u32) {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::field_reassign_with_default)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ use crate::aggregation::bucket::composite::map::{DynArrayHeapMap, MAX_DYN_ARRAY_
|
||||
use crate::aggregation::bucket::{
|
||||
CalendarInterval, CompositeAggregationSource, MissingOrder, Order,
|
||||
};
|
||||
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardSubAggCache};
|
||||
use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardSubAggBuffer};
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
CompositeIntermediateKey, IntermediateAggregationResult, IntermediateAggregationResults,
|
||||
IntermediateBucketResult, IntermediateCompositeBucketEntry, IntermediateCompositeBucketResult,
|
||||
@@ -119,7 +119,7 @@ pub struct SegmentCompositeCollector {
|
||||
/// One DynArrayHeapMap per parent bucket.
|
||||
parent_buckets: Vec<DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>>,
|
||||
accessor_idx: usize,
|
||||
sub_agg: Option<CachedSubAggs<HighCardSubAggCache>>,
|
||||
sub_agg: Option<BufferedSubAggs<HighCardSubAggBuffer>>,
|
||||
bucket_id_provider: BucketIdProvider,
|
||||
/// Number of sources, needed when creating new DynArrayHeapMaps.
|
||||
num_sources: usize,
|
||||
@@ -218,7 +218,7 @@ impl SegmentCompositeCollector {
|
||||
let has_sub_aggregations = !node.children.is_empty();
|
||||
let sub_agg = if has_sub_aggregations {
|
||||
let sub_agg_collector = build_segment_agg_collectors(req_data, &node.children)?;
|
||||
Some(CachedSubAggs::new(sub_agg_collector))
|
||||
Some(BufferedSubAggs::new(sub_agg_collector))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -332,7 +332,7 @@ fn collect_bucket_with_limit(
|
||||
limit_num_buckets: usize,
|
||||
buckets: &mut DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
|
||||
key: &[InternalValueRepr],
|
||||
sub_agg: &mut Option<CachedSubAggs<HighCardSubAggCache>>,
|
||||
sub_agg: &mut Option<BufferedSubAggs<HighCardSubAggBuffer>>,
|
||||
bucket_id_provider: &mut BucketIdProvider,
|
||||
) {
|
||||
let mut record_in_bucket = |bucket: &mut CompositeBucketCollector| {
|
||||
@@ -488,7 +488,7 @@ struct CompositeKeyVisitor<'a> {
|
||||
doc_id: crate::DocId,
|
||||
composite_agg_data: &'a CompositeAggReqData,
|
||||
buckets: &'a mut DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
|
||||
sub_agg: &'a mut Option<CachedSubAggs<HighCardSubAggCache>>,
|
||||
sub_agg: &'a mut Option<BufferedSubAggs<HighCardSubAggBuffer>>,
|
||||
bucket_id_provider: &'a mut BucketIdProvider,
|
||||
sub_level_values: SmallVec<[InternalValueRepr; MAX_DYN_ARRAY_SIZE]>,
|
||||
}
|
||||
|
||||
@@ -511,14 +511,14 @@ mod tests {
|
||||
|
||||
fn datetime_from_iso_str(date_str: &str) -> common::DateTime {
|
||||
let dt = OffsetDateTime::parse(date_str, &Rfc3339)
|
||||
.expect(&format!("Failed to parse date: {}", date_str));
|
||||
.unwrap_or_else(|_| panic!("Failed to parse date: {}", date_str));
|
||||
let timestamp_secs = dt.unix_timestamp_nanos();
|
||||
common::DateTime::from_timestamp_nanos(timestamp_secs as i64)
|
||||
}
|
||||
|
||||
fn ms_timestamp_from_iso_str(date_str: &str) -> i64 {
|
||||
let dt = OffsetDateTime::parse(date_str, &Rfc3339)
|
||||
.expect(&format!("Failed to parse date: {}", date_str));
|
||||
.unwrap_or_else(|_| panic!("Failed to parse date: {}", date_str));
|
||||
(dt.unix_timestamp_nanos() / 1_000_000) as i64
|
||||
}
|
||||
|
||||
@@ -548,7 +548,7 @@ mod tests {
|
||||
agg_req_json["my_composite"]["composite"]["after"] = after_key.take().unwrap();
|
||||
}
|
||||
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
|
||||
let res = exec_request(agg_req.clone(), &index).unwrap();
|
||||
let res = exec_request(agg_req.clone(), index).unwrap();
|
||||
let expected_page_buckets = &expected_buckets_vec[page_idx * page_size
|
||||
..std::cmp::min((page_idx + 1) * page_size, expected_buckets_vec.len())];
|
||||
assert_eq!(
|
||||
@@ -578,7 +578,7 @@ mod tests {
|
||||
}
|
||||
});
|
||||
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
|
||||
let res = exec_request(agg_req.clone(), &index).unwrap();
|
||||
let res = exec_request(agg_req.clone(), index).unwrap();
|
||||
assert_eq!(
|
||||
res["my_composite"]["buckets"],
|
||||
json!([]),
|
||||
|
||||
@@ -6,8 +6,8 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use crate::aggregation::agg_data::{
|
||||
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
|
||||
};
|
||||
use crate::aggregation::cached_sub_aggs::{
|
||||
CachedSubAggs, HighCardSubAggCache, LowCardSubAggCache, SubAggCache,
|
||||
use crate::aggregation::buffered_sub_aggs::{
|
||||
BufferedSubAggs, HighCardSubAggBuffer, LowCardSubAggBuffer, SubAggBuffer,
|
||||
};
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
|
||||
@@ -503,17 +503,17 @@ struct DocCount {
|
||||
}
|
||||
|
||||
/// Segment collector for filter aggregation
|
||||
pub struct SegmentFilterCollector<C: SubAggCache> {
|
||||
pub struct SegmentFilterCollector<B: SubAggBuffer> {
|
||||
/// Document counts per parent bucket
|
||||
parent_buckets: Vec<DocCount>,
|
||||
/// Sub-aggregation collectors
|
||||
sub_aggregations: Option<CachedSubAggs<C>>,
|
||||
sub_aggregations: Option<BufferedSubAggs<B>>,
|
||||
bucket_id_provider: BucketIdProvider,
|
||||
/// Accessor index for this filter aggregation (to access FilterAggReqData)
|
||||
accessor_idx: usize,
|
||||
}
|
||||
|
||||
impl<C: SubAggCache> SegmentFilterCollector<C> {
|
||||
impl<B: SubAggBuffer> SegmentFilterCollector<B> {
|
||||
/// Create a new filter segment collector following the new agg_data pattern
|
||||
pub(crate) fn from_req_and_validate(
|
||||
req: &mut AggregationsSegmentCtx,
|
||||
@@ -525,7 +525,7 @@ impl<C: SubAggCache> SegmentFilterCollector<C> {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let sub_agg_collector = sub_agg_collector.map(CachedSubAggs::new);
|
||||
let sub_agg_collector = sub_agg_collector.map(BufferedSubAggs::new);
|
||||
|
||||
Ok(SegmentFilterCollector {
|
||||
parent_buckets: Vec::new(),
|
||||
@@ -547,16 +547,16 @@ pub(crate) fn build_segment_filter_collector(
|
||||
|
||||
if is_top_level {
|
||||
Ok(Box::new(
|
||||
SegmentFilterCollector::<LowCardSubAggCache>::from_req_and_validate(req, node)?,
|
||||
SegmentFilterCollector::<LowCardSubAggBuffer>::from_req_and_validate(req, node)?,
|
||||
))
|
||||
} else {
|
||||
Ok(Box::new(
|
||||
SegmentFilterCollector::<HighCardSubAggCache>::from_req_and_validate(req, node)?,
|
||||
SegmentFilterCollector::<HighCardSubAggBuffer>::from_req_and_validate(req, node)?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: SubAggCache> Debug for SegmentFilterCollector<C> {
|
||||
impl<B: SubAggBuffer> Debug for SegmentFilterCollector<B> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("SegmentFilterCollector")
|
||||
.field("buckets", &self.parent_buckets)
|
||||
@@ -566,7 +566,7 @@ impl<C: SubAggCache> Debug for SegmentFilterCollector<C> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: SubAggCache> SegmentAggregationCollector for SegmentFilterCollector<C> {
|
||||
impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentFilterCollector<B> {
|
||||
fn add_intermediate_aggregation_result(
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
|
||||
@@ -10,7 +10,7 @@ use crate::aggregation::agg_data::{
|
||||
};
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::agg_result::BucketEntry;
|
||||
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardCachedSubAggs};
|
||||
use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardBufferedSubAggs};
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
|
||||
IntermediateHistogramBucketEntry,
|
||||
@@ -258,7 +258,7 @@ pub(crate) struct SegmentHistogramBucketEntry {
|
||||
impl SegmentHistogramBucketEntry {
|
||||
pub(crate) fn into_intermediate_bucket_entry(
|
||||
self,
|
||||
sub_aggregation: &mut Option<HighCardCachedSubAggs>,
|
||||
sub_aggregation: &mut Option<HighCardBufferedSubAggs>,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
) -> crate::Result<IntermediateHistogramBucketEntry> {
|
||||
let mut sub_aggregation_res = IntermediateAggregationResults::default();
|
||||
@@ -291,7 +291,7 @@ pub struct SegmentHistogramCollector {
|
||||
/// The buckets containing the aggregation data.
|
||||
/// One Histogram bucket per parent bucket id.
|
||||
parent_buckets: Vec<HistogramBuckets>,
|
||||
sub_agg: Option<HighCardCachedSubAggs>,
|
||||
sub_agg: Option<HighCardBufferedSubAggs>,
|
||||
accessor_idx: usize,
|
||||
bucket_id_provider: BucketIdProvider,
|
||||
}
|
||||
@@ -444,7 +444,7 @@ impl SegmentHistogramCollector {
|
||||
max: f64::MAX,
|
||||
});
|
||||
req_data.offset = req_data.req.offset.unwrap_or(0.0);
|
||||
let sub_agg = sub_agg.map(CachedSubAggs::new);
|
||||
let sub_agg = sub_agg.map(BufferedSubAggs::new);
|
||||
|
||||
Ok(Self {
|
||||
parent_buckets: Default::default(),
|
||||
|
||||
@@ -9,8 +9,9 @@ use crate::aggregation::agg_data::{
|
||||
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
|
||||
};
|
||||
use crate::aggregation::agg_limits::AggregationLimitsGuard;
|
||||
use crate::aggregation::cached_sub_aggs::{
|
||||
CachedSubAggs, HighCardSubAggCache, LowCardCachedSubAggs, LowCardSubAggCache, SubAggCache,
|
||||
use crate::aggregation::buffered_sub_aggs::{
|
||||
BufferedSubAggs, HighCardSubAggBuffer, LowCardBufferedSubAggs, LowCardSubAggBuffer,
|
||||
SubAggBuffer,
|
||||
};
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
|
||||
@@ -155,13 +156,13 @@ pub(crate) struct SegmentRangeAndBucketEntry {
|
||||
|
||||
/// The collector puts values from the fast field into the correct buckets and does a conversion to
|
||||
/// the correct datatype.
|
||||
pub struct SegmentRangeCollector<C: SubAggCache> {
|
||||
pub struct SegmentRangeCollector<B: SubAggBuffer> {
|
||||
/// The buckets containing the aggregation data.
|
||||
/// One for each ParentBucketId
|
||||
parent_buckets: Vec<Vec<SegmentRangeAndBucketEntry>>,
|
||||
column_type: ColumnType,
|
||||
pub(crate) accessor_idx: usize,
|
||||
sub_agg: Option<CachedSubAggs<C>>,
|
||||
sub_agg: Option<BufferedSubAggs<B>>,
|
||||
/// Here things get a bit weird. We need to assign unique bucket ids across all
|
||||
/// parent buckets. So we keep track of the next available bucket id here.
|
||||
/// This allows a kind of flattening of the bucket ids across all parent buckets.
|
||||
@@ -178,7 +179,7 @@ pub struct SegmentRangeCollector<C: SubAggCache> {
|
||||
limits: AggregationLimitsGuard,
|
||||
}
|
||||
|
||||
impl<C: SubAggCache> Debug for SegmentRangeCollector<C> {
|
||||
impl<B: SubAggBuffer> Debug for SegmentRangeCollector<B> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("SegmentRangeCollector")
|
||||
.field("parent_buckets_len", &self.parent_buckets.len())
|
||||
@@ -229,7 +230,7 @@ impl SegmentRangeBucketEntry {
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: SubAggCache> SegmentAggregationCollector for SegmentRangeCollector<C> {
|
||||
impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentRangeCollector<B> {
|
||||
fn add_intermediate_aggregation_result(
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
@@ -350,8 +351,8 @@ pub(crate) fn build_segment_range_collector(
|
||||
};
|
||||
|
||||
if is_low_card {
|
||||
Ok(Box::new(SegmentRangeCollector::<LowCardSubAggCache> {
|
||||
sub_agg: sub_agg.map(LowCardCachedSubAggs::new),
|
||||
Ok(Box::new(SegmentRangeCollector::<LowCardSubAggBuffer> {
|
||||
sub_agg: sub_agg.map(LowCardBufferedSubAggs::new),
|
||||
column_type: field_type,
|
||||
accessor_idx,
|
||||
parent_buckets: Vec::new(),
|
||||
@@ -359,8 +360,8 @@ pub(crate) fn build_segment_range_collector(
|
||||
limits: agg_data.context.limits.clone(),
|
||||
}))
|
||||
} else {
|
||||
Ok(Box::new(SegmentRangeCollector::<HighCardSubAggCache> {
|
||||
sub_agg: sub_agg.map(CachedSubAggs::new),
|
||||
Ok(Box::new(SegmentRangeCollector::<HighCardSubAggBuffer> {
|
||||
sub_agg: sub_agg.map(BufferedSubAggs::new),
|
||||
column_type: field_type,
|
||||
accessor_idx,
|
||||
parent_buckets: Vec::new(),
|
||||
@@ -370,7 +371,7 @@ pub(crate) fn build_segment_range_collector(
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: SubAggCache> SegmentRangeCollector<C> {
|
||||
impl<B: SubAggBuffer> SegmentRangeCollector<B> {
|
||||
pub(crate) fn create_new_buckets(
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
@@ -554,7 +555,7 @@ mod tests {
|
||||
pub fn get_collector_from_ranges(
|
||||
ranges: Vec<RangeAggregationRange>,
|
||||
field_type: ColumnType,
|
||||
) -> SegmentRangeCollector<HighCardSubAggCache> {
|
||||
) -> SegmentRangeCollector<HighCardSubAggBuffer> {
|
||||
let req = RangeAggregation {
|
||||
field: "dummy".to_string(),
|
||||
ranges,
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use std::fmt::Debug;
|
||||
use std::io;
|
||||
use std::net::Ipv6Addr;
|
||||
|
||||
use columnar::column_values::CompactSpaceU64Accessor;
|
||||
@@ -17,8 +16,9 @@ use crate::aggregation::agg_data::{
|
||||
};
|
||||
use crate::aggregation::agg_limits::MemoryConsumption;
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::cached_sub_aggs::{
|
||||
CachedSubAggs, HighCardSubAggCache, LowCardCachedSubAggs, LowCardSubAggCache, SubAggCache,
|
||||
use crate::aggregation::buffered_sub_aggs::{
|
||||
BufferedSubAggs, HighCardSubAggBuffer, LowCardBufferedSubAggs, LowCardSubAggBuffer,
|
||||
SubAggBuffer,
|
||||
};
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
|
||||
@@ -391,7 +391,7 @@ pub(crate) fn build_segment_term_collector(
|
||||
// Decide which bucket storage is best suited for this aggregation.
|
||||
if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC && !has_sub_aggregations {
|
||||
let term_buckets = VecTermBucketsNoAgg::new(max_term_id + 1, &mut bucket_id_provider);
|
||||
let collector: SegmentTermCollector<_, HighCardSubAggCache> = SegmentTermCollector {
|
||||
let collector: SegmentTermCollector<_, HighCardSubAggBuffer> = SegmentTermCollector {
|
||||
parent_buckets: vec![term_buckets],
|
||||
sub_agg: None,
|
||||
bucket_id_provider,
|
||||
@@ -401,8 +401,8 @@ pub(crate) fn build_segment_term_collector(
|
||||
Ok(Box::new(collector))
|
||||
} else if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC {
|
||||
let term_buckets = VecTermBuckets::new(max_term_id + 1, &mut bucket_id_provider);
|
||||
let sub_agg = sub_agg_collector.map(LowCardCachedSubAggs::new);
|
||||
let collector: SegmentTermCollector<_, LowCardSubAggCache> = SegmentTermCollector {
|
||||
let sub_agg = sub_agg_collector.map(LowCardBufferedSubAggs::new);
|
||||
let collector: SegmentTermCollector<_, LowCardSubAggBuffer> = SegmentTermCollector {
|
||||
parent_buckets: vec![term_buckets],
|
||||
sub_agg,
|
||||
bucket_id_provider,
|
||||
@@ -414,8 +414,8 @@ pub(crate) fn build_segment_term_collector(
|
||||
let term_buckets: PagedTermMap =
|
||||
PagedTermMap::new(max_term_id + 1, &mut bucket_id_provider);
|
||||
// Build sub-aggregation blueprint (flat pairs)
|
||||
let sub_agg = sub_agg_collector.map(CachedSubAggs::new);
|
||||
let collector: SegmentTermCollector<PagedTermMap, HighCardSubAggCache> =
|
||||
let sub_agg = sub_agg_collector.map(BufferedSubAggs::new);
|
||||
let collector: SegmentTermCollector<PagedTermMap, HighCardSubAggBuffer> =
|
||||
SegmentTermCollector {
|
||||
parent_buckets: vec![term_buckets],
|
||||
sub_agg,
|
||||
@@ -427,8 +427,8 @@ pub(crate) fn build_segment_term_collector(
|
||||
} else {
|
||||
let term_buckets: HashMapTermBuckets = HashMapTermBuckets::default();
|
||||
// Build sub-aggregation blueprint (flat pairs)
|
||||
let sub_agg = sub_agg_collector.map(CachedSubAggs::new);
|
||||
let collector: SegmentTermCollector<HashMapTermBuckets, HighCardSubAggCache> =
|
||||
let sub_agg = sub_agg_collector.map(BufferedSubAggs::new);
|
||||
let collector: SegmentTermCollector<HashMapTermBuckets, HighCardSubAggBuffer> =
|
||||
SegmentTermCollector {
|
||||
parent_buckets: vec![term_buckets],
|
||||
sub_agg,
|
||||
@@ -758,10 +758,10 @@ impl TermAggregationMap for VecTermBuckets {
|
||||
/// The collector puts values from the fast field into the correct buckets and does a conversion to
|
||||
/// the correct datatype.
|
||||
#[derive(Debug)]
|
||||
struct SegmentTermCollector<TermMap: TermAggregationMap, C: SubAggCache> {
|
||||
struct SegmentTermCollector<TermMap: TermAggregationMap, B: SubAggBuffer> {
|
||||
/// The buckets containing the aggregation data.
|
||||
parent_buckets: Vec<TermMap>,
|
||||
sub_agg: Option<CachedSubAggs<C>>,
|
||||
sub_agg: Option<BufferedSubAggs<B>>,
|
||||
bucket_id_provider: BucketIdProvider,
|
||||
max_term_id: u64,
|
||||
terms_req_data: TermsAggReqData,
|
||||
@@ -772,8 +772,8 @@ pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) {
|
||||
(agg_name, agg_property)
|
||||
}
|
||||
|
||||
impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
|
||||
for SegmentTermCollector<TermMap, C>
|
||||
impl<TermMap: TermAggregationMap, B: SubAggBuffer> SegmentAggregationCollector
|
||||
for SegmentTermCollector<TermMap, B>
|
||||
{
|
||||
fn add_intermediate_aggregation_result(
|
||||
&mut self,
|
||||
@@ -790,8 +790,14 @@ impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
|
||||
let term_req = &self.terms_req_data;
|
||||
let name = term_req.name.clone();
|
||||
|
||||
let bucket =
|
||||
Self::into_intermediate_bucket_result(term_req, &mut self.sub_agg, bucket, agg_data)?;
|
||||
let bucket = Self::into_intermediate_bucket_result(
|
||||
term_req,
|
||||
self.sub_agg
|
||||
.as_mut()
|
||||
.map(BufferedSubAggs::get_sub_agg_collector),
|
||||
bucket,
|
||||
agg_data,
|
||||
)?;
|
||||
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -907,10 +913,38 @@ fn extract_missing_value<T>(
|
||||
Some((key, bucket))
|
||||
}
|
||||
|
||||
impl<TermMap, C> SegmentTermCollector<TermMap, C>
|
||||
fn reborrow_opt_collector<'a>(
|
||||
opt: &'a mut Option<&mut dyn SegmentAggregationCollector>,
|
||||
) -> Option<&'a mut dyn SegmentAggregationCollector> {
|
||||
match opt {
|
||||
Some(inner) => Some(*inner),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn into_intermediate_bucket_entry(
|
||||
bucket: Bucket,
|
||||
sub_agg_collector: Option<&mut dyn SegmentAggregationCollector>,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
) -> crate::Result<IntermediateTermBucketEntry> {
|
||||
let mut sub_aggregation_res = IntermediateAggregationResults::default();
|
||||
if let Some(sub_agg_collector) = sub_agg_collector {
|
||||
sub_agg_collector.add_intermediate_aggregation_result(
|
||||
agg_data,
|
||||
&mut sub_aggregation_res,
|
||||
bucket.bucket_id,
|
||||
)?;
|
||||
}
|
||||
Ok(IntermediateTermBucketEntry {
|
||||
doc_count: bucket.count,
|
||||
sub_aggregation: sub_aggregation_res,
|
||||
})
|
||||
}
|
||||
|
||||
impl<TermMap, B> SegmentTermCollector<TermMap, B>
|
||||
where
|
||||
TermMap: TermAggregationMap,
|
||||
C: SubAggCache,
|
||||
B: SubAggBuffer,
|
||||
{
|
||||
fn get_memory_consumption(&self) -> usize {
|
||||
self.parent_buckets
|
||||
@@ -922,7 +956,7 @@ where
|
||||
#[inline]
|
||||
pub(crate) fn into_intermediate_bucket_result(
|
||||
term_req: &TermsAggReqData,
|
||||
sub_agg: &mut Option<CachedSubAggs<C>>,
|
||||
mut sub_agg_collector: Option<&mut dyn SegmentAggregationCollector>,
|
||||
term_buckets: TermMap,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
) -> crate::Result<IntermediateBucketResult> {
|
||||
@@ -965,31 +999,6 @@ where
|
||||
let mut dict: FxHashMap<IntermediateKey, IntermediateTermBucketEntry> = Default::default();
|
||||
dict.reserve(entries.len());
|
||||
|
||||
let into_intermediate_bucket_entry =
|
||||
|bucket: Bucket,
|
||||
sub_agg: &mut Option<CachedSubAggs<C>>|
|
||||
-> crate::Result<IntermediateTermBucketEntry> {
|
||||
if let Some(sub_agg) = sub_agg {
|
||||
let mut sub_aggregation_res = IntermediateAggregationResults::default();
|
||||
sub_agg
|
||||
.get_sub_agg_collector()
|
||||
.add_intermediate_aggregation_result(
|
||||
agg_data,
|
||||
&mut sub_aggregation_res,
|
||||
bucket.bucket_id,
|
||||
)?;
|
||||
Ok(IntermediateTermBucketEntry {
|
||||
doc_count: bucket.count,
|
||||
sub_aggregation: sub_aggregation_res,
|
||||
})
|
||||
} else {
|
||||
Ok(IntermediateTermBucketEntry {
|
||||
doc_count: bucket.count,
|
||||
sub_aggregation: Default::default(),
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
if term_req.column_type == ColumnType::Str {
|
||||
let fallback_dict = Dictionary::empty();
|
||||
let term_dict = term_req
|
||||
@@ -1000,7 +1009,11 @@ where
|
||||
|
||||
if let Some((intermediate_key, bucket)) = extract_missing_value(&mut entries, term_req)
|
||||
{
|
||||
let intermediate_entry = into_intermediate_bucket_entry(bucket, sub_agg)?;
|
||||
let intermediate_entry = into_intermediate_bucket_entry(
|
||||
bucket,
|
||||
reborrow_opt_collector(&mut sub_agg_collector),
|
||||
agg_data,
|
||||
)?;
|
||||
dict.insert(intermediate_key, intermediate_entry);
|
||||
}
|
||||
|
||||
@@ -1008,19 +1021,28 @@ where
|
||||
entries.sort_unstable_by_key(|bucket| bucket.0);
|
||||
|
||||
let (term_ids, buckets): (Vec<u64>, Vec<Bucket>) = entries.into_iter().unzip();
|
||||
let mut buckets_it = buckets.into_iter();
|
||||
|
||||
term_dict.sorted_ords_to_term_cb(term_ids.into_iter(), |term| {
|
||||
let bucket = buckets_it.next().unwrap();
|
||||
let intermediate_entry =
|
||||
into_intermediate_bucket_entry(bucket, sub_agg).map_err(io::Error::other)?;
|
||||
let intermediate_entries: Vec<IntermediateTermBucketEntry> = buckets
|
||||
.into_iter()
|
||||
.map(|bucket| {
|
||||
into_intermediate_bucket_entry(
|
||||
bucket,
|
||||
reborrow_opt_collector(&mut sub_agg_collector),
|
||||
agg_data,
|
||||
)
|
||||
})
|
||||
.collect::<crate::Result<_>>()?;
|
||||
|
||||
let mut intermediate_entry_it = intermediate_entries.into_iter();
|
||||
|
||||
term_dict.sorted_ords_to_term_cb(&term_ids[..], |term| {
|
||||
let intermediate_entry = intermediate_entry_it.next().unwrap();
|
||||
dict.insert(
|
||||
IntermediateKey::Str(
|
||||
String::from_utf8(term.to_vec()).expect("could not convert to String"),
|
||||
),
|
||||
intermediate_entry,
|
||||
);
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
if term_req.req.min_doc_count == 0 {
|
||||
@@ -1055,14 +1077,22 @@ where
|
||||
}
|
||||
} else if term_req.column_type == ColumnType::DateTime {
|
||||
for (val, doc_count) in entries {
|
||||
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
|
||||
let intermediate_entry = into_intermediate_bucket_entry(
|
||||
doc_count,
|
||||
reborrow_opt_collector(&mut sub_agg_collector),
|
||||
agg_data,
|
||||
)?;
|
||||
let val = i64::from_u64(val);
|
||||
let date = format_date(val)?;
|
||||
dict.insert(IntermediateKey::Str(date), intermediate_entry);
|
||||
}
|
||||
} else if term_req.column_type == ColumnType::Bool {
|
||||
for (val, doc_count) in entries {
|
||||
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
|
||||
let intermediate_entry = into_intermediate_bucket_entry(
|
||||
doc_count,
|
||||
reborrow_opt_collector(&mut sub_agg_collector),
|
||||
agg_data,
|
||||
)?;
|
||||
let val = bool::from_u64(val);
|
||||
dict.insert(IntermediateKey::Bool(val), intermediate_entry);
|
||||
}
|
||||
@@ -1082,14 +1112,22 @@ where
|
||||
})?;
|
||||
|
||||
for (val, doc_count) in entries {
|
||||
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
|
||||
let intermediate_entry = into_intermediate_bucket_entry(
|
||||
doc_count,
|
||||
reborrow_opt_collector(&mut sub_agg_collector),
|
||||
agg_data,
|
||||
)?;
|
||||
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
|
||||
let val = Ipv6Addr::from_u128(val);
|
||||
dict.insert(IntermediateKey::IpAddr(val), intermediate_entry);
|
||||
}
|
||||
} else {
|
||||
for (val, doc_count) in entries {
|
||||
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
|
||||
let intermediate_entry = into_intermediate_bucket_entry(
|
||||
doc_count,
|
||||
reborrow_opt_collector(&mut sub_agg_collector),
|
||||
agg_data,
|
||||
)?;
|
||||
if term_req.column_type == ColumnType::U64 {
|
||||
dict.insert(IntermediateKey::U64(val), intermediate_entry);
|
||||
} else if term_req.column_type == ColumnType::I64 {
|
||||
@@ -1123,13 +1161,13 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentTermCollector<TermMap, C> {
|
||||
impl<TermMap: TermAggregationMap, B: SubAggBuffer> SegmentTermCollector<TermMap, B> {
|
||||
#[inline]
|
||||
fn collect_terms_with_docs(
|
||||
iter: impl Iterator<Item = (crate::DocId, u64)>,
|
||||
term_buckets: &mut TermMap,
|
||||
bucket_id_provider: &mut BucketIdProvider,
|
||||
sub_agg: &mut CachedSubAggs<C>,
|
||||
sub_agg: &mut BufferedSubAggs<B>,
|
||||
) {
|
||||
for (doc, term_id) in iter {
|
||||
let bucket_id = term_buckets.term_entry(term_id, bucket_id_provider);
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::aggregation::agg_data::{
|
||||
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
|
||||
};
|
||||
use crate::aggregation::bucket::term_agg::TermsAggregation;
|
||||
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardCachedSubAggs};
|
||||
use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardBufferedSubAggs};
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
|
||||
IntermediateKey, IntermediateTermBucketEntry, IntermediateTermBucketResult,
|
||||
@@ -47,7 +47,7 @@ struct MissingCount {
|
||||
#[derive(Default, Debug)]
|
||||
pub struct TermMissingAgg {
|
||||
accessor_idx: usize,
|
||||
sub_agg: Option<HighCardCachedSubAggs>,
|
||||
sub_agg: Option<HighCardBufferedSubAggs>,
|
||||
/// Idx = parent bucket id, Value = missing count for that bucket
|
||||
missing_count_per_bucket: Vec<MissingCount>,
|
||||
bucket_id_provider: BucketIdProvider,
|
||||
@@ -66,7 +66,7 @@ impl TermMissingAgg {
|
||||
None
|
||||
};
|
||||
|
||||
let sub_agg = sub_agg.map(CachedSubAggs::new);
|
||||
let sub_agg = sub_agg.map(BufferedSubAggs::new);
|
||||
let bucket_id_provider = BucketIdProvider::default();
|
||||
|
||||
Ok(Self {
|
||||
|
||||
@@ -6,7 +6,7 @@ use crate::aggregation::bucket::MAX_NUM_TERMS_FOR_VEC;
|
||||
use crate::aggregation::BucketId;
|
||||
use crate::DocId;
|
||||
|
||||
/// A cache for sub-aggregations, storing doc ids per bucket id.
|
||||
/// A buffer for sub-aggregations, storing doc ids per bucket id.
|
||||
/// Depending on the cardinality of the parent aggregation, we use different
|
||||
/// storage strategies.
|
||||
///
|
||||
@@ -24,21 +24,21 @@ use crate::DocId;
|
||||
/// aggregations.
|
||||
/// What this datastructure does in general is to group docs by bucket id.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct CachedSubAggs<C: SubAggCache> {
|
||||
cache: C,
|
||||
pub(crate) struct BufferedSubAggs<B: SubAggBuffer> {
|
||||
buffer: B,
|
||||
sub_agg_collector: Box<dyn SegmentAggregationCollector>,
|
||||
num_docs: usize,
|
||||
}
|
||||
|
||||
pub type LowCardCachedSubAggs = CachedSubAggs<LowCardSubAggCache>;
|
||||
pub type HighCardCachedSubAggs = CachedSubAggs<HighCardSubAggCache>;
|
||||
pub type LowCardBufferedSubAggs = BufferedSubAggs<LowCardSubAggBuffer>;
|
||||
pub type HighCardBufferedSubAggs = BufferedSubAggs<HighCardSubAggBuffer>;
|
||||
|
||||
const FLUSH_THRESHOLD: usize = 2048;
|
||||
|
||||
/// A trait for caching sub-aggregation doc ids per bucket id.
|
||||
/// A trait for buffering sub-aggregation doc ids per bucket id.
|
||||
/// Different implementations can be used depending on the cardinality
|
||||
/// of the parent aggregation.
|
||||
pub trait SubAggCache: Debug {
|
||||
pub trait SubAggBuffer: Debug {
|
||||
fn new() -> Self;
|
||||
fn push(&mut self, bucket_id: BucketId, doc_id: DocId);
|
||||
fn flush_local(
|
||||
@@ -49,22 +49,22 @@ pub trait SubAggCache: Debug {
|
||||
) -> crate::Result<()>;
|
||||
}
|
||||
|
||||
impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
|
||||
impl<Backend: SubAggBuffer + Debug> BufferedSubAggs<Backend> {
|
||||
pub fn new(sub_agg: Box<dyn SegmentAggregationCollector>) -> Self {
|
||||
Self {
|
||||
cache: Backend::new(),
|
||||
buffer: Backend::new(),
|
||||
sub_agg_collector: sub_agg,
|
||||
num_docs: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_sub_agg_collector(&mut self) -> &mut Box<dyn SegmentAggregationCollector> {
|
||||
&mut self.sub_agg_collector
|
||||
pub fn get_sub_agg_collector(&mut self) -> &mut dyn SegmentAggregationCollector {
|
||||
&mut *self.sub_agg_collector
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn push(&mut self, bucket_id: BucketId, doc_id: DocId) {
|
||||
self.cache.push(bucket_id, doc_id);
|
||||
self.buffer.push(bucket_id, doc_id);
|
||||
self.num_docs += 1;
|
||||
}
|
||||
|
||||
@@ -75,7 +75,7 @@ impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
|
||||
agg_data: &mut AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
if self.num_docs >= FLUSH_THRESHOLD {
|
||||
self.cache
|
||||
self.buffer
|
||||
.flush_local(&mut self.sub_agg_collector, agg_data, false)?;
|
||||
self.num_docs = 0;
|
||||
}
|
||||
@@ -85,7 +85,7 @@ impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
|
||||
/// Note: this _does_ flush the sub aggregations.
|
||||
pub fn flush(&mut self, agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
|
||||
if self.num_docs != 0 {
|
||||
self.cache
|
||||
self.buffer
|
||||
.flush_local(&mut self.sub_agg_collector, agg_data, true)?;
|
||||
self.num_docs = 0;
|
||||
}
|
||||
@@ -94,11 +94,11 @@ impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Number of partitions for high cardinality sub-aggregation cache.
|
||||
/// Number of partitions for high cardinality sub-aggregation buffer.
|
||||
const NUM_PARTITIONS: usize = 16;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct HighCardSubAggCache {
|
||||
pub(crate) struct HighCardSubAggBuffer {
|
||||
/// This weird partitioning is used to do some cheap grouping on the bucket ids.
|
||||
/// bucket ids are dense, e.g. when we don't detect the cardinality as low cardinality,
|
||||
/// but there are just 16 bucket ids, each bucket id will go to its own partition.
|
||||
@@ -108,7 +108,7 @@ pub(crate) struct HighCardSubAggCache {
|
||||
partitions: Box<[PartitionEntry; NUM_PARTITIONS]>,
|
||||
}
|
||||
|
||||
impl HighCardSubAggCache {
|
||||
impl HighCardSubAggBuffer {
|
||||
#[inline]
|
||||
fn clear(&mut self) {
|
||||
for partition in self.partitions.iter_mut() {
|
||||
@@ -131,7 +131,7 @@ impl PartitionEntry {
|
||||
}
|
||||
}
|
||||
|
||||
impl SubAggCache for HighCardSubAggCache {
|
||||
impl SubAggBuffer for HighCardSubAggBuffer {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
partitions: Box::new(core::array::from_fn(|_| PartitionEntry::default())),
|
||||
@@ -173,14 +173,14 @@ impl SubAggCache for HighCardSubAggCache {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct LowCardSubAggCache {
|
||||
/// Cache doc ids per bucket for sub-aggregations.
|
||||
pub(crate) struct LowCardSubAggBuffer {
|
||||
/// Buffer doc ids per bucket for sub-aggregations.
|
||||
///
|
||||
/// The outer Vec is indexed by BucketId.
|
||||
per_bucket_docs: Vec<Vec<DocId>>,
|
||||
}
|
||||
|
||||
impl LowCardSubAggCache {
|
||||
impl LowCardSubAggBuffer {
|
||||
#[inline]
|
||||
fn clear(&mut self) {
|
||||
for v in &mut self.per_bucket_docs {
|
||||
@@ -189,7 +189,7 @@ impl LowCardSubAggCache {
|
||||
}
|
||||
}
|
||||
|
||||
impl SubAggCache for LowCardSubAggCache {
|
||||
impl SubAggBuffer for LowCardSubAggBuffer {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
per_bucket_docs: Vec::new(),
|
||||
@@ -1,6 +1,6 @@
|
||||
use super::agg_req::Aggregations;
|
||||
use super::agg_result::AggregationResults;
|
||||
use super::cached_sub_aggs::LowCardCachedSubAggs;
|
||||
use super::buffered_sub_aggs::LowCardBufferedSubAggs;
|
||||
use super::intermediate_agg_result::IntermediateAggregationResults;
|
||||
use super::AggContextParams;
|
||||
// group buffering strategy is chosen explicitly by callers; no need to hash-group on the fly.
|
||||
@@ -136,7 +136,7 @@ fn merge_fruits(
|
||||
/// `AggregationSegmentCollector` does the aggregation collection on a segment.
|
||||
pub struct AggregationSegmentCollector {
|
||||
aggs_with_accessor: AggregationsSegmentCtx,
|
||||
agg_collector: LowCardCachedSubAggs,
|
||||
agg_collector: LowCardBufferedSubAggs,
|
||||
error: Option<TantivyError>,
|
||||
}
|
||||
|
||||
@@ -152,7 +152,7 @@ impl AggregationSegmentCollector {
|
||||
let mut agg_data =
|
||||
build_aggregations_data_from_req(agg, reader, segment_ordinal, context.clone())?;
|
||||
let mut result =
|
||||
LowCardCachedSubAggs::new(build_segment_agg_collectors_root(&mut agg_data)?);
|
||||
LowCardBufferedSubAggs::new(build_segment_agg_collectors_root(&mut agg_data)?);
|
||||
result
|
||||
.get_sub_agg_collector()
|
||||
.prepare_max_bucket(0, &agg_data)?; // prepare for bucket zero
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use std::fmt::Debug;
|
||||
use std::hash::Hash;
|
||||
use std::io;
|
||||
|
||||
use columnar::column_values::CompactSpaceU64Accessor;
|
||||
use columnar::{Column, ColumnType, Dictionary, StrColumn};
|
||||
use common::f64_to_u64;
|
||||
use datasketches::hll::{HllSketch, HllType, HllUnion};
|
||||
use rustc_hash::FxHashSet;
|
||||
use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
|
||||
use crate::aggregation::agg_data::AggregationsSegmentCtx;
|
||||
@@ -120,9 +121,69 @@ impl CardinalityAggregationReq {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
/// A coupon is the hash used to represent our elements in our cardinality sketch.
|
||||
/// TODO switch to u64, but this requires updating the lib upstream.
|
||||
type Coupon = u32;
|
||||
|
||||
/// A CouponCache is here to cache the mapping term ordinal -> coupon (see above).
|
||||
/// The idea is that we do not want to fetch terms associated to several term ordinals,
|
||||
/// several times due to the fact that we have several buckets.
|
||||
enum CouponCache {
|
||||
Dense {
|
||||
coupon_map: Vec<Coupon>,
|
||||
missing_coupon_opt: Option<u32>,
|
||||
},
|
||||
Sparse {
|
||||
coupon_map: FxHashMap<u64, Coupon>,
|
||||
missing_coupon_opt: Option<u32>,
|
||||
},
|
||||
}
|
||||
|
||||
impl CouponCache {
|
||||
fn new(
|
||||
term_ords: Vec<u64>,
|
||||
coupons: Vec<Coupon>,
|
||||
missing_coupon_opt: Option<Coupon>,
|
||||
) -> CouponCache {
|
||||
let num_terms = term_ords.len();
|
||||
assert_eq!(num_terms, coupons.len());
|
||||
if term_ords.is_empty() {
|
||||
return CouponCache::Dense {
|
||||
coupon_map: Vec::new(),
|
||||
missing_coupon_opt,
|
||||
};
|
||||
}
|
||||
let highest_term_ord = term_ords.last().copied().unwrap_or(0u64);
|
||||
// We prefer the dense implementation, if it is not too wasteful.
|
||||
// There are two cases for which we can use it.
|
||||
// 1- if the data is small.
|
||||
// 2- if the data is not necessarily small, but due to a high occupancy ratio, the RAM usage
|
||||
// is not that much bigger than if we had used a HashSet. (occupancy ratio + extra
|
||||
// metadata ~ x2.25)
|
||||
let should_use_dense =
|
||||
highest_term_ord < 1_000_000u64 || highest_term_ord < num_terms as u64 * 3u64;
|
||||
if should_use_dense {
|
||||
let mut coupon_map: Vec<Coupon> = vec![0; highest_term_ord as usize + 1];
|
||||
for (term_ord, coupon) in term_ords.into_iter().zip(coupons.into_iter()) {
|
||||
coupon_map[term_ord as usize] = coupon;
|
||||
}
|
||||
CouponCache::Dense {
|
||||
coupon_map,
|
||||
missing_coupon_opt,
|
||||
}
|
||||
} else {
|
||||
let coupon_map: FxHashMap<u64, u32> = term_ords.into_iter().zip(coupons).collect();
|
||||
CouponCache::Sparse {
|
||||
coupon_map,
|
||||
missing_coupon_opt,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct SegmentCardinalityCollector {
|
||||
buckets: Vec<SegmentCardinalityCollectorBucket>,
|
||||
/// Buckets are Some(_) until they get consumed by into_intermediate_results().
|
||||
buckets: Vec<Option<SegmentCardinalityCollectorBucket>>,
|
||||
accessor_idx: usize,
|
||||
/// The column accessor to access the fast field values.
|
||||
accessor: Column<u64>,
|
||||
@@ -130,75 +191,133 @@ pub(crate) struct SegmentCardinalityCollector {
|
||||
column_type: ColumnType,
|
||||
/// The missing value normalized to the internal u64 representation of the field type.
|
||||
missing_value_for_accessor: Option<u64>,
|
||||
coupon_cache: Option<CouponCache>,
|
||||
}
|
||||
|
||||
impl Debug for SegmentCardinalityCollector {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
f.debug_struct("SegmentCardinalityCollector")
|
||||
.field("column_type", &self.column_type)
|
||||
.field(
|
||||
"missing_value_for_accessor",
|
||||
&self.missing_value_for_accessor,
|
||||
)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Default)]
|
||||
pub(crate) struct SegmentCardinalityCollectorBucket {
|
||||
cardinality: CardinalityCollector,
|
||||
entries: FxHashSet<u64>,
|
||||
}
|
||||
impl SegmentCardinalityCollectorBucket {
|
||||
#[inline(always)]
|
||||
pub fn new(column_type: ColumnType) -> Self {
|
||||
Self {
|
||||
cardinality: CardinalityCollector::new(column_type as u8),
|
||||
entries: FxHashSet::default(),
|
||||
}
|
||||
}
|
||||
|
||||
// Returns a intermediate metric result.
|
||||
//
|
||||
// If the column is not str, the values have been added to the
|
||||
// sketch during collection.
|
||||
//
|
||||
// If the column is str, then the values are dictionary encoded
|
||||
// and have not been added to the sketch yet.
|
||||
// We need to resolves the term ords accumulated in self.entries
|
||||
// with the coupon cache, and append the results to the sketch.
|
||||
fn into_intermediate_metric_result(
|
||||
mut self,
|
||||
req_data: &CardinalityAggReqData,
|
||||
coupon_cache_opt: Option<&CouponCache>,
|
||||
) -> crate::Result<IntermediateMetricResult> {
|
||||
if req_data.column_type == ColumnType::Str {
|
||||
let fallback_dict = Dictionary::empty();
|
||||
let dict = req_data
|
||||
.str_dict_column
|
||||
.as_ref()
|
||||
.map(|el| el.dictionary())
|
||||
.unwrap_or_else(|| &fallback_dict);
|
||||
let mut has_missing = false;
|
||||
if let Some(coupon_cache) = coupon_cache_opt {
|
||||
assert!(self.cardinality.sketch.is_empty());
|
||||
append_to_sketch(&self.entries, coupon_cache, &mut self.cardinality);
|
||||
}
|
||||
Ok(IntermediateMetricResult::Cardinality(self.cardinality))
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: replace FxHashSet with something that allows iterating in order
|
||||
// (e.g. sparse bitvec)
|
||||
let mut term_ids = Vec::new();
|
||||
for term_ord in self.entries.into_iter() {
|
||||
if term_ord == u64::MAX {
|
||||
has_missing = true;
|
||||
} else {
|
||||
// we can reasonably exclude values above u32::MAX
|
||||
term_ids.push(term_ord as u32);
|
||||
}
|
||||
}
|
||||
/// Builds a coupon cache from the given buckets, dictionary, and optional missing value.
|
||||
/// Returns a mapping from term_ord to the hash (coupon) of the associated term.
|
||||
fn build_coupon_cache(
|
||||
buckets: &[Option<SegmentCardinalityCollectorBucket>],
|
||||
dictionary: &Dictionary,
|
||||
missing_value_opt: Option<&Key>,
|
||||
) -> io::Result<CouponCache> {
|
||||
let term_ords_capacity: usize = buckets
|
||||
.iter()
|
||||
.flatten()
|
||||
.map(|bucket| bucket.entries.len())
|
||||
.max()
|
||||
.unwrap_or(0)
|
||||
* 2;
|
||||
let mut term_ords_set = FxHashSet::with_capacity_and_hasher(term_ords_capacity, FxBuildHasher);
|
||||
for bucket in buckets.iter().flatten() {
|
||||
term_ords_set.extend(bucket.entries.iter().copied());
|
||||
}
|
||||
let mut term_ords: Vec<u64> = term_ords_set.into_iter().collect();
|
||||
term_ords.sort_unstable();
|
||||
|
||||
term_ids.sort_unstable();
|
||||
dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| {
|
||||
self.cardinality.insert(term);
|
||||
Ok(())
|
||||
})?;
|
||||
if has_missing {
|
||||
// Replace missing with the actual value provided
|
||||
let missing_key =
|
||||
req_data.req.missing.as_ref().expect(
|
||||
"Found sentinel value u64::MAX for term_ord but `missing` is not set",
|
||||
);
|
||||
match missing_key {
|
||||
Key::Str(missing) => {
|
||||
self.cardinality.insert(missing.as_str());
|
||||
}
|
||||
Key::F64(val) => {
|
||||
let val = f64_to_u64(*val);
|
||||
self.cardinality.insert(val);
|
||||
}
|
||||
Key::U64(val) => {
|
||||
self.cardinality.insert(*val);
|
||||
}
|
||||
Key::I64(val) => {
|
||||
self.cardinality.insert(*val);
|
||||
}
|
||||
term_ords.pop_if(|highest_term_ord| *highest_term_ord >= dictionary.num_terms() as u64);
|
||||
|
||||
let mut coupons: Vec<Coupon> = Vec::with_capacity(term_ords.len());
|
||||
let all_term_ords_found: bool =
|
||||
dictionary.sorted_ords_to_term_cb(&term_ords, |term_bytes| {
|
||||
let coupon: Coupon = murmurhash32::murmurhash2(term_bytes);
|
||||
coupons.push(coupon);
|
||||
})?;
|
||||
assert!(all_term_ords_found);
|
||||
|
||||
// Regardless of whether or not there is effectively a missing value in one of the buckets,
|
||||
// we populate the cache with the missing key too (if any).
|
||||
let missing_coupon_opt: Option<Coupon> = missing_value_opt.map(|missing_key| {
|
||||
if let Key::Str(missing_value_str) = missing_key {
|
||||
murmurhash32::murmurhash2(missing_value_str.as_bytes())
|
||||
} else {
|
||||
// See https://github.com/quickwit-oss/tantivy/issues/2891
|
||||
// A missing key with a type different from Str will not work as intended
|
||||
// for the moment.
|
||||
//
|
||||
// Right now this is just a partial workaround.
|
||||
35679954u32
|
||||
}
|
||||
});
|
||||
Ok(CouponCache::new(term_ords, coupons, missing_coupon_opt))
|
||||
}
|
||||
|
||||
fn append_to_sketch(
|
||||
term_ords: &FxHashSet<u64>,
|
||||
coupon_cache: &CouponCache,
|
||||
sketch: &mut CardinalityCollector,
|
||||
) {
|
||||
match coupon_cache {
|
||||
CouponCache::Dense {
|
||||
coupon_map,
|
||||
missing_coupon_opt,
|
||||
} => {
|
||||
for &term_ord in term_ords {
|
||||
if let Some(coupon) = coupon_map
|
||||
.get(term_ord as usize)
|
||||
.copied()
|
||||
.or(*missing_coupon_opt)
|
||||
{
|
||||
sketch.insert_coupon(coupon);
|
||||
}
|
||||
}
|
||||
}
|
||||
CouponCache::Sparse {
|
||||
coupon_map,
|
||||
missing_coupon_opt,
|
||||
} => {
|
||||
for term_ord in term_ords {
|
||||
if let Some(coupon) = coupon_map.get(term_ord).copied().or(*missing_coupon_opt) {
|
||||
sketch.insert_coupon(coupon);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(IntermediateMetricResult::Cardinality(self.cardinality))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -210,11 +329,12 @@ impl SegmentCardinalityCollector {
|
||||
missing_value_for_accessor: Option<u64>,
|
||||
) -> Self {
|
||||
Self {
|
||||
buckets: vec![SegmentCardinalityCollectorBucket::new(column_type); 1],
|
||||
buckets: Vec::new(),
|
||||
column_type,
|
||||
accessor_idx,
|
||||
accessor,
|
||||
missing_value_for_accessor,
|
||||
coupon_cache: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -236,15 +356,35 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
results: &mut IntermediateAggregationResults,
|
||||
parent_bucket_id: BucketId,
|
||||
bucket_id: BucketId,
|
||||
) -> crate::Result<()> {
|
||||
self.prepare_max_bucket(parent_bucket_id, agg_data)?;
|
||||
self.prepare_max_bucket(bucket_id, agg_data)?;
|
||||
let req_data = &agg_data.get_cardinality_req_data(self.accessor_idx);
|
||||
// Strings are dictionary encoded. Fetching the terms associated to strings
|
||||
// is expensive. For this reason, we do that once for all buckets and cache the results
|
||||
// here.
|
||||
if let Some(str_dict_column) = &req_data.str_dict_column {
|
||||
// Ensure the coupon cache is populated.
|
||||
// A mapping from term_ord to the hash of the associated term.
|
||||
// The missing value sentinel will be associated to the hash of the missing value if
|
||||
// any.
|
||||
if self.coupon_cache.is_none() {
|
||||
self.coupon_cache = Some(build_coupon_cache(
|
||||
&self.buckets,
|
||||
str_dict_column.dictionary(),
|
||||
req_data.req.missing.as_ref(),
|
||||
)?);
|
||||
}
|
||||
}
|
||||
let name = req_data.name.to_string();
|
||||
// take the bucket in buckets and replace it with a new empty one
|
||||
let bucket = std::mem::take(&mut self.buckets[parent_bucket_id as usize]);
|
||||
|
||||
let intermediate_result = bucket.into_intermediate_metric_result(req_data)?;
|
||||
let Some(bucket) = self.buckets[bucket_id as usize].take() else {
|
||||
return Err(crate::TantivyError::InternalError(
|
||||
"the same bucket should not be finalized twice.".to_string(),
|
||||
));
|
||||
};
|
||||
let intermediate_result =
|
||||
bucket.into_intermediate_metric_result(self.coupon_cache.as_ref())?;
|
||||
results.push(
|
||||
name,
|
||||
IntermediateAggregationResult::Metric(intermediate_result),
|
||||
@@ -260,8 +400,11 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
||||
agg_data: &mut AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
self.fetch_block_with_field(docs, agg_data);
|
||||
let bucket = &mut self.buckets[parent_bucket_id as usize];
|
||||
|
||||
let Some(bucket) = &mut self.buckets[parent_bucket_id as usize].as_mut() else {
|
||||
return Err(crate::TantivyError::InternalError(
|
||||
"collection should not happen after finalization".to_string(),
|
||||
));
|
||||
};
|
||||
let col_block_accessor = &agg_data.column_block_accessor;
|
||||
if self.column_type == ColumnType::Str {
|
||||
for term_ord in col_block_accessor.iter_vals() {
|
||||
@@ -301,7 +444,7 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
||||
) -> crate::Result<()> {
|
||||
if max_bucket as usize >= self.buckets.len() {
|
||||
self.buckets.resize_with(max_bucket as usize + 1, || {
|
||||
SegmentCardinalityCollectorBucket::new(self.column_type)
|
||||
Some(SegmentCardinalityCollectorBucket::new(self.column_type))
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
@@ -358,10 +501,14 @@ impl CardinalityCollector {
|
||||
/// Insert a value into the HLL sketch, salted by the column type.
|
||||
/// The salt ensures that identical u64 values from different column types
|
||||
/// (e.g. bool `false` vs i64 `0`) are counted as distinct.
|
||||
pub(crate) fn insert<T: Hash>(&mut self, value: T) {
|
||||
fn insert<T: Hash>(&mut self, value: T) {
|
||||
self.sketch.update((self.salt, value));
|
||||
}
|
||||
|
||||
fn insert_coupon(&mut self, coupon: Coupon) {
|
||||
self.sketch.update_with_coupon(coupon);
|
||||
}
|
||||
|
||||
/// Compute the final cardinality estimate.
|
||||
pub fn finalize(self) -> Option<f64> {
|
||||
Some(self.sketch.estimate().trunc())
|
||||
@@ -377,7 +524,7 @@ impl CardinalityCollector {
|
||||
let mut union = HllUnion::new(LG_K);
|
||||
union.update(&self.sketch);
|
||||
union.update(&right.sketch);
|
||||
self.sketch = union.get_result(HllType::Hll4);
|
||||
self.sketch = union.to_sketch(HllType::Hll4);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -392,7 +539,7 @@ mod tests {
|
||||
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::tests::{exec_request, get_test_index_from_terms};
|
||||
use crate::schema::{IntoIpv6Addr, Schema, FAST};
|
||||
use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING};
|
||||
use crate::Index;
|
||||
|
||||
#[test]
|
||||
@@ -575,6 +722,30 @@ mod tests {
|
||||
assert_eq!(estimate, 3.0);
|
||||
}
|
||||
|
||||
/// Verifies that merging two small sketches (both in List/Set coupon mode)
|
||||
/// produces an exact result — i.e. the HllUnion does not unnecessarily
|
||||
/// promote to the full HLL array when the combined cardinality is small.
|
||||
#[test]
|
||||
fn cardinality_collector_merge_stays_exact_for_small_sets() {
|
||||
use super::CardinalityCollector;
|
||||
|
||||
let mut left = CardinalityCollector::default();
|
||||
for i in 0u64..50 {
|
||||
left.insert(i);
|
||||
}
|
||||
|
||||
let mut right = CardinalityCollector::default();
|
||||
for i in 30u64..100 {
|
||||
right.insert(i);
|
||||
}
|
||||
|
||||
left.merge_fruits(right).unwrap();
|
||||
let estimate = left.finalize().unwrap();
|
||||
// 100 distinct values (0..100). Both sketches are in Set mode (< 192 coupons),
|
||||
// so the union should stay in coupon mode and give an exact count.
|
||||
assert_eq!(estimate, 100.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_serialize_deserialize_binary() {
|
||||
use datasketches::hll::HllSketch;
|
||||
@@ -591,6 +762,98 @@ mod tests {
|
||||
assert!((deserialized.estimate() - 3.0).abs() < 0.01);
|
||||
}
|
||||
|
||||
/// Tests that the `missing` parameter correctly counts a single empty document
|
||||
/// for both u64 and str columns.
|
||||
#[test]
|
||||
fn cardinality_aggregation_missing_value_single_empty_doc() {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let id_field = schema_builder.add_u64_field("id", FAST);
|
||||
let name_field = schema_builder.add_text_field("name", STRING | FAST);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut writer = index.writer_for_tests().unwrap();
|
||||
writer
|
||||
.add_document(doc!(id_field=>1u64,name_field=>"some_name"))
|
||||
.unwrap();
|
||||
writer.add_document(doc!()).unwrap();
|
||||
writer.commit().unwrap();
|
||||
|
||||
{
|
||||
// int colum with missing value non redundant
|
||||
let agg_req: Aggregations = serde_json::from_value(json!({
|
||||
"cardinality": {
|
||||
"cardinality": {
|
||||
"field": "id",
|
||||
"missing": 42u64
|
||||
},
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
let res = exec_request(agg_req, &index).unwrap();
|
||||
assert_eq!(res["cardinality"]["value"], 2.0);
|
||||
}
|
||||
|
||||
{
|
||||
// int colum with missing value redundant
|
||||
let agg_req: Aggregations = serde_json::from_value(json!({
|
||||
"cardinality": {
|
||||
"cardinality": {
|
||||
"field": "id",
|
||||
"missing": 1u64
|
||||
},
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
let res = exec_request(agg_req, &index).unwrap();
|
||||
assert_eq!(res["cardinality"]["value"], 1.0);
|
||||
}
|
||||
|
||||
{
|
||||
// str colum with missing value non redundant
|
||||
// With more than one segment, this is not well handled.
|
||||
let agg_req: Aggregations = serde_json::from_value(json!({
|
||||
"cardinality": {
|
||||
"cardinality": {
|
||||
"field": "name",
|
||||
"missing": "other_name"
|
||||
},
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
let res = exec_request(agg_req, &index).unwrap();
|
||||
assert_eq!(res["cardinality"]["value"], 2.0);
|
||||
}
|
||||
|
||||
{
|
||||
// str colum with missing value redundant
|
||||
let agg_req: Aggregations = serde_json::from_value(json!({
|
||||
"cardinality": {
|
||||
"cardinality": {
|
||||
"field": "name",
|
||||
"missing": "some_name"
|
||||
},
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
let res = exec_request(agg_req, &index).unwrap();
|
||||
assert_eq!(res["cardinality"]["value"], 1.0);
|
||||
}
|
||||
|
||||
{
|
||||
// str column with missing value with a number type.
|
||||
let agg_req: Aggregations = serde_json::from_value(json!({
|
||||
"cardinality": {
|
||||
"cardinality": {
|
||||
"field": "name",
|
||||
"missing": 3,
|
||||
},
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
let res = exec_request(agg_req, &index).unwrap();
|
||||
assert_eq!(res["cardinality"]["value"], 2.0);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_salt_differentiates_types() {
|
||||
use super::CardinalityCollector;
|
||||
|
||||
@@ -133,7 +133,7 @@ mod agg_limits;
|
||||
pub mod agg_req;
|
||||
pub mod agg_result;
|
||||
pub mod bucket;
|
||||
pub(crate) mod cached_sub_aggs;
|
||||
pub(crate) mod buffered_sub_aggs;
|
||||
mod collector;
|
||||
mod date;
|
||||
mod error;
|
||||
|
||||
@@ -512,11 +512,13 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
/// Returns the terms for a _sorted_ list of term ordinals.
|
||||
///
|
||||
/// Returns true if and only if all terms have been found.
|
||||
pub fn sorted_ords_to_term_cb<F: FnMut(&[u8]) -> io::Result<()>>(
|
||||
pub fn sorted_ords_to_term_cb(
|
||||
&self,
|
||||
mut ords: impl Iterator<Item = TermOrdinal>,
|
||||
mut cb: F,
|
||||
ords: &[TermOrdinal],
|
||||
mut cb: impl FnMut(&[u8]),
|
||||
) -> io::Result<bool> {
|
||||
assert!(ords.is_sorted());
|
||||
let mut ords = ords.iter().copied();
|
||||
let Some(mut ord) = ords.next() else {
|
||||
return Ok(true);
|
||||
};
|
||||
@@ -538,33 +540,36 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
bytes.extend_from_slice(current_sstable_delta_reader.suffix());
|
||||
current_block_ordinal += 1;
|
||||
}
|
||||
cb(&bytes)?;
|
||||
cb(&bytes);
|
||||
|
||||
// fetch the next ordinal
|
||||
let Some(next_ord) = ords.next() else {
|
||||
return Ok(true);
|
||||
let next_ord = loop {
|
||||
let Some(next_ord) = ords.next() else {
|
||||
return Ok(true);
|
||||
};
|
||||
if next_ord == ord {
|
||||
// This is the same ordinal, let's just call the callback directly.
|
||||
cb(&bytes);
|
||||
} else {
|
||||
// we checked it was sorted beforehands
|
||||
debug_assert!(next_ord > ord);
|
||||
break next_ord;
|
||||
}
|
||||
};
|
||||
|
||||
// advance forward if the new ord is different than the one we just processed
|
||||
// TODO optimization: it is silly to do a binary search to get the block every single
|
||||
// time.
|
||||
//
|
||||
// this allows the input TermOrdinal iterator to contain duplicates, so long as it's
|
||||
// still sorted
|
||||
if next_ord < ord {
|
||||
panic!("Ordinals were not sorted: received {next_ord} after {ord}");
|
||||
} else if next_ord > ord {
|
||||
// check if block changed for new term_ord
|
||||
let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
|
||||
if new_block_addr != current_block_addr {
|
||||
current_block_addr = new_block_addr;
|
||||
current_block_ordinal = current_block_addr.first_ordinal;
|
||||
current_sstable_delta_reader =
|
||||
self.sstable_delta_reader_block(current_block_addr.clone())?;
|
||||
bytes.clear();
|
||||
}
|
||||
ord = next_ord;
|
||||
} else {
|
||||
// The next ord is equal to the previous ord: no need to seek or advance.
|
||||
// Check if block changed for new term_ord
|
||||
let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
|
||||
if new_block_addr != current_block_addr {
|
||||
current_block_addr = new_block_addr;
|
||||
current_block_ordinal = current_block_addr.first_ordinal;
|
||||
current_sstable_delta_reader =
|
||||
self.sstable_delta_reader_block(current_block_addr.clone())?;
|
||||
bytes.clear();
|
||||
}
|
||||
ord = next_ord;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -671,8 +676,8 @@ mod tests {
|
||||
use common::OwnedBytes;
|
||||
|
||||
use super::Dictionary;
|
||||
use crate::MonotonicU64SSTable;
|
||||
use crate::dictionary::TermOrdHit;
|
||||
use crate::{MonotonicU64SSTable, TermOrdinal};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PermissionedHandle {
|
||||
@@ -935,25 +940,24 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ords_term() {
|
||||
fn test_sorted_ords_to_term() {
|
||||
let (dic, _slice) = make_test_sstable();
|
||||
|
||||
// Single term
|
||||
let mut terms = Vec::new();
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(100_000..100_001, |term| {
|
||||
dic.sorted_ords_to_term_cb(&[100_000], |term| {
|
||||
terms.push(term.to_vec());
|
||||
Ok(())
|
||||
})
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(terms, vec![format!("{:05X}", 100_000).into_bytes(),]);
|
||||
// Single term
|
||||
let mut terms = Vec::new();
|
||||
let ords: Vec<TermOrdinal> = (100_001..100_002).collect();
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(100_001..100_002, |term| {
|
||||
dic.sorted_ords_to_term_cb(&ords, |term| {
|
||||
terms.push(term.to_vec());
|
||||
Ok(())
|
||||
})
|
||||
.unwrap()
|
||||
);
|
||||
@@ -961,9 +965,8 @@ mod tests {
|
||||
// both terms
|
||||
let mut terms = Vec::new();
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(100_000..100_002, |term| {
|
||||
dic.sorted_ords_to_term_cb(&[100_000, 100_001], |term| {
|
||||
terms.push(term.to_vec());
|
||||
Ok(())
|
||||
})
|
||||
.unwrap()
|
||||
);
|
||||
@@ -976,10 +979,10 @@ mod tests {
|
||||
);
|
||||
// Test cross block
|
||||
let mut terms = Vec::new();
|
||||
let ords: Vec<TermOrdinal> = (98653..=98655).collect();
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(98653..=98655, |term| {
|
||||
dic.sorted_ords_to_term_cb(&ords, |term| {
|
||||
terms.push(term.to_vec());
|
||||
Ok(())
|
||||
})
|
||||
.unwrap()
|
||||
);
|
||||
@@ -991,6 +994,43 @@ mod tests {
|
||||
format!("{:05X}", 98655).into_bytes(),
|
||||
]
|
||||
);
|
||||
// redundant
|
||||
let mut terms = Vec::new();
|
||||
let ords: Vec<TermOrdinal> = vec![1, 1, 2];
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(&ords, |term| {
|
||||
terms.push(term.to_vec());
|
||||
})
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
terms,
|
||||
vec![
|
||||
format!("{:05X}", 1).into_bytes(),
|
||||
format!("{:05X}", 1).into_bytes(),
|
||||
format!("{:05X}", 2).into_bytes(),
|
||||
]
|
||||
);
|
||||
// redundant cross block
|
||||
let mut terms = Vec::new();
|
||||
let ords: Vec<TermOrdinal> = vec![98653, 98653, 98654, 98654, 98655, 98655];
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(&ords, |term| {
|
||||
terms.push(term.to_vec());
|
||||
})
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
terms,
|
||||
vec![
|
||||
format!("{:05X}", 98_653).into_bytes(),
|
||||
format!("{:05X}", 98_653).into_bytes(),
|
||||
format!("{:05X}", 98_654).into_bytes(),
|
||||
format!("{:05X}", 98_654).into_bytes(),
|
||||
format!("{:05X}", 98_655).into_bytes(),
|
||||
format!("{:05X}", 98_655).into_bytes(),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user