diff --git a/Cargo.toml b/Cargo.toml index 4cc50c163..7d2ed19f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ regex = { version = "1.5.5", default-features = false, features = [ "std", "unicode", ] } +murmurhash32 = "0.3" aho-corasick = "1.0" tantivy-fst = "0.5" memmap2 = { version = "0.9.0", optional = true } @@ -65,7 +66,7 @@ tantivy-bitpacker = { version = "0.10", path = "./bitpacker" } common = { version = "0.11", path = "./common/", package = "tantivy-common" } tokenizer-api = { version = "0.7", path = "./tokenizer-api", package = "tantivy-tokenizer-api" } sketches-ddsketch = { version = "0.4", features = ["use_serde"] } -datasketches = "0.2.0" +datasketches = { git = "https://github.com/fulmicoton-dd/datasketches-rust", rev = "eb4ad64" } futures-util = { version = "0.3.28", optional = true } futures-channel = { version = "0.3.28", optional = true } fnv = "1.0.7" diff --git a/benches/agg_bench.rs b/benches/agg_bench.rs index 68cacdf37..89a41732d 100644 --- a/benches/agg_bench.rs +++ b/benches/agg_bench.rs @@ -78,6 +78,7 @@ fn bench_agg(mut group: InputGroup) { register!(group, cardinality_agg); register!(group, terms_status_with_cardinality_agg); + register!(group, terms_100_buckets_with_cardinality_agg); register!(group, range_agg); register!(group, range_agg_with_avg_sub_agg); @@ -169,6 +170,22 @@ fn terms_status_with_cardinality_agg(index: &Index) { let agg_req = json!({ "my_texts": { "terms": { "field": "text_few_terms_status" }, + "aggs": { + "cardinality": { + "cardinality": { + "field": "text_few_terms_status" + }, + } + } + }, + }); + execute_agg(index, agg_req); +} + +fn terms_100_buckets_with_cardinality_agg(index: &Index) { + let agg_req = json!({ + "my_texts": { + "terms": { "field": "text_1000_terms_zipf", "size": 100 }, "aggs": { "cardinality": { "cardinality": { diff --git a/columnar/src/block_accessor.rs b/columnar/src/block_accessor.rs index 227cf804d..2793f3a1b 100644 --- a/columnar/src/block_accessor.rs +++ b/columnar/src/block_accessor.rs @@ -33,14 +33,14 @@ impl &mut self, docs: &[u32], accessor: &Column, - missing: Option, + missing_opt: Option, ) { self.fetch_block(docs, accessor); // no missing values if accessor.index.get_cardinality().is_full() { return; } - let Some(missing) = missing else { + let Some(missing) = missing_opt else { return; }; @@ -191,6 +191,7 @@ where F: FnMut(u32) { } #[cfg(test)] +#[allow(clippy::field_reassign_with_default)] mod tests { use super::*; diff --git a/src/aggregation/bucket/composite/collector.rs b/src/aggregation/bucket/composite/collector.rs index 22b8ee0bc..9ce343bf2 100644 --- a/src/aggregation/bucket/composite/collector.rs +++ b/src/aggregation/bucket/composite/collector.rs @@ -21,7 +21,7 @@ use crate::aggregation::bucket::composite::map::{DynArrayHeapMap, MAX_DYN_ARRAY_ use crate::aggregation::bucket::{ CalendarInterval, CompositeAggregationSource, MissingOrder, Order, }; -use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardSubAggCache}; +use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardSubAggBuffer}; use crate::aggregation::intermediate_agg_result::{ CompositeIntermediateKey, IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult, IntermediateCompositeBucketEntry, IntermediateCompositeBucketResult, @@ -119,7 +119,7 @@ pub struct SegmentCompositeCollector { /// One DynArrayHeapMap per parent bucket. parent_buckets: Vec>, accessor_idx: usize, - sub_agg: Option>, + sub_agg: Option>, bucket_id_provider: BucketIdProvider, /// Number of sources, needed when creating new DynArrayHeapMaps. num_sources: usize, @@ -218,7 +218,7 @@ impl SegmentCompositeCollector { let has_sub_aggregations = !node.children.is_empty(); let sub_agg = if has_sub_aggregations { let sub_agg_collector = build_segment_agg_collectors(req_data, &node.children)?; - Some(CachedSubAggs::new(sub_agg_collector)) + Some(BufferedSubAggs::new(sub_agg_collector)) } else { None }; @@ -332,7 +332,7 @@ fn collect_bucket_with_limit( limit_num_buckets: usize, buckets: &mut DynArrayHeapMap, key: &[InternalValueRepr], - sub_agg: &mut Option>, + sub_agg: &mut Option>, bucket_id_provider: &mut BucketIdProvider, ) { let mut record_in_bucket = |bucket: &mut CompositeBucketCollector| { @@ -488,7 +488,7 @@ struct CompositeKeyVisitor<'a> { doc_id: crate::DocId, composite_agg_data: &'a CompositeAggReqData, buckets: &'a mut DynArrayHeapMap, - sub_agg: &'a mut Option>, + sub_agg: &'a mut Option>, bucket_id_provider: &'a mut BucketIdProvider, sub_level_values: SmallVec<[InternalValueRepr; MAX_DYN_ARRAY_SIZE]>, } diff --git a/src/aggregation/bucket/composite/mod.rs b/src/aggregation/bucket/composite/mod.rs index d3b9a8f11..fe0a2a26e 100644 --- a/src/aggregation/bucket/composite/mod.rs +++ b/src/aggregation/bucket/composite/mod.rs @@ -511,14 +511,14 @@ mod tests { fn datetime_from_iso_str(date_str: &str) -> common::DateTime { let dt = OffsetDateTime::parse(date_str, &Rfc3339) - .expect(&format!("Failed to parse date: {}", date_str)); + .unwrap_or_else(|_| panic!("Failed to parse date: {}", date_str)); let timestamp_secs = dt.unix_timestamp_nanos(); common::DateTime::from_timestamp_nanos(timestamp_secs as i64) } fn ms_timestamp_from_iso_str(date_str: &str) -> i64 { let dt = OffsetDateTime::parse(date_str, &Rfc3339) - .expect(&format!("Failed to parse date: {}", date_str)); + .unwrap_or_else(|_| panic!("Failed to parse date: {}", date_str)); (dt.unix_timestamp_nanos() / 1_000_000) as i64 } @@ -548,7 +548,7 @@ mod tests { agg_req_json["my_composite"]["composite"]["after"] = after_key.take().unwrap(); } let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap(); - let res = exec_request(agg_req.clone(), &index).unwrap(); + let res = exec_request(agg_req.clone(), index).unwrap(); let expected_page_buckets = &expected_buckets_vec[page_idx * page_size ..std::cmp::min((page_idx + 1) * page_size, expected_buckets_vec.len())]; assert_eq!( @@ -578,7 +578,7 @@ mod tests { } }); let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap(); - let res = exec_request(agg_req.clone(), &index).unwrap(); + let res = exec_request(agg_req.clone(), index).unwrap(); assert_eq!( res["my_composite"]["buckets"], json!([]), diff --git a/src/aggregation/bucket/filter.rs b/src/aggregation/bucket/filter.rs index 73518238a..5e432b038 100644 --- a/src/aggregation/bucket/filter.rs +++ b/src/aggregation/bucket/filter.rs @@ -6,8 +6,8 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::aggregation::agg_data::{ build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx, }; -use crate::aggregation::cached_sub_aggs::{ - CachedSubAggs, HighCardSubAggCache, LowCardSubAggCache, SubAggCache, +use crate::aggregation::buffered_sub_aggs::{ + BufferedSubAggs, HighCardSubAggBuffer, LowCardSubAggBuffer, SubAggBuffer, }; use crate::aggregation::intermediate_agg_result::{ IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult, @@ -503,17 +503,17 @@ struct DocCount { } /// Segment collector for filter aggregation -pub struct SegmentFilterCollector { +pub struct SegmentFilterCollector { /// Document counts per parent bucket parent_buckets: Vec, /// Sub-aggregation collectors - sub_aggregations: Option>, + sub_aggregations: Option>, bucket_id_provider: BucketIdProvider, /// Accessor index for this filter aggregation (to access FilterAggReqData) accessor_idx: usize, } -impl SegmentFilterCollector { +impl SegmentFilterCollector { /// Create a new filter segment collector following the new agg_data pattern pub(crate) fn from_req_and_validate( req: &mut AggregationsSegmentCtx, @@ -525,7 +525,7 @@ impl SegmentFilterCollector { } else { None }; - let sub_agg_collector = sub_agg_collector.map(CachedSubAggs::new); + let sub_agg_collector = sub_agg_collector.map(BufferedSubAggs::new); Ok(SegmentFilterCollector { parent_buckets: Vec::new(), @@ -547,16 +547,16 @@ pub(crate) fn build_segment_filter_collector( if is_top_level { Ok(Box::new( - SegmentFilterCollector::::from_req_and_validate(req, node)?, + SegmentFilterCollector::::from_req_and_validate(req, node)?, )) } else { Ok(Box::new( - SegmentFilterCollector::::from_req_and_validate(req, node)?, + SegmentFilterCollector::::from_req_and_validate(req, node)?, )) } } -impl Debug for SegmentFilterCollector { +impl Debug for SegmentFilterCollector { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SegmentFilterCollector") .field("buckets", &self.parent_buckets) @@ -566,7 +566,7 @@ impl Debug for SegmentFilterCollector { } } -impl SegmentAggregationCollector for SegmentFilterCollector { +impl SegmentAggregationCollector for SegmentFilterCollector { fn add_intermediate_aggregation_result( &mut self, agg_data: &AggregationsSegmentCtx, diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index adf7936c6..1473727f6 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -10,7 +10,7 @@ use crate::aggregation::agg_data::{ }; use crate::aggregation::agg_req::Aggregations; use crate::aggregation::agg_result::BucketEntry; -use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardCachedSubAggs}; +use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardBufferedSubAggs}; use crate::aggregation::intermediate_agg_result::{ IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry, @@ -258,7 +258,7 @@ pub(crate) struct SegmentHistogramBucketEntry { impl SegmentHistogramBucketEntry { pub(crate) fn into_intermediate_bucket_entry( self, - sub_aggregation: &mut Option, + sub_aggregation: &mut Option, agg_data: &AggregationsSegmentCtx, ) -> crate::Result { let mut sub_aggregation_res = IntermediateAggregationResults::default(); @@ -291,7 +291,7 @@ pub struct SegmentHistogramCollector { /// The buckets containing the aggregation data. /// One Histogram bucket per parent bucket id. parent_buckets: Vec, - sub_agg: Option, + sub_agg: Option, accessor_idx: usize, bucket_id_provider: BucketIdProvider, } @@ -444,7 +444,7 @@ impl SegmentHistogramCollector { max: f64::MAX, }); req_data.offset = req_data.req.offset.unwrap_or(0.0); - let sub_agg = sub_agg.map(CachedSubAggs::new); + let sub_agg = sub_agg.map(BufferedSubAggs::new); Ok(Self { parent_buckets: Default::default(), diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index 46e0065ce..5fdb9119c 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -9,8 +9,9 @@ use crate::aggregation::agg_data::{ build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx, }; use crate::aggregation::agg_limits::AggregationLimitsGuard; -use crate::aggregation::cached_sub_aggs::{ - CachedSubAggs, HighCardSubAggCache, LowCardCachedSubAggs, LowCardSubAggCache, SubAggCache, +use crate::aggregation::buffered_sub_aggs::{ + BufferedSubAggs, HighCardSubAggBuffer, LowCardBufferedSubAggs, LowCardSubAggBuffer, + SubAggBuffer, }; use crate::aggregation::intermediate_agg_result::{ IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult, @@ -155,13 +156,13 @@ pub(crate) struct SegmentRangeAndBucketEntry { /// The collector puts values from the fast field into the correct buckets and does a conversion to /// the correct datatype. -pub struct SegmentRangeCollector { +pub struct SegmentRangeCollector { /// The buckets containing the aggregation data. /// One for each ParentBucketId parent_buckets: Vec>, column_type: ColumnType, pub(crate) accessor_idx: usize, - sub_agg: Option>, + sub_agg: Option>, /// Here things get a bit weird. We need to assign unique bucket ids across all /// parent buckets. So we keep track of the next available bucket id here. /// This allows a kind of flattening of the bucket ids across all parent buckets. @@ -178,7 +179,7 @@ pub struct SegmentRangeCollector { limits: AggregationLimitsGuard, } -impl Debug for SegmentRangeCollector { +impl Debug for SegmentRangeCollector { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SegmentRangeCollector") .field("parent_buckets_len", &self.parent_buckets.len()) @@ -229,7 +230,7 @@ impl SegmentRangeBucketEntry { } } -impl SegmentAggregationCollector for SegmentRangeCollector { +impl SegmentAggregationCollector for SegmentRangeCollector { fn add_intermediate_aggregation_result( &mut self, agg_data: &AggregationsSegmentCtx, @@ -350,8 +351,8 @@ pub(crate) fn build_segment_range_collector( }; if is_low_card { - Ok(Box::new(SegmentRangeCollector:: { - sub_agg: sub_agg.map(LowCardCachedSubAggs::new), + Ok(Box::new(SegmentRangeCollector:: { + sub_agg: sub_agg.map(LowCardBufferedSubAggs::new), column_type: field_type, accessor_idx, parent_buckets: Vec::new(), @@ -359,8 +360,8 @@ pub(crate) fn build_segment_range_collector( limits: agg_data.context.limits.clone(), })) } else { - Ok(Box::new(SegmentRangeCollector:: { - sub_agg: sub_agg.map(CachedSubAggs::new), + Ok(Box::new(SegmentRangeCollector:: { + sub_agg: sub_agg.map(BufferedSubAggs::new), column_type: field_type, accessor_idx, parent_buckets: Vec::new(), @@ -370,7 +371,7 @@ pub(crate) fn build_segment_range_collector( } } -impl SegmentRangeCollector { +impl SegmentRangeCollector { pub(crate) fn create_new_buckets( &mut self, agg_data: &AggregationsSegmentCtx, @@ -554,7 +555,7 @@ mod tests { pub fn get_collector_from_ranges( ranges: Vec, field_type: ColumnType, - ) -> SegmentRangeCollector { + ) -> SegmentRangeCollector { let req = RangeAggregation { field: "dummy".to_string(), ranges, diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index b254b79ee..9c2bc0153 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -1,5 +1,4 @@ use std::fmt::Debug; -use std::io; use std::net::Ipv6Addr; use columnar::column_values::CompactSpaceU64Accessor; @@ -17,8 +16,9 @@ use crate::aggregation::agg_data::{ }; use crate::aggregation::agg_limits::MemoryConsumption; use crate::aggregation::agg_req::Aggregations; -use crate::aggregation::cached_sub_aggs::{ - CachedSubAggs, HighCardSubAggCache, LowCardCachedSubAggs, LowCardSubAggCache, SubAggCache, +use crate::aggregation::buffered_sub_aggs::{ + BufferedSubAggs, HighCardSubAggBuffer, LowCardBufferedSubAggs, LowCardSubAggBuffer, + SubAggBuffer, }; use crate::aggregation::intermediate_agg_result::{ IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult, @@ -391,7 +391,7 @@ pub(crate) fn build_segment_term_collector( // Decide which bucket storage is best suited for this aggregation. if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC && !has_sub_aggregations { let term_buckets = VecTermBucketsNoAgg::new(max_term_id + 1, &mut bucket_id_provider); - let collector: SegmentTermCollector<_, HighCardSubAggCache> = SegmentTermCollector { + let collector: SegmentTermCollector<_, HighCardSubAggBuffer> = SegmentTermCollector { parent_buckets: vec![term_buckets], sub_agg: None, bucket_id_provider, @@ -401,8 +401,8 @@ pub(crate) fn build_segment_term_collector( Ok(Box::new(collector)) } else if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC { let term_buckets = VecTermBuckets::new(max_term_id + 1, &mut bucket_id_provider); - let sub_agg = sub_agg_collector.map(LowCardCachedSubAggs::new); - let collector: SegmentTermCollector<_, LowCardSubAggCache> = SegmentTermCollector { + let sub_agg = sub_agg_collector.map(LowCardBufferedSubAggs::new); + let collector: SegmentTermCollector<_, LowCardSubAggBuffer> = SegmentTermCollector { parent_buckets: vec![term_buckets], sub_agg, bucket_id_provider, @@ -414,8 +414,8 @@ pub(crate) fn build_segment_term_collector( let term_buckets: PagedTermMap = PagedTermMap::new(max_term_id + 1, &mut bucket_id_provider); // Build sub-aggregation blueprint (flat pairs) - let sub_agg = sub_agg_collector.map(CachedSubAggs::new); - let collector: SegmentTermCollector = + let sub_agg = sub_agg_collector.map(BufferedSubAggs::new); + let collector: SegmentTermCollector = SegmentTermCollector { parent_buckets: vec![term_buckets], sub_agg, @@ -427,8 +427,8 @@ pub(crate) fn build_segment_term_collector( } else { let term_buckets: HashMapTermBuckets = HashMapTermBuckets::default(); // Build sub-aggregation blueprint (flat pairs) - let sub_agg = sub_agg_collector.map(CachedSubAggs::new); - let collector: SegmentTermCollector = + let sub_agg = sub_agg_collector.map(BufferedSubAggs::new); + let collector: SegmentTermCollector = SegmentTermCollector { parent_buckets: vec![term_buckets], sub_agg, @@ -758,10 +758,10 @@ impl TermAggregationMap for VecTermBuckets { /// The collector puts values from the fast field into the correct buckets and does a conversion to /// the correct datatype. #[derive(Debug)] -struct SegmentTermCollector { +struct SegmentTermCollector { /// The buckets containing the aggregation data. parent_buckets: Vec, - sub_agg: Option>, + sub_agg: Option>, bucket_id_provider: BucketIdProvider, max_term_id: u64, terms_req_data: TermsAggReqData, @@ -772,8 +772,8 @@ pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) { (agg_name, agg_property) } -impl SegmentAggregationCollector - for SegmentTermCollector +impl SegmentAggregationCollector + for SegmentTermCollector { fn add_intermediate_aggregation_result( &mut self, @@ -790,8 +790,14 @@ impl SegmentAggregationCollector let term_req = &self.terms_req_data; let name = term_req.name.clone(); - let bucket = - Self::into_intermediate_bucket_result(term_req, &mut self.sub_agg, bucket, agg_data)?; + let bucket = Self::into_intermediate_bucket_result( + term_req, + self.sub_agg + .as_mut() + .map(BufferedSubAggs::get_sub_agg_collector), + bucket, + agg_data, + )?; results.push(name, IntermediateAggregationResult::Bucket(bucket))?; Ok(()) } @@ -907,10 +913,38 @@ fn extract_missing_value( Some((key, bucket)) } -impl SegmentTermCollector +fn reborrow_opt_collector<'a>( + opt: &'a mut Option<&mut dyn SegmentAggregationCollector>, +) -> Option<&'a mut dyn SegmentAggregationCollector> { + match opt { + Some(inner) => Some(*inner), + None => None, + } +} + +fn into_intermediate_bucket_entry( + bucket: Bucket, + sub_agg_collector: Option<&mut dyn SegmentAggregationCollector>, + agg_data: &AggregationsSegmentCtx, +) -> crate::Result { + let mut sub_aggregation_res = IntermediateAggregationResults::default(); + if let Some(sub_agg_collector) = sub_agg_collector { + sub_agg_collector.add_intermediate_aggregation_result( + agg_data, + &mut sub_aggregation_res, + bucket.bucket_id, + )?; + } + Ok(IntermediateTermBucketEntry { + doc_count: bucket.count, + sub_aggregation: sub_aggregation_res, + }) +} + +impl SegmentTermCollector where TermMap: TermAggregationMap, - C: SubAggCache, + B: SubAggBuffer, { fn get_memory_consumption(&self) -> usize { self.parent_buckets @@ -922,7 +956,7 @@ where #[inline] pub(crate) fn into_intermediate_bucket_result( term_req: &TermsAggReqData, - sub_agg: &mut Option>, + mut sub_agg_collector: Option<&mut dyn SegmentAggregationCollector>, term_buckets: TermMap, agg_data: &AggregationsSegmentCtx, ) -> crate::Result { @@ -965,31 +999,6 @@ where let mut dict: FxHashMap = Default::default(); dict.reserve(entries.len()); - let into_intermediate_bucket_entry = - |bucket: Bucket, - sub_agg: &mut Option>| - -> crate::Result { - if let Some(sub_agg) = sub_agg { - let mut sub_aggregation_res = IntermediateAggregationResults::default(); - sub_agg - .get_sub_agg_collector() - .add_intermediate_aggregation_result( - agg_data, - &mut sub_aggregation_res, - bucket.bucket_id, - )?; - Ok(IntermediateTermBucketEntry { - doc_count: bucket.count, - sub_aggregation: sub_aggregation_res, - }) - } else { - Ok(IntermediateTermBucketEntry { - doc_count: bucket.count, - sub_aggregation: Default::default(), - }) - } - }; - if term_req.column_type == ColumnType::Str { let fallback_dict = Dictionary::empty(); let term_dict = term_req @@ -1000,7 +1009,11 @@ where if let Some((intermediate_key, bucket)) = extract_missing_value(&mut entries, term_req) { - let intermediate_entry = into_intermediate_bucket_entry(bucket, sub_agg)?; + let intermediate_entry = into_intermediate_bucket_entry( + bucket, + reborrow_opt_collector(&mut sub_agg_collector), + agg_data, + )?; dict.insert(intermediate_key, intermediate_entry); } @@ -1008,19 +1021,28 @@ where entries.sort_unstable_by_key(|bucket| bucket.0); let (term_ids, buckets): (Vec, Vec) = entries.into_iter().unzip(); - let mut buckets_it = buckets.into_iter(); - term_dict.sorted_ords_to_term_cb(term_ids.into_iter(), |term| { - let bucket = buckets_it.next().unwrap(); - let intermediate_entry = - into_intermediate_bucket_entry(bucket, sub_agg).map_err(io::Error::other)?; + let intermediate_entries: Vec = buckets + .into_iter() + .map(|bucket| { + into_intermediate_bucket_entry( + bucket, + reborrow_opt_collector(&mut sub_agg_collector), + agg_data, + ) + }) + .collect::>()?; + + let mut intermediate_entry_it = intermediate_entries.into_iter(); + + term_dict.sorted_ords_to_term_cb(&term_ids[..], |term| { + let intermediate_entry = intermediate_entry_it.next().unwrap(); dict.insert( IntermediateKey::Str( String::from_utf8(term.to_vec()).expect("could not convert to String"), ), intermediate_entry, ); - Ok(()) })?; if term_req.req.min_doc_count == 0 { @@ -1055,14 +1077,22 @@ where } } else if term_req.column_type == ColumnType::DateTime { for (val, doc_count) in entries { - let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?; + let intermediate_entry = into_intermediate_bucket_entry( + doc_count, + reborrow_opt_collector(&mut sub_agg_collector), + agg_data, + )?; let val = i64::from_u64(val); let date = format_date(val)?; dict.insert(IntermediateKey::Str(date), intermediate_entry); } } else if term_req.column_type == ColumnType::Bool { for (val, doc_count) in entries { - let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?; + let intermediate_entry = into_intermediate_bucket_entry( + doc_count, + reborrow_opt_collector(&mut sub_agg_collector), + agg_data, + )?; let val = bool::from_u64(val); dict.insert(IntermediateKey::Bool(val), intermediate_entry); } @@ -1082,14 +1112,22 @@ where })?; for (val, doc_count) in entries { - let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?; + let intermediate_entry = into_intermediate_bucket_entry( + doc_count, + reborrow_opt_collector(&mut sub_agg_collector), + agg_data, + )?; let val: u128 = compact_space_accessor.compact_to_u128(val as u32); let val = Ipv6Addr::from_u128(val); dict.insert(IntermediateKey::IpAddr(val), intermediate_entry); } } else { for (val, doc_count) in entries { - let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?; + let intermediate_entry = into_intermediate_bucket_entry( + doc_count, + reborrow_opt_collector(&mut sub_agg_collector), + agg_data, + )?; if term_req.column_type == ColumnType::U64 { dict.insert(IntermediateKey::U64(val), intermediate_entry); } else if term_req.column_type == ColumnType::I64 { @@ -1123,13 +1161,13 @@ where } } -impl SegmentTermCollector { +impl SegmentTermCollector { #[inline] fn collect_terms_with_docs( iter: impl Iterator, term_buckets: &mut TermMap, bucket_id_provider: &mut BucketIdProvider, - sub_agg: &mut CachedSubAggs, + sub_agg: &mut BufferedSubAggs, ) { for (doc, term_id) in iter { let bucket_id = term_buckets.term_entry(term_id, bucket_id_provider); diff --git a/src/aggregation/bucket/term_missing_agg.rs b/src/aggregation/bucket/term_missing_agg.rs index 47c3989c6..fb2174490 100644 --- a/src/aggregation/bucket/term_missing_agg.rs +++ b/src/aggregation/bucket/term_missing_agg.rs @@ -5,7 +5,7 @@ use crate::aggregation::agg_data::{ build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx, }; use crate::aggregation::bucket::term_agg::TermsAggregation; -use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardCachedSubAggs}; +use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardBufferedSubAggs}; use crate::aggregation::intermediate_agg_result::{ IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult, IntermediateKey, IntermediateTermBucketEntry, IntermediateTermBucketResult, @@ -47,7 +47,7 @@ struct MissingCount { #[derive(Default, Debug)] pub struct TermMissingAgg { accessor_idx: usize, - sub_agg: Option, + sub_agg: Option, /// Idx = parent bucket id, Value = missing count for that bucket missing_count_per_bucket: Vec, bucket_id_provider: BucketIdProvider, @@ -66,7 +66,7 @@ impl TermMissingAgg { None }; - let sub_agg = sub_agg.map(CachedSubAggs::new); + let sub_agg = sub_agg.map(BufferedSubAggs::new); let bucket_id_provider = BucketIdProvider::default(); Ok(Self { diff --git a/src/aggregation/cached_sub_aggs.rs b/src/aggregation/buffered_sub_aggs.rs similarity index 86% rename from src/aggregation/cached_sub_aggs.rs rename to src/aggregation/buffered_sub_aggs.rs index f97da31ab..87ce47bb5 100644 --- a/src/aggregation/cached_sub_aggs.rs +++ b/src/aggregation/buffered_sub_aggs.rs @@ -6,7 +6,7 @@ use crate::aggregation::bucket::MAX_NUM_TERMS_FOR_VEC; use crate::aggregation::BucketId; use crate::DocId; -/// A cache for sub-aggregations, storing doc ids per bucket id. +/// A buffer for sub-aggregations, storing doc ids per bucket id. /// Depending on the cardinality of the parent aggregation, we use different /// storage strategies. /// @@ -24,21 +24,21 @@ use crate::DocId; /// aggregations. /// What this datastructure does in general is to group docs by bucket id. #[derive(Debug)] -pub(crate) struct CachedSubAggs { - cache: C, +pub(crate) struct BufferedSubAggs { + buffer: B, sub_agg_collector: Box, num_docs: usize, } -pub type LowCardCachedSubAggs = CachedSubAggs; -pub type HighCardCachedSubAggs = CachedSubAggs; +pub type LowCardBufferedSubAggs = BufferedSubAggs; +pub type HighCardBufferedSubAggs = BufferedSubAggs; const FLUSH_THRESHOLD: usize = 2048; -/// A trait for caching sub-aggregation doc ids per bucket id. +/// A trait for buffering sub-aggregation doc ids per bucket id. /// Different implementations can be used depending on the cardinality /// of the parent aggregation. -pub trait SubAggCache: Debug { +pub trait SubAggBuffer: Debug { fn new() -> Self; fn push(&mut self, bucket_id: BucketId, doc_id: DocId); fn flush_local( @@ -49,22 +49,22 @@ pub trait SubAggCache: Debug { ) -> crate::Result<()>; } -impl CachedSubAggs { +impl BufferedSubAggs { pub fn new(sub_agg: Box) -> Self { Self { - cache: Backend::new(), + buffer: Backend::new(), sub_agg_collector: sub_agg, num_docs: 0, } } - pub fn get_sub_agg_collector(&mut self) -> &mut Box { - &mut self.sub_agg_collector + pub fn get_sub_agg_collector(&mut self) -> &mut dyn SegmentAggregationCollector { + &mut *self.sub_agg_collector } #[inline] pub fn push(&mut self, bucket_id: BucketId, doc_id: DocId) { - self.cache.push(bucket_id, doc_id); + self.buffer.push(bucket_id, doc_id); self.num_docs += 1; } @@ -75,7 +75,7 @@ impl CachedSubAggs { agg_data: &mut AggregationsSegmentCtx, ) -> crate::Result<()> { if self.num_docs >= FLUSH_THRESHOLD { - self.cache + self.buffer .flush_local(&mut self.sub_agg_collector, agg_data, false)?; self.num_docs = 0; } @@ -85,7 +85,7 @@ impl CachedSubAggs { /// Note: this _does_ flush the sub aggregations. pub fn flush(&mut self, agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> { if self.num_docs != 0 { - self.cache + self.buffer .flush_local(&mut self.sub_agg_collector, agg_data, true)?; self.num_docs = 0; } @@ -94,11 +94,11 @@ impl CachedSubAggs { } } -/// Number of partitions for high cardinality sub-aggregation cache. +/// Number of partitions for high cardinality sub-aggregation buffer. const NUM_PARTITIONS: usize = 16; #[derive(Debug)] -pub(crate) struct HighCardSubAggCache { +pub(crate) struct HighCardSubAggBuffer { /// This weird partitioning is used to do some cheap grouping on the bucket ids. /// bucket ids are dense, e.g. when we don't detect the cardinality as low cardinality, /// but there are just 16 bucket ids, each bucket id will go to its own partition. @@ -108,7 +108,7 @@ pub(crate) struct HighCardSubAggCache { partitions: Box<[PartitionEntry; NUM_PARTITIONS]>, } -impl HighCardSubAggCache { +impl HighCardSubAggBuffer { #[inline] fn clear(&mut self) { for partition in self.partitions.iter_mut() { @@ -131,7 +131,7 @@ impl PartitionEntry { } } -impl SubAggCache for HighCardSubAggCache { +impl SubAggBuffer for HighCardSubAggBuffer { fn new() -> Self { Self { partitions: Box::new(core::array::from_fn(|_| PartitionEntry::default())), @@ -173,14 +173,14 @@ impl SubAggCache for HighCardSubAggCache { } #[derive(Debug)] -pub(crate) struct LowCardSubAggCache { - /// Cache doc ids per bucket for sub-aggregations. +pub(crate) struct LowCardSubAggBuffer { + /// Buffer doc ids per bucket for sub-aggregations. /// /// The outer Vec is indexed by BucketId. per_bucket_docs: Vec>, } -impl LowCardSubAggCache { +impl LowCardSubAggBuffer { #[inline] fn clear(&mut self) { for v in &mut self.per_bucket_docs { @@ -189,7 +189,7 @@ impl LowCardSubAggCache { } } -impl SubAggCache for LowCardSubAggCache { +impl SubAggBuffer for LowCardSubAggBuffer { fn new() -> Self { Self { per_bucket_docs: Vec::new(), diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs index 59e9c677d..c1a825786 100644 --- a/src/aggregation/collector.rs +++ b/src/aggregation/collector.rs @@ -1,6 +1,6 @@ use super::agg_req::Aggregations; use super::agg_result::AggregationResults; -use super::cached_sub_aggs::LowCardCachedSubAggs; +use super::buffered_sub_aggs::LowCardBufferedSubAggs; use super::intermediate_agg_result::IntermediateAggregationResults; use super::AggContextParams; // group buffering strategy is chosen explicitly by callers; no need to hash-group on the fly. @@ -136,7 +136,7 @@ fn merge_fruits( /// `AggregationSegmentCollector` does the aggregation collection on a segment. pub struct AggregationSegmentCollector { aggs_with_accessor: AggregationsSegmentCtx, - agg_collector: LowCardCachedSubAggs, + agg_collector: LowCardBufferedSubAggs, error: Option, } @@ -152,7 +152,7 @@ impl AggregationSegmentCollector { let mut agg_data = build_aggregations_data_from_req(agg, reader, segment_ordinal, context.clone())?; let mut result = - LowCardCachedSubAggs::new(build_segment_agg_collectors_root(&mut agg_data)?); + LowCardBufferedSubAggs::new(build_segment_agg_collectors_root(&mut agg_data)?); result .get_sub_agg_collector() .prepare_max_bucket(0, &agg_data)?; // prepare for bucket zero diff --git a/src/aggregation/metric/cardinality.rs b/src/aggregation/metric/cardinality.rs index d1cb234da..32aa746f8 100644 --- a/src/aggregation/metric/cardinality.rs +++ b/src/aggregation/metric/cardinality.rs @@ -1,10 +1,11 @@ +use std::fmt::Debug; use std::hash::Hash; +use std::io; use columnar::column_values::CompactSpaceU64Accessor; use columnar::{Column, ColumnType, Dictionary, StrColumn}; -use common::f64_to_u64; use datasketches::hll::{HllSketch, HllType, HllUnion}; -use rustc_hash::FxHashSet; +use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::aggregation::agg_data::AggregationsSegmentCtx; @@ -120,9 +121,69 @@ impl CardinalityAggregationReq { } } -#[derive(Clone, Debug)] +/// A coupon is the hash used to represent our elements in our cardinality sketch. +/// TODO switch to u64, but this requires updating the lib upstream. +type Coupon = u32; + +/// A CouponCache is here to cache the mapping term ordinal -> coupon (see above). +/// The idea is that we do not want to fetch terms associated to several term ordinals, +/// several times due to the fact that we have several buckets. +enum CouponCache { + Dense { + coupon_map: Vec, + missing_coupon_opt: Option, + }, + Sparse { + coupon_map: FxHashMap, + missing_coupon_opt: Option, + }, +} + +impl CouponCache { + fn new( + term_ords: Vec, + coupons: Vec, + missing_coupon_opt: Option, + ) -> CouponCache { + let num_terms = term_ords.len(); + assert_eq!(num_terms, coupons.len()); + if term_ords.is_empty() { + return CouponCache::Dense { + coupon_map: Vec::new(), + missing_coupon_opt, + }; + } + let highest_term_ord = term_ords.last().copied().unwrap_or(0u64); + // We prefer the dense implementation, if it is not too wasteful. + // There are two cases for which we can use it. + // 1- if the data is small. + // 2- if the data is not necessarily small, but due to a high occupancy ratio, the RAM usage + // is not that much bigger than if we had used a HashSet. (occupancy ratio + extra + // metadata ~ x2.25) + let should_use_dense = + highest_term_ord < 1_000_000u64 || highest_term_ord < num_terms as u64 * 3u64; + if should_use_dense { + let mut coupon_map: Vec = vec![0; highest_term_ord as usize + 1]; + for (term_ord, coupon) in term_ords.into_iter().zip(coupons.into_iter()) { + coupon_map[term_ord as usize] = coupon; + } + CouponCache::Dense { + coupon_map, + missing_coupon_opt, + } + } else { + let coupon_map: FxHashMap = term_ords.into_iter().zip(coupons).collect(); + CouponCache::Sparse { + coupon_map, + missing_coupon_opt, + } + } + } +} + pub(crate) struct SegmentCardinalityCollector { - buckets: Vec, + /// Buckets are Some(_) until they get consumed by into_intermediate_results(). + buckets: Vec>, accessor_idx: usize, /// The column accessor to access the fast field values. accessor: Column, @@ -130,75 +191,133 @@ pub(crate) struct SegmentCardinalityCollector { column_type: ColumnType, /// The missing value normalized to the internal u64 representation of the field type. missing_value_for_accessor: Option, + coupon_cache: Option, +} + +impl Debug for SegmentCardinalityCollector { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("SegmentCardinalityCollector") + .field("column_type", &self.column_type) + .field( + "missing_value_for_accessor", + &self.missing_value_for_accessor, + ) + .finish() + } } -#[derive(Clone, Debug, PartialEq, Default)] pub(crate) struct SegmentCardinalityCollectorBucket { cardinality: CardinalityCollector, entries: FxHashSet, } impl SegmentCardinalityCollectorBucket { + #[inline(always)] pub fn new(column_type: ColumnType) -> Self { Self { cardinality: CardinalityCollector::new(column_type as u8), entries: FxHashSet::default(), } } + + // Returns a intermediate metric result. + // + // If the column is not str, the values have been added to the + // sketch during collection. + // + // If the column is str, then the values are dictionary encoded + // and have not been added to the sketch yet. + // We need to resolves the term ords accumulated in self.entries + // with the coupon cache, and append the results to the sketch. fn into_intermediate_metric_result( mut self, - req_data: &CardinalityAggReqData, + coupon_cache_opt: Option<&CouponCache>, ) -> crate::Result { - if req_data.column_type == ColumnType::Str { - let fallback_dict = Dictionary::empty(); - let dict = req_data - .str_dict_column - .as_ref() - .map(|el| el.dictionary()) - .unwrap_or_else(|| &fallback_dict); - let mut has_missing = false; + if let Some(coupon_cache) = coupon_cache_opt { + assert!(self.cardinality.sketch.is_empty()); + append_to_sketch(&self.entries, coupon_cache, &mut self.cardinality); + } + Ok(IntermediateMetricResult::Cardinality(self.cardinality)) + } +} - // TODO: replace FxHashSet with something that allows iterating in order - // (e.g. sparse bitvec) - let mut term_ids = Vec::new(); - for term_ord in self.entries.into_iter() { - if term_ord == u64::MAX { - has_missing = true; - } else { - // we can reasonably exclude values above u32::MAX - term_ids.push(term_ord as u32); - } - } +/// Builds a coupon cache from the given buckets, dictionary, and optional missing value. +/// Returns a mapping from term_ord to the hash (coupon) of the associated term. +fn build_coupon_cache( + buckets: &[Option], + dictionary: &Dictionary, + missing_value_opt: Option<&Key>, +) -> io::Result { + let term_ords_capacity: usize = buckets + .iter() + .flatten() + .map(|bucket| bucket.entries.len()) + .max() + .unwrap_or(0) + * 2; + let mut term_ords_set = FxHashSet::with_capacity_and_hasher(term_ords_capacity, FxBuildHasher); + for bucket in buckets.iter().flatten() { + term_ords_set.extend(bucket.entries.iter().copied()); + } + let mut term_ords: Vec = term_ords_set.into_iter().collect(); + term_ords.sort_unstable(); - term_ids.sort_unstable(); - dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| { - self.cardinality.insert(term); - Ok(()) - })?; - if has_missing { - // Replace missing with the actual value provided - let missing_key = - req_data.req.missing.as_ref().expect( - "Found sentinel value u64::MAX for term_ord but `missing` is not set", - ); - match missing_key { - Key::Str(missing) => { - self.cardinality.insert(missing.as_str()); - } - Key::F64(val) => { - let val = f64_to_u64(*val); - self.cardinality.insert(val); - } - Key::U64(val) => { - self.cardinality.insert(*val); - } - Key::I64(val) => { - self.cardinality.insert(*val); - } + term_ords.pop_if(|highest_term_ord| *highest_term_ord >= dictionary.num_terms() as u64); + + let mut coupons: Vec = Vec::with_capacity(term_ords.len()); + let all_term_ords_found: bool = + dictionary.sorted_ords_to_term_cb(&term_ords, |term_bytes| { + let coupon: Coupon = murmurhash32::murmurhash2(term_bytes); + coupons.push(coupon); + })?; + assert!(all_term_ords_found); + + // Regardless of whether or not there is effectively a missing value in one of the buckets, + // we populate the cache with the missing key too (if any). + let missing_coupon_opt: Option = missing_value_opt.map(|missing_key| { + if let Key::Str(missing_value_str) = missing_key { + murmurhash32::murmurhash2(missing_value_str.as_bytes()) + } else { + // See https://github.com/quickwit-oss/tantivy/issues/2891 + // A missing key with a type different from Str will not work as intended + // for the moment. + // + // Right now this is just a partial workaround. + 35679954u32 + } + }); + Ok(CouponCache::new(term_ords, coupons, missing_coupon_opt)) +} + +fn append_to_sketch( + term_ords: &FxHashSet, + coupon_cache: &CouponCache, + sketch: &mut CardinalityCollector, +) { + match coupon_cache { + CouponCache::Dense { + coupon_map, + missing_coupon_opt, + } => { + for &term_ord in term_ords { + if let Some(coupon) = coupon_map + .get(term_ord as usize) + .copied() + .or(*missing_coupon_opt) + { + sketch.insert_coupon(coupon); + } + } + } + CouponCache::Sparse { + coupon_map, + missing_coupon_opt, + } => { + for term_ord in term_ords { + if let Some(coupon) = coupon_map.get(term_ord).copied().or(*missing_coupon_opt) { + sketch.insert_coupon(coupon); } } } - - Ok(IntermediateMetricResult::Cardinality(self.cardinality)) } } @@ -210,11 +329,12 @@ impl SegmentCardinalityCollector { missing_value_for_accessor: Option, ) -> Self { Self { - buckets: vec![SegmentCardinalityCollectorBucket::new(column_type); 1], + buckets: Vec::new(), column_type, accessor_idx, accessor, missing_value_for_accessor, + coupon_cache: None, } } @@ -236,15 +356,35 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector { &mut self, agg_data: &AggregationsSegmentCtx, results: &mut IntermediateAggregationResults, - parent_bucket_id: BucketId, + bucket_id: BucketId, ) -> crate::Result<()> { - self.prepare_max_bucket(parent_bucket_id, agg_data)?; + self.prepare_max_bucket(bucket_id, agg_data)?; let req_data = &agg_data.get_cardinality_req_data(self.accessor_idx); + // Strings are dictionary encoded. Fetching the terms associated to strings + // is expensive. For this reason, we do that once for all buckets and cache the results + // here. + if let Some(str_dict_column) = &req_data.str_dict_column { + // Ensure the coupon cache is populated. + // A mapping from term_ord to the hash of the associated term. + // The missing value sentinel will be associated to the hash of the missing value if + // any. + if self.coupon_cache.is_none() { + self.coupon_cache = Some(build_coupon_cache( + &self.buckets, + str_dict_column.dictionary(), + req_data.req.missing.as_ref(), + )?); + } + } let name = req_data.name.to_string(); // take the bucket in buckets and replace it with a new empty one - let bucket = std::mem::take(&mut self.buckets[parent_bucket_id as usize]); - - let intermediate_result = bucket.into_intermediate_metric_result(req_data)?; + let Some(bucket) = self.buckets[bucket_id as usize].take() else { + return Err(crate::TantivyError::InternalError( + "the same bucket should not be finalized twice.".to_string(), + )); + }; + let intermediate_result = + bucket.into_intermediate_metric_result(self.coupon_cache.as_ref())?; results.push( name, IntermediateAggregationResult::Metric(intermediate_result), @@ -260,8 +400,11 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector { agg_data: &mut AggregationsSegmentCtx, ) -> crate::Result<()> { self.fetch_block_with_field(docs, agg_data); - let bucket = &mut self.buckets[parent_bucket_id as usize]; - + let Some(bucket) = &mut self.buckets[parent_bucket_id as usize].as_mut() else { + return Err(crate::TantivyError::InternalError( + "collection should not happen after finalization".to_string(), + )); + }; let col_block_accessor = &agg_data.column_block_accessor; if self.column_type == ColumnType::Str { for term_ord in col_block_accessor.iter_vals() { @@ -301,7 +444,7 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector { ) -> crate::Result<()> { if max_bucket as usize >= self.buckets.len() { self.buckets.resize_with(max_bucket as usize + 1, || { - SegmentCardinalityCollectorBucket::new(self.column_type) + Some(SegmentCardinalityCollectorBucket::new(self.column_type)) }); } Ok(()) @@ -358,10 +501,14 @@ impl CardinalityCollector { /// Insert a value into the HLL sketch, salted by the column type. /// The salt ensures that identical u64 values from different column types /// (e.g. bool `false` vs i64 `0`) are counted as distinct. - pub(crate) fn insert(&mut self, value: T) { + fn insert(&mut self, value: T) { self.sketch.update((self.salt, value)); } + fn insert_coupon(&mut self, coupon: Coupon) { + self.sketch.update_with_coupon(coupon); + } + /// Compute the final cardinality estimate. pub fn finalize(self) -> Option { Some(self.sketch.estimate().trunc()) @@ -377,7 +524,7 @@ impl CardinalityCollector { let mut union = HllUnion::new(LG_K); union.update(&self.sketch); union.update(&right.sketch); - self.sketch = union.get_result(HllType::Hll4); + self.sketch = union.to_sketch(HllType::Hll4); Ok(()) } } @@ -392,7 +539,7 @@ mod tests { use crate::aggregation::agg_req::Aggregations; use crate::aggregation::tests::{exec_request, get_test_index_from_terms}; - use crate::schema::{IntoIpv6Addr, Schema, FAST}; + use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING}; use crate::Index; #[test] @@ -575,6 +722,30 @@ mod tests { assert_eq!(estimate, 3.0); } + /// Verifies that merging two small sketches (both in List/Set coupon mode) + /// produces an exact result — i.e. the HllUnion does not unnecessarily + /// promote to the full HLL array when the combined cardinality is small. + #[test] + fn cardinality_collector_merge_stays_exact_for_small_sets() { + use super::CardinalityCollector; + + let mut left = CardinalityCollector::default(); + for i in 0u64..50 { + left.insert(i); + } + + let mut right = CardinalityCollector::default(); + for i in 30u64..100 { + right.insert(i); + } + + left.merge_fruits(right).unwrap(); + let estimate = left.finalize().unwrap(); + // 100 distinct values (0..100). Both sketches are in Set mode (< 192 coupons), + // so the union should stay in coupon mode and give an exact count. + assert_eq!(estimate, 100.0); + } + #[test] fn cardinality_collector_serialize_deserialize_binary() { use datasketches::hll::HllSketch; @@ -591,6 +762,98 @@ mod tests { assert!((deserialized.estimate() - 3.0).abs() < 0.01); } + /// Tests that the `missing` parameter correctly counts a single empty document + /// for both u64 and str columns. + #[test] + fn cardinality_aggregation_missing_value_single_empty_doc() { + let mut schema_builder = Schema::builder(); + let id_field = schema_builder.add_u64_field("id", FAST); + let name_field = schema_builder.add_text_field("name", STRING | FAST); + let index = Index::create_in_ram(schema_builder.build()); + let mut writer = index.writer_for_tests().unwrap(); + writer + .add_document(doc!(id_field=>1u64,name_field=>"some_name")) + .unwrap(); + writer.add_document(doc!()).unwrap(); + writer.commit().unwrap(); + + { + // int colum with missing value non redundant + let agg_req: Aggregations = serde_json::from_value(json!({ + "cardinality": { + "cardinality": { + "field": "id", + "missing": 42u64 + }, + } + })) + .unwrap(); + let res = exec_request(agg_req, &index).unwrap(); + assert_eq!(res["cardinality"]["value"], 2.0); + } + + { + // int colum with missing value redundant + let agg_req: Aggregations = serde_json::from_value(json!({ + "cardinality": { + "cardinality": { + "field": "id", + "missing": 1u64 + }, + } + })) + .unwrap(); + let res = exec_request(agg_req, &index).unwrap(); + assert_eq!(res["cardinality"]["value"], 1.0); + } + + { + // str colum with missing value non redundant + // With more than one segment, this is not well handled. + let agg_req: Aggregations = serde_json::from_value(json!({ + "cardinality": { + "cardinality": { + "field": "name", + "missing": "other_name" + }, + } + })) + .unwrap(); + let res = exec_request(agg_req, &index).unwrap(); + assert_eq!(res["cardinality"]["value"], 2.0); + } + + { + // str colum with missing value redundant + let agg_req: Aggregations = serde_json::from_value(json!({ + "cardinality": { + "cardinality": { + "field": "name", + "missing": "some_name" + }, + } + })) + .unwrap(); + let res = exec_request(agg_req, &index).unwrap(); + assert_eq!(res["cardinality"]["value"], 1.0); + } + + { + // str column with missing value with a number type. + let agg_req: Aggregations = serde_json::from_value(json!({ + "cardinality": { + "cardinality": { + "field": "name", + "missing": 3, + }, + } + })) + .unwrap(); + let res = exec_request(agg_req, &index).unwrap(); + assert_eq!(res["cardinality"]["value"], 2.0); + } + } + #[test] fn cardinality_collector_salt_differentiates_types() { use super::CardinalityCollector; diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index b4a080d6a..66d439e91 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -133,7 +133,7 @@ mod agg_limits; pub mod agg_req; pub mod agg_result; pub mod bucket; -pub(crate) mod cached_sub_aggs; +pub(crate) mod buffered_sub_aggs; mod collector; mod date; mod error; diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index 3a99fc16c..6a76a1e94 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -512,11 +512,13 @@ impl Dictionary { /// Returns the terms for a _sorted_ list of term ordinals. /// /// Returns true if and only if all terms have been found. - pub fn sorted_ords_to_term_cb io::Result<()>>( + pub fn sorted_ords_to_term_cb( &self, - mut ords: impl Iterator, - mut cb: F, + ords: &[TermOrdinal], + mut cb: impl FnMut(&[u8]), ) -> io::Result { + assert!(ords.is_sorted()); + let mut ords = ords.iter().copied(); let Some(mut ord) = ords.next() else { return Ok(true); }; @@ -538,33 +540,36 @@ impl Dictionary { bytes.extend_from_slice(current_sstable_delta_reader.suffix()); current_block_ordinal += 1; } - cb(&bytes)?; + cb(&bytes); // fetch the next ordinal - let Some(next_ord) = ords.next() else { - return Ok(true); + let next_ord = loop { + let Some(next_ord) = ords.next() else { + return Ok(true); + }; + if next_ord == ord { + // This is the same ordinal, let's just call the callback directly. + cb(&bytes); + } else { + // we checked it was sorted beforehands + debug_assert!(next_ord > ord); + break next_ord; + } }; - // advance forward if the new ord is different than the one we just processed + // TODO optimization: it is silly to do a binary search to get the block every single + // time. // - // this allows the input TermOrdinal iterator to contain duplicates, so long as it's - // still sorted - if next_ord < ord { - panic!("Ordinals were not sorted: received {next_ord} after {ord}"); - } else if next_ord > ord { - // check if block changed for new term_ord - let new_block_addr = self.sstable_index.get_block_with_ord(next_ord); - if new_block_addr != current_block_addr { - current_block_addr = new_block_addr; - current_block_ordinal = current_block_addr.first_ordinal; - current_sstable_delta_reader = - self.sstable_delta_reader_block(current_block_addr.clone())?; - bytes.clear(); - } - ord = next_ord; - } else { - // The next ord is equal to the previous ord: no need to seek or advance. + // Check if block changed for new term_ord + let new_block_addr = self.sstable_index.get_block_with_ord(next_ord); + if new_block_addr != current_block_addr { + current_block_addr = new_block_addr; + current_block_ordinal = current_block_addr.first_ordinal; + current_sstable_delta_reader = + self.sstable_delta_reader_block(current_block_addr.clone())?; + bytes.clear(); } + ord = next_ord; } } @@ -671,8 +676,8 @@ mod tests { use common::OwnedBytes; use super::Dictionary; - use crate::MonotonicU64SSTable; use crate::dictionary::TermOrdHit; + use crate::{MonotonicU64SSTable, TermOrdinal}; #[derive(Debug)] struct PermissionedHandle { @@ -935,25 +940,24 @@ mod tests { } #[test] - fn test_ords_term() { + fn test_sorted_ords_to_term() { let (dic, _slice) = make_test_sstable(); // Single term let mut terms = Vec::new(); assert!( - dic.sorted_ords_to_term_cb(100_000..100_001, |term| { + dic.sorted_ords_to_term_cb(&[100_000], |term| { terms.push(term.to_vec()); - Ok(()) }) .unwrap() ); assert_eq!(terms, vec![format!("{:05X}", 100_000).into_bytes(),]); // Single term let mut terms = Vec::new(); + let ords: Vec = (100_001..100_002).collect(); assert!( - dic.sorted_ords_to_term_cb(100_001..100_002, |term| { + dic.sorted_ords_to_term_cb(&ords, |term| { terms.push(term.to_vec()); - Ok(()) }) .unwrap() ); @@ -961,9 +965,8 @@ mod tests { // both terms let mut terms = Vec::new(); assert!( - dic.sorted_ords_to_term_cb(100_000..100_002, |term| { + dic.sorted_ords_to_term_cb(&[100_000, 100_001], |term| { terms.push(term.to_vec()); - Ok(()) }) .unwrap() ); @@ -976,10 +979,10 @@ mod tests { ); // Test cross block let mut terms = Vec::new(); + let ords: Vec = (98653..=98655).collect(); assert!( - dic.sorted_ords_to_term_cb(98653..=98655, |term| { + dic.sorted_ords_to_term_cb(&ords, |term| { terms.push(term.to_vec()); - Ok(()) }) .unwrap() ); @@ -991,6 +994,43 @@ mod tests { format!("{:05X}", 98655).into_bytes(), ] ); + // redundant + let mut terms = Vec::new(); + let ords: Vec = vec![1, 1, 2]; + assert!( + dic.sorted_ords_to_term_cb(&ords, |term| { + terms.push(term.to_vec()); + }) + .unwrap() + ); + assert_eq!( + terms, + vec![ + format!("{:05X}", 1).into_bytes(), + format!("{:05X}", 1).into_bytes(), + format!("{:05X}", 2).into_bytes(), + ] + ); + // redundant cross block + let mut terms = Vec::new(); + let ords: Vec = vec![98653, 98653, 98654, 98654, 98655, 98655]; + assert!( + dic.sorted_ords_to_term_cb(&ords, |term| { + terms.push(term.to_vec()); + }) + .unwrap() + ); + assert_eq!( + terms, + vec![ + format!("{:05X}", 98_653).into_bytes(), + format!("{:05X}", 98_653).into_bytes(), + format!("{:05X}", 98_654).into_bytes(), + format!("{:05X}", 98_654).into_bytes(), + format!("{:05X}", 98_655).into_bytes(), + format!("{:05X}", 98_655).into_bytes(), + ] + ); } #[test]