diff --git a/Cargo.toml b/Cargo.toml index ead328b92..27fcb1b59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ tantivy-bitpacker = { version = "0.9", path = "./bitpacker" } common = { version = "0.10", path = "./common/", package = "tantivy-common" } tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" } sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] } -hyperloglogplus = { version = "0.4.1", features = ["const-loop"] } +datasketches = "0.2.0" futures-util = { version = "0.3.28", optional = true } futures-channel = { version = "0.3.28", optional = true } fnv = "1.0.7" diff --git a/src/aggregation/metric/cardinality.rs b/src/aggregation/metric/cardinality.rs index c184848d8..d1cb234da 100644 --- a/src/aggregation/metric/cardinality.rs +++ b/src/aggregation/metric/cardinality.rs @@ -1,12 +1,11 @@ -use std::collections::hash_map::DefaultHasher; -use std::hash::{BuildHasher, Hasher}; +use std::hash::Hash; use columnar::column_values::CompactSpaceU64Accessor; use columnar::{Column, ColumnType, Dictionary, StrColumn}; use common::f64_to_u64; -use hyperloglogplus::{HyperLogLog, HyperLogLogPlus}; +use datasketches::hll::{HllSketch, HllType, HllUnion}; use rustc_hash::FxHashSet; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::aggregation::agg_data::AggregationsSegmentCtx; use crate::aggregation::intermediate_agg_result::{ @@ -16,29 +15,17 @@ use crate::aggregation::segment_agg_result::SegmentAggregationCollector; use crate::aggregation::*; use crate::TantivyError; -#[derive(Clone, Debug, Serialize, Deserialize)] -struct BuildSaltedHasher { - salt: u8, -} - -impl BuildHasher for BuildSaltedHasher { - type Hasher = DefaultHasher; - - fn build_hasher(&self) -> Self::Hasher { - let mut hasher = DefaultHasher::new(); - hasher.write_u8(self.salt); - - hasher - } -} +/// Log2 of the number of registers for the HLL sketch. +/// 2^11 = 2048 registers, giving ~2.3% relative error and ~1KB per sketch (Hll4). +const LG_K: u8 = 11; /// # Cardinality /// /// The cardinality aggregation allows for computing an estimate /// of the number of different values in a data set based on the -/// HyperLogLog++ algorithm. This is particularly useful for understanding the -/// uniqueness of values in a large dataset where counting each unique value -/// individually would be computationally expensive. +/// Apache DataSketches HyperLogLog algorithm. This is particularly useful for +/// understanding the uniqueness of values in a large dataset where counting +/// each unique value individually would be computationally expensive. /// /// For example, you might use a cardinality aggregation to estimate the number /// of unique visitors to a website by aggregating on a field that contains @@ -184,7 +171,7 @@ impl SegmentCardinalityCollectorBucket { term_ids.sort_unstable(); dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| { - self.cardinality.sketch.insert_any(&term); + self.cardinality.insert(term); Ok(()) })?; if has_missing { @@ -195,17 +182,17 @@ impl SegmentCardinalityCollectorBucket { ); match missing_key { Key::Str(missing) => { - self.cardinality.sketch.insert_any(&missing); + self.cardinality.insert(missing.as_str()); } Key::F64(val) => { let val = f64_to_u64(*val); - self.cardinality.sketch.insert_any(&val); + self.cardinality.insert(val); } Key::U64(val) => { - self.cardinality.sketch.insert_any(&val); + self.cardinality.insert(*val); } Key::I64(val) => { - self.cardinality.sketch.insert_any(&val); + self.cardinality.insert(*val); } } } @@ -296,11 +283,11 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector { })?; for val in col_block_accessor.iter_vals() { let val: u128 = compact_space_accessor.compact_to_u128(val as u32); - bucket.cardinality.sketch.insert_any(&val); + bucket.cardinality.insert(val); } } else { for val in col_block_accessor.iter_vals() { - bucket.cardinality.sketch.insert_any(&val); + bucket.cardinality.insert(val); } } @@ -321,11 +308,18 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector { } } -#[derive(Clone, Debug, Serialize, Deserialize)] -/// The percentiles collector used during segment collection and for merging results. +#[derive(Clone, Debug)] +/// The cardinality collector used during segment collection and for merging results. +/// Uses Apache DataSketches HLL (lg_k=11, Hll4) for compact binary serialization +/// and cross-language compatibility (e.g. Java `datasketches` library). pub struct CardinalityCollector { - sketch: HyperLogLogPlus, + sketch: HllSketch, + /// Salt derived from `ColumnType`, used to differentiate values of different column types + /// that map to the same u64 (e.g. bool `false` = 0 vs i64 `0`). + /// Not serialized — only needed during insertion, not after sketch registers are populated. + salt: u8, } + impl Default for CardinalityCollector { fn default() -> Self { Self::new(0) @@ -338,25 +332,52 @@ impl PartialEq for CardinalityCollector { } } -impl CardinalityCollector { - /// Compute the final cardinality estimate. - pub fn finalize(self) -> Option { - Some(self.sketch.clone().count().trunc()) +impl Serialize for CardinalityCollector { + fn serialize(&self, serializer: S) -> Result { + let bytes = self.sketch.serialize(); + serializer.serialize_bytes(&bytes) } +} +impl<'de> Deserialize<'de> for CardinalityCollector { + fn deserialize>(deserializer: D) -> Result { + let bytes: Vec = Deserialize::deserialize(deserializer)?; + let sketch = HllSketch::deserialize(&bytes).map_err(serde::de::Error::custom)?; + Ok(Self { sketch, salt: 0 }) + } +} + +impl CardinalityCollector { fn new(salt: u8) -> Self { Self { - sketch: HyperLogLogPlus::new(16, BuildSaltedHasher { salt }).unwrap(), + sketch: HllSketch::new(LG_K, HllType::Hll4), + salt, } } - pub(crate) fn merge_fruits(&mut self, right: CardinalityCollector) -> crate::Result<()> { - self.sketch.merge(&right.sketch).map_err(|err| { - TantivyError::AggregationError(AggregationError::InternalError(format!( - "Error while merging cardinality {err:?}" - ))) - })?; + /// Insert a value into the HLL sketch, salted by the column type. + /// The salt ensures that identical u64 values from different column types + /// (e.g. bool `false` vs i64 `0`) are counted as distinct. + pub(crate) fn insert(&mut self, value: T) { + self.sketch.update((self.salt, value)); + } + /// Compute the final cardinality estimate. + pub fn finalize(self) -> Option { + Some(self.sketch.estimate().trunc()) + } + + /// Serialize the HLL sketch to its compact binary representation. + /// The format is cross-language compatible with Apache DataSketches (Java, C++, Python). + pub fn to_sketch_bytes(&self) -> Vec { + self.sketch.serialize() + } + + pub(crate) fn merge_fruits(&mut self, right: CardinalityCollector) -> crate::Result<()> { + let mut union = HllUnion::new(LG_K); + union.update(&self.sketch); + union.update(&right.sketch); + self.sketch = union.get_result(HllType::Hll4); Ok(()) } } @@ -518,4 +539,75 @@ mod tests { Ok(()) } + + #[test] + fn cardinality_collector_serde_roundtrip() { + use super::CardinalityCollector; + + let mut collector = CardinalityCollector::default(); + collector.insert("hello"); + collector.insert("world"); + collector.insert("hello"); // duplicate + + let serialized = serde_json::to_vec(&collector).unwrap(); + let deserialized: CardinalityCollector = serde_json::from_slice(&serialized).unwrap(); + + let original_estimate = collector.finalize().unwrap(); + let roundtrip_estimate = deserialized.finalize().unwrap(); + assert_eq!(original_estimate, roundtrip_estimate); + assert_eq!(original_estimate, 2.0); + } + + #[test] + fn cardinality_collector_merge() { + use super::CardinalityCollector; + + let mut left = CardinalityCollector::default(); + left.insert("a"); + left.insert("b"); + + let mut right = CardinalityCollector::default(); + right.insert("b"); + right.insert("c"); + + left.merge_fruits(right).unwrap(); + let estimate = left.finalize().unwrap(); + assert_eq!(estimate, 3.0); + } + + #[test] + fn cardinality_collector_serialize_deserialize_binary() { + use datasketches::hll::HllSketch; + + use super::CardinalityCollector; + + let mut collector = CardinalityCollector::default(); + collector.insert("apple"); + collector.insert("banana"); + collector.insert("cherry"); + + let bytes = collector.to_sketch_bytes(); + let deserialized = HllSketch::deserialize(&bytes).unwrap(); + assert!((deserialized.estimate() - 3.0).abs() < 0.01); + } + + #[test] + fn cardinality_collector_salt_differentiates_types() { + use super::CardinalityCollector; + + // Without salt, same u64 value from different column types would collide + let mut collector_bool = CardinalityCollector::new(5); // e.g. ColumnType::Bool + collector_bool.insert(0u64); // false + collector_bool.insert(1u64); // true + + let mut collector_i64 = CardinalityCollector::new(2); // e.g. ColumnType::I64 + collector_i64.insert(0u64); + collector_i64.insert(1u64); + + // Merge them + collector_bool.merge_fruits(collector_i64).unwrap(); + let estimate = collector_bool.finalize().unwrap(); + // Should be 4 because salt makes (5, 0) != (2, 0) and (5, 1) != (2, 1) + assert_eq!(estimate, 4.0); + } }