From cdd24b7ee543fcc78d23cca46825e17acc598496 Mon Sep 17 00:00:00 2001 From: "cong.xie" Date: Wed, 11 Feb 2026 08:49:46 -0500 Subject: [PATCH 1/3] Replace hyperloglogplus with Apache DataSketches HLL (lg_k=11) Switch tantivy's cardinality aggregation from the hyperloglogplus crate (HyperLogLog++ with p=16) to the official Apache DataSketches HLL implementation (datasketches crate v0.2.0 with lg_k=11, Hll4). This enables returning raw HLL sketch bytes from pomsky to Datadog's event query, where they can be properly deserialized and merged using the same DataSketches library (Java). The previous implementation required pomsky to fabricate fake HLL sketches from scalar cardinality estimates, which produced incorrect results when merged. Changes: - Cargo.toml: hyperloglogplus 0.4.1 -> datasketches 0.2.0 - CardinalityCollector: HyperLogLogPlus -> HllSketch - Custom Serde impl using HllSketch binary format (cross-shard compat) - New to_sketch_bytes() for external consumers (pomsky) - Salt preserved via (salt, value) tuple hashing for column type disambiguation - Removed BuildSaltedHasher struct - Added 4 new unit tests (serde roundtrip, merge, binary compat, salt) --- Cargo.toml | 2 +- src/aggregation/metric/cardinality.rs | 177 +++++++++++++++++++------- 2 files changed, 135 insertions(+), 44 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ead328b92..27fcb1b59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ tantivy-bitpacker = { version = "0.9", path = "./bitpacker" } common = { version = "0.10", path = "./common/", package = "tantivy-common" } tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" } sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] } -hyperloglogplus = { version = "0.4.1", features = ["const-loop"] } +datasketches = "0.2.0" futures-util = { version = "0.3.28", optional = true } futures-channel = { version = "0.3.28", optional = true } fnv = "1.0.7" diff --git a/src/aggregation/metric/cardinality.rs b/src/aggregation/metric/cardinality.rs index c184848d8..80106654c 100644 --- a/src/aggregation/metric/cardinality.rs +++ b/src/aggregation/metric/cardinality.rs @@ -1,12 +1,11 @@ -use std::collections::hash_map::DefaultHasher; -use std::hash::{BuildHasher, Hasher}; +use std::hash::Hash; use columnar::column_values::CompactSpaceU64Accessor; use columnar::{Column, ColumnType, Dictionary, StrColumn}; use common::f64_to_u64; -use hyperloglogplus::{HyperLogLog, HyperLogLogPlus}; +use datasketches::hll::{HllSketch, HllType, HllUnion}; use rustc_hash::FxHashSet; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::aggregation::agg_data::AggregationsSegmentCtx; use crate::aggregation::intermediate_agg_result::{ @@ -16,29 +15,17 @@ use crate::aggregation::segment_agg_result::SegmentAggregationCollector; use crate::aggregation::*; use crate::TantivyError; -#[derive(Clone, Debug, Serialize, Deserialize)] -struct BuildSaltedHasher { - salt: u8, -} - -impl BuildHasher for BuildSaltedHasher { - type Hasher = DefaultHasher; - - fn build_hasher(&self) -> Self::Hasher { - let mut hasher = DefaultHasher::new(); - hasher.write_u8(self.salt); - - hasher - } -} +/// Log2 of the number of registers. Must match the Java `Union(LOG2M)` where LOG2M=11. +/// 2^11 = 2048 registers. +const LG_K: u8 = 11; /// # Cardinality /// /// The cardinality aggregation allows for computing an estimate /// of the number of different values in a data set based on the -/// HyperLogLog++ algorithm. This is particularly useful for understanding the -/// uniqueness of values in a large dataset where counting each unique value -/// individually would be computationally expensive. +/// Apache DataSketches HyperLogLog algorithm. This is particularly useful for +/// understanding the uniqueness of values in a large dataset where counting +/// each unique value individually would be computationally expensive. /// /// For example, you might use a cardinality aggregation to estimate the number /// of unique visitors to a website by aggregating on a field that contains @@ -184,7 +171,7 @@ impl SegmentCardinalityCollectorBucket { term_ids.sort_unstable(); dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| { - self.cardinality.sketch.insert_any(&term); + self.cardinality.insert(term); Ok(()) })?; if has_missing { @@ -195,17 +182,17 @@ impl SegmentCardinalityCollectorBucket { ); match missing_key { Key::Str(missing) => { - self.cardinality.sketch.insert_any(&missing); + self.cardinality.insert(missing.as_str()); } Key::F64(val) => { let val = f64_to_u64(*val); - self.cardinality.sketch.insert_any(&val); + self.cardinality.insert(val); } Key::U64(val) => { - self.cardinality.sketch.insert_any(&val); + self.cardinality.insert(*val); } Key::I64(val) => { - self.cardinality.sketch.insert_any(&val); + self.cardinality.insert(*val); } } } @@ -296,11 +283,11 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector { })?; for val in col_block_accessor.iter_vals() { let val: u128 = compact_space_accessor.compact_to_u128(val as u32); - bucket.cardinality.sketch.insert_any(&val); + bucket.cardinality.insert(val); } } else { for val in col_block_accessor.iter_vals() { - bucket.cardinality.sketch.insert_any(&val); + bucket.cardinality.insert(val); } } @@ -321,11 +308,17 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector { } } -#[derive(Clone, Debug, Serialize, Deserialize)] -/// The percentiles collector used during segment collection and for merging results. +#[derive(Clone, Debug)] +/// The cardinality collector used during segment collection and for merging results. +/// Uses Apache DataSketches HLL (lg_k=11) for compatibility with Datadog's event query. pub struct CardinalityCollector { - sketch: HyperLogLogPlus, + sketch: HllSketch, + /// Salt derived from `ColumnType`, used to differentiate values of different column types + /// that map to the same u64 (e.g. bool `false` = 0 vs i64 `0`). + /// Not serialized — only needed during insertion, not after sketch registers are populated. + salt: u8, } + impl Default for CardinalityCollector { fn default() -> Self { Self::new(0) @@ -338,25 +331,53 @@ impl PartialEq for CardinalityCollector { } } -impl CardinalityCollector { - /// Compute the final cardinality estimate. - pub fn finalize(self) -> Option { - Some(self.sketch.clone().count().trunc()) +impl Serialize for CardinalityCollector { + fn serialize(&self, serializer: S) -> Result { + let bytes = self.sketch.serialize(); + serializer.serialize_bytes(&bytes) } +} +impl<'de> Deserialize<'de> for CardinalityCollector { + fn deserialize>(deserializer: D) -> Result { + let bytes: Vec = Deserialize::deserialize(deserializer)?; + let sketch = + HllSketch::deserialize(&bytes).map_err(serde::de::Error::custom)?; + Ok(Self { sketch, salt: 0 }) + } +} + +impl CardinalityCollector { fn new(salt: u8) -> Self { Self { - sketch: HyperLogLogPlus::new(16, BuildSaltedHasher { salt }).unwrap(), + sketch: HllSketch::new(LG_K, HllType::Hll4), + salt, } } - pub(crate) fn merge_fruits(&mut self, right: CardinalityCollector) -> crate::Result<()> { - self.sketch.merge(&right.sketch).map_err(|err| { - TantivyError::AggregationError(AggregationError::InternalError(format!( - "Error while merging cardinality {err:?}" - ))) - })?; + /// Insert a value into the HLL sketch, salted by the column type. + /// The salt ensures that identical u64 values from different column types + /// (e.g. bool `false` vs i64 `0`) are counted as distinct. + pub(crate) fn insert(&mut self, value: T) { + self.sketch.update((self.salt, value)); + } + /// Compute the final cardinality estimate. + pub fn finalize(self) -> Option { + Some(self.sketch.estimate().trunc()) + } + + /// Serialize the HLL sketch to its compact binary representation. + /// This format is compatible with Apache DataSketches Java (`HllSketch.heapify()`). + pub fn to_sketch_bytes(&self) -> Vec { + self.sketch.serialize() + } + + pub(crate) fn merge_fruits(&mut self, right: CardinalityCollector) -> crate::Result<()> { + let mut union = HllUnion::new(LG_K); + union.update(&self.sketch); + union.update(&right.sketch); + self.sketch = union.get_result(HllType::Hll4); Ok(()) } } @@ -518,4 +539,74 @@ mod tests { Ok(()) } + + #[test] + fn cardinality_collector_serde_roundtrip() { + use super::CardinalityCollector; + + let mut collector = CardinalityCollector::default(); + collector.insert("hello"); + collector.insert("world"); + collector.insert("hello"); // duplicate + + let serialized = serde_json::to_vec(&collector).unwrap(); + let deserialized: CardinalityCollector = serde_json::from_slice(&serialized).unwrap(); + + let original_estimate = collector.finalize().unwrap(); + let roundtrip_estimate = deserialized.finalize().unwrap(); + assert_eq!(original_estimate, roundtrip_estimate); + assert_eq!(original_estimate, 2.0); + } + + #[test] + fn cardinality_collector_merge() { + use super::CardinalityCollector; + + let mut left = CardinalityCollector::default(); + left.insert("a"); + left.insert("b"); + + let mut right = CardinalityCollector::default(); + right.insert("b"); + right.insert("c"); + + left.merge_fruits(right).unwrap(); + let estimate = left.finalize().unwrap(); + assert_eq!(estimate, 3.0); + } + + #[test] + fn cardinality_collector_serialize_deserialize_binary() { + use super::CardinalityCollector; + use datasketches::hll::HllSketch; + + let mut collector = CardinalityCollector::default(); + collector.insert("apple"); + collector.insert("banana"); + collector.insert("cherry"); + + let bytes = collector.to_sketch_bytes(); + let deserialized = HllSketch::deserialize(&bytes).unwrap(); + assert!((deserialized.estimate() - 3.0).abs() < 0.01); + } + + #[test] + fn cardinality_collector_salt_differentiates_types() { + use super::CardinalityCollector; + + // Without salt, same u64 value from different column types would collide + let mut collector_bool = CardinalityCollector::new(5); // e.g. ColumnType::Bool + collector_bool.insert(0u64); // false + collector_bool.insert(1u64); // true + + let mut collector_i64 = CardinalityCollector::new(2); // e.g. ColumnType::I64 + collector_i64.insert(0u64); + collector_i64.insert(1u64); + + // Merge them + collector_bool.merge_fruits(collector_i64).unwrap(); + let estimate = collector_bool.finalize().unwrap(); + // Should be 4 because salt makes (5, 0) != (2, 0) and (5, 1) != (2, 1) + assert_eq!(estimate, 4.0); + } } From 698f073f880e36f6c09130f7e6f74ddb56f01bf8 Mon Sep 17 00:00:00 2001 From: "cong.xie" Date: Wed, 11 Feb 2026 15:52:39 -0500 Subject: [PATCH 2/3] fix fmt --- src/aggregation/metric/cardinality.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/aggregation/metric/cardinality.rs b/src/aggregation/metric/cardinality.rs index 80106654c..efc634f7c 100644 --- a/src/aggregation/metric/cardinality.rs +++ b/src/aggregation/metric/cardinality.rs @@ -341,8 +341,7 @@ impl Serialize for CardinalityCollector { impl<'de> Deserialize<'de> for CardinalityCollector { fn deserialize>(deserializer: D) -> Result { let bytes: Vec = Deserialize::deserialize(deserializer)?; - let sketch = - HllSketch::deserialize(&bytes).map_err(serde::de::Error::custom)?; + let sketch = HllSketch::deserialize(&bytes).map_err(serde::de::Error::custom)?; Ok(Self { sketch, salt: 0 }) } } @@ -577,9 +576,10 @@ mod tests { #[test] fn cardinality_collector_serialize_deserialize_binary() { - use super::CardinalityCollector; use datasketches::hll::HllSketch; + use super::CardinalityCollector; + let mut collector = CardinalityCollector::default(); collector.insert("apple"); collector.insert("banana"); From 7eca33143e199eb357f6be6b11b8089a9b0fafb2 Mon Sep 17 00:00:00 2001 From: "cong.xie" Date: Thu, 12 Feb 2026 11:44:42 -0500 Subject: [PATCH 3/3] Remove Datadog-specific references from comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is an open-source repo — replace references to Datadog's event query with generic cross-language compatibility descriptions. --- src/aggregation/metric/cardinality.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/aggregation/metric/cardinality.rs b/src/aggregation/metric/cardinality.rs index efc634f7c..d1cb234da 100644 --- a/src/aggregation/metric/cardinality.rs +++ b/src/aggregation/metric/cardinality.rs @@ -15,8 +15,8 @@ use crate::aggregation::segment_agg_result::SegmentAggregationCollector; use crate::aggregation::*; use crate::TantivyError; -/// Log2 of the number of registers. Must match the Java `Union(LOG2M)` where LOG2M=11. -/// 2^11 = 2048 registers. +/// Log2 of the number of registers for the HLL sketch. +/// 2^11 = 2048 registers, giving ~2.3% relative error and ~1KB per sketch (Hll4). const LG_K: u8 = 11; /// # Cardinality @@ -310,7 +310,8 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector { #[derive(Clone, Debug)] /// The cardinality collector used during segment collection and for merging results. -/// Uses Apache DataSketches HLL (lg_k=11) for compatibility with Datadog's event query. +/// Uses Apache DataSketches HLL (lg_k=11, Hll4) for compact binary serialization +/// and cross-language compatibility (e.g. Java `datasketches` library). pub struct CardinalityCollector { sketch: HllSketch, /// Salt derived from `ColumnType`, used to differentiate values of different column types @@ -367,7 +368,7 @@ impl CardinalityCollector { } /// Serialize the HLL sketch to its compact binary representation. - /// This format is compatible with Apache DataSketches Java (`HllSketch.heapify()`). + /// The format is cross-language compatible with Apache DataSketches (Java, C++, Python). pub fn to_sketch_bytes(&self) -> Vec { self.sketch.serialize() }