Merge pull request #2837 from quickwit-oss/congxie/replaceHll

Replace hyperloglogplus with Apache DataSketches HLL (lg_k=11)
This commit is contained in:
Adrien Guillo
2026-02-12 17:19:40 -05:00
committed by GitHub
2 changed files with 136 additions and 44 deletions

View File

@@ -65,7 +65,7 @@ tantivy-bitpacker = { version = "0.9", path = "./bitpacker" }
common = { version = "0.10", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
datasketches = "0.2.0"
futures-util = { version = "0.3.28", optional = true }
futures-channel = { version = "0.3.28", optional = true }
fnv = "1.0.7"

View File

@@ -1,12 +1,11 @@
use std::collections::hash_map::DefaultHasher;
use std::hash::{BuildHasher, Hasher};
use std::hash::Hash;
use columnar::column_values::CompactSpaceU64Accessor;
use columnar::{Column, ColumnType, Dictionary, StrColumn};
use common::f64_to_u64;
use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use datasketches::hll::{HllSketch, HllType, HllUnion};
use rustc_hash::FxHashSet;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::aggregation::agg_data::AggregationsSegmentCtx;
use crate::aggregation::intermediate_agg_result::{
@@ -16,29 +15,17 @@ use crate::aggregation::segment_agg_result::SegmentAggregationCollector;
use crate::aggregation::*;
use crate::TantivyError;
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BuildSaltedHasher {
salt: u8,
}
impl BuildHasher for BuildSaltedHasher {
type Hasher = DefaultHasher;
fn build_hasher(&self) -> Self::Hasher {
let mut hasher = DefaultHasher::new();
hasher.write_u8(self.salt);
hasher
}
}
/// Log2 of the number of registers for the HLL sketch.
/// 2^11 = 2048 registers, giving ~2.3% relative error and ~1KB per sketch (Hll4).
const LG_K: u8 = 11;
/// # Cardinality
///
/// The cardinality aggregation allows for computing an estimate
/// of the number of different values in a data set based on the
/// HyperLogLog++ algorithm. This is particularly useful for understanding the
/// uniqueness of values in a large dataset where counting each unique value
/// individually would be computationally expensive.
/// Apache DataSketches HyperLogLog algorithm. This is particularly useful for
/// understanding the uniqueness of values in a large dataset where counting
/// each unique value individually would be computationally expensive.
///
/// For example, you might use a cardinality aggregation to estimate the number
/// of unique visitors to a website by aggregating on a field that contains
@@ -184,7 +171,7 @@ impl SegmentCardinalityCollectorBucket {
term_ids.sort_unstable();
dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| {
self.cardinality.sketch.insert_any(&term);
self.cardinality.insert(term);
Ok(())
})?;
if has_missing {
@@ -195,17 +182,17 @@ impl SegmentCardinalityCollectorBucket {
);
match missing_key {
Key::Str(missing) => {
self.cardinality.sketch.insert_any(&missing);
self.cardinality.insert(missing.as_str());
}
Key::F64(val) => {
let val = f64_to_u64(*val);
self.cardinality.sketch.insert_any(&val);
self.cardinality.insert(val);
}
Key::U64(val) => {
self.cardinality.sketch.insert_any(&val);
self.cardinality.insert(*val);
}
Key::I64(val) => {
self.cardinality.sketch.insert_any(&val);
self.cardinality.insert(*val);
}
}
}
@@ -296,11 +283,11 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
})?;
for val in col_block_accessor.iter_vals() {
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
bucket.cardinality.sketch.insert_any(&val);
bucket.cardinality.insert(val);
}
} else {
for val in col_block_accessor.iter_vals() {
bucket.cardinality.sketch.insert_any(&val);
bucket.cardinality.insert(val);
}
}
@@ -321,11 +308,18 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
/// The percentiles collector used during segment collection and for merging results.
#[derive(Clone, Debug)]
/// The cardinality collector used during segment collection and for merging results.
/// Uses Apache DataSketches HLL (lg_k=11, Hll4) for compact binary serialization
/// and cross-language compatibility (e.g. Java `datasketches` library).
pub struct CardinalityCollector {
sketch: HyperLogLogPlus<u64, BuildSaltedHasher>,
sketch: HllSketch,
/// Salt derived from `ColumnType`, used to differentiate values of different column types
/// that map to the same u64 (e.g. bool `false` = 0 vs i64 `0`).
/// Not serialized — only needed during insertion, not after sketch registers are populated.
salt: u8,
}
impl Default for CardinalityCollector {
fn default() -> Self {
Self::new(0)
@@ -338,25 +332,52 @@ impl PartialEq for CardinalityCollector {
}
}
impl CardinalityCollector {
/// Compute the final cardinality estimate.
pub fn finalize(self) -> Option<f64> {
Some(self.sketch.clone().count().trunc())
impl Serialize for CardinalityCollector {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let bytes = self.sketch.serialize();
serializer.serialize_bytes(&bytes)
}
}
impl<'de> Deserialize<'de> for CardinalityCollector {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let bytes: Vec<u8> = Deserialize::deserialize(deserializer)?;
let sketch = HllSketch::deserialize(&bytes).map_err(serde::de::Error::custom)?;
Ok(Self { sketch, salt: 0 })
}
}
impl CardinalityCollector {
fn new(salt: u8) -> Self {
Self {
sketch: HyperLogLogPlus::new(16, BuildSaltedHasher { salt }).unwrap(),
sketch: HllSketch::new(LG_K, HllType::Hll4),
salt,
}
}
pub(crate) fn merge_fruits(&mut self, right: CardinalityCollector) -> crate::Result<()> {
self.sketch.merge(&right.sketch).map_err(|err| {
TantivyError::AggregationError(AggregationError::InternalError(format!(
"Error while merging cardinality {err:?}"
)))
})?;
/// Insert a value into the HLL sketch, salted by the column type.
/// The salt ensures that identical u64 values from different column types
/// (e.g. bool `false` vs i64 `0`) are counted as distinct.
pub(crate) fn insert<T: Hash>(&mut self, value: T) {
self.sketch.update((self.salt, value));
}
/// Compute the final cardinality estimate.
pub fn finalize(self) -> Option<f64> {
Some(self.sketch.estimate().trunc())
}
/// Serialize the HLL sketch to its compact binary representation.
/// The format is cross-language compatible with Apache DataSketches (Java, C++, Python).
pub fn to_sketch_bytes(&self) -> Vec<u8> {
self.sketch.serialize()
}
pub(crate) fn merge_fruits(&mut self, right: CardinalityCollector) -> crate::Result<()> {
let mut union = HllUnion::new(LG_K);
union.update(&self.sketch);
union.update(&right.sketch);
self.sketch = union.get_result(HllType::Hll4);
Ok(())
}
}
@@ -518,4 +539,75 @@ mod tests {
Ok(())
}
#[test]
fn cardinality_collector_serde_roundtrip() {
use super::CardinalityCollector;
let mut collector = CardinalityCollector::default();
collector.insert("hello");
collector.insert("world");
collector.insert("hello"); // duplicate
let serialized = serde_json::to_vec(&collector).unwrap();
let deserialized: CardinalityCollector = serde_json::from_slice(&serialized).unwrap();
let original_estimate = collector.finalize().unwrap();
let roundtrip_estimate = deserialized.finalize().unwrap();
assert_eq!(original_estimate, roundtrip_estimate);
assert_eq!(original_estimate, 2.0);
}
#[test]
fn cardinality_collector_merge() {
use super::CardinalityCollector;
let mut left = CardinalityCollector::default();
left.insert("a");
left.insert("b");
let mut right = CardinalityCollector::default();
right.insert("b");
right.insert("c");
left.merge_fruits(right).unwrap();
let estimate = left.finalize().unwrap();
assert_eq!(estimate, 3.0);
}
#[test]
fn cardinality_collector_serialize_deserialize_binary() {
use datasketches::hll::HllSketch;
use super::CardinalityCollector;
let mut collector = CardinalityCollector::default();
collector.insert("apple");
collector.insert("banana");
collector.insert("cherry");
let bytes = collector.to_sketch_bytes();
let deserialized = HllSketch::deserialize(&bytes).unwrap();
assert!((deserialized.estimate() - 3.0).abs() < 0.01);
}
#[test]
fn cardinality_collector_salt_differentiates_types() {
use super::CardinalityCollector;
// Without salt, same u64 value from different column types would collide
let mut collector_bool = CardinalityCollector::new(5); // e.g. ColumnType::Bool
collector_bool.insert(0u64); // false
collector_bool.insert(1u64); // true
let mut collector_i64 = CardinalityCollector::new(2); // e.g. ColumnType::I64
collector_i64.insert(0u64);
collector_i64.insert(1u64);
// Merge them
collector_bool.merge_fruits(collector_i64).unwrap();
let estimate = collector_bool.finalize().unwrap();
// Should be 4 because salt makes (5, 0) != (2, 0) and (5, 1) != (2, 1)
assert_eq!(estimate, 4.0);
}
}