|
|
|
|
@@ -4,6 +4,7 @@ use std::io;
|
|
|
|
|
|
|
|
|
|
use columnar::column_values::CompactSpaceU64Accessor;
|
|
|
|
|
use columnar::{Column, ColumnType, Dictionary, StrColumn};
|
|
|
|
|
use common::{BitSet, TinySet};
|
|
|
|
|
use datasketches::hll::{Coupon, HllSketch, HllType, HllUnion};
|
|
|
|
|
use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
|
|
|
|
|
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
|
|
|
|
@@ -20,6 +21,12 @@ use crate::TantivyError;
|
|
|
|
|
/// 2^11 = 2048 registers, giving ~2.3% relative error and ~1KB per sketch (Hll4).
|
|
|
|
|
const LG_K: u8 = 11;
|
|
|
|
|
|
|
|
|
|
/// Promote FxHashSet<u64> -> PagedBitset at ~3% density (`len * 32 >
|
|
|
|
|
/// dict_num_terms`). Past this point the bitset (~`dict_num_terms / 7.5`
|
|
|
|
|
/// bytes) is smaller than the hashset (~10 B/entry minimum) and avoids
|
|
|
|
|
/// the per-insert hash.
|
|
|
|
|
const PROMOTION_RATIO: u64 = 32;
|
|
|
|
|
|
|
|
|
|
/// # Cardinality
|
|
|
|
|
///
|
|
|
|
|
/// The cardinality aggregation allows for computing an estimate
|
|
|
|
|
@@ -177,9 +184,263 @@ impl CouponCache {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub(crate) struct SegmentCardinalityCollector {
|
|
|
|
|
// =================================================================
|
|
|
|
|
// PagedBitset: a sparse bitset indexed by term_ord.
|
|
|
|
|
//
|
|
|
|
|
// Used as the dense alternative to FxHashSet<u64> once a string
|
|
|
|
|
// cardinality bucket has accumulated enough unique term ordinals.
|
|
|
|
|
// Memory is bounded to (touched pages) * (page bytes), not
|
|
|
|
|
// (max_term_ord / 8).
|
|
|
|
|
//
|
|
|
|
|
// Page geometry mirrors `PagedTermMap` in `term_agg.rs`: 1024 ords
|
|
|
|
|
// per page, lazy `Vec<Option<Box<Page>>>` directory.
|
|
|
|
|
// =================================================================
|
|
|
|
|
const BITSET_PAGE_SHIFT: u32 = 10;
|
|
|
|
|
const BITSET_PAGE_BITS: u64 = 1u64 << BITSET_PAGE_SHIFT; // 1024
|
|
|
|
|
const BITSET_PAGE_MASK: u64 = BITSET_PAGE_BITS - 1;
|
|
|
|
|
const BITSET_WORDS_PER_PAGE: usize = (BITSET_PAGE_BITS / 64) as usize; // 16
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
struct PagedBitsetPage {
|
|
|
|
|
words: [TinySet; BITSET_WORDS_PER_PAGE],
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl PagedBitsetPage {
|
|
|
|
|
fn new() -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
words: [TinySet::empty(); BITSET_WORDS_PER_PAGE],
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub(crate) struct PagedBitset {
|
|
|
|
|
pages: Vec<Option<Box<PagedBitsetPage>>>,
|
|
|
|
|
/// Cached number of set bits, maintained on insert.
|
|
|
|
|
count: u64,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl PagedBitset {
|
|
|
|
|
/// Allocates a directory big enough to hold ords up to and including
|
|
|
|
|
/// `max_term_ord`. Pages are allocated lazily on first set.
|
|
|
|
|
fn with_max_term_ord(max_term_ord: u64) -> Self {
|
|
|
|
|
let max_page_idx = (max_term_ord >> BITSET_PAGE_SHIFT) as usize;
|
|
|
|
|
let num_pages = max_page_idx + 1;
|
|
|
|
|
Self {
|
|
|
|
|
pages: vec![None; num_pages],
|
|
|
|
|
count: 0,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
|
fn insert(&mut self, term_ord: u64) {
|
|
|
|
|
let page_idx = (term_ord >> BITSET_PAGE_SHIFT) as usize;
|
|
|
|
|
let intra = term_ord & BITSET_PAGE_MASK;
|
|
|
|
|
let word_idx = (intra >> 6) as usize;
|
|
|
|
|
let bit_idx = (intra & 63) as u32;
|
|
|
|
|
|
|
|
|
|
let page = match &mut self.pages[page_idx] {
|
|
|
|
|
Some(p) => p,
|
|
|
|
|
None => {
|
|
|
|
|
self.pages[page_idx] = Some(Box::new(PagedBitsetPage::new()));
|
|
|
|
|
self.pages[page_idx].as_mut().unwrap()
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
if page.words[word_idx].insert_mut(bit_idx) {
|
|
|
|
|
self.count += 1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Number of set bits. O(1).
|
|
|
|
|
#[inline]
|
|
|
|
|
fn len(&self) -> u64 {
|
|
|
|
|
self.count
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Iterate set ords in ascending order.
|
|
|
|
|
fn iter_sorted(&self) -> impl Iterator<Item = u64> + '_ {
|
|
|
|
|
self.pages
|
|
|
|
|
.iter()
|
|
|
|
|
.enumerate()
|
|
|
|
|
.filter_map(|(page_idx, page_opt)| page_opt.as_ref().map(|p| (page_idx, p)))
|
|
|
|
|
.flat_map(|(page_idx, page)| {
|
|
|
|
|
let page_base_ord = (page_idx as u64) << BITSET_PAGE_SHIFT;
|
|
|
|
|
page.words
|
|
|
|
|
.iter()
|
|
|
|
|
.enumerate()
|
|
|
|
|
.flat_map(move |(word_idx, &word)| {
|
|
|
|
|
let word_base_ord = page_base_ord + (word_idx as u64) * 64;
|
|
|
|
|
word.into_iter()
|
|
|
|
|
.map(move |bit| word_base_ord + u64::from(bit))
|
|
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Threshold below which we use `BitSet` instead of `TermOrdSet`.
|
|
|
|
|
///
|
|
|
|
|
/// Both `BitSet` and `FxHashSet<u64>` have the same 32-byte struct, so the comparison is heap only:
|
|
|
|
|
/// * `BitSet` at T=256: 5 `TinySet` words covering 258 bits (with the missing-value sentinel) =
|
|
|
|
|
/// 40 bytes.
|
|
|
|
|
/// * `FxHashSet<u64>` after one insert: 4-bucket hashbrown table ≈ 56 bytes
|
|
|
|
|
pub(crate) const BITSET_MAX_TERM_ORD: u64 = 256;
|
|
|
|
|
|
|
|
|
|
// =================================================================
|
|
|
|
|
// TermOrdAccumulator: per-bucket abstraction over the entries set.
|
|
|
|
|
//
|
|
|
|
|
// Implementations:
|
|
|
|
|
// - `BitSet` (from `common`): used when `column.max_value()` is small (< BITSET_MAX_TERM_ORD).
|
|
|
|
|
// Pre-allocated, no promotion.
|
|
|
|
|
// - `TermOrdSet`: adaptive, starts as FxHashSet and promotes to a paged bitset when occupancy
|
|
|
|
|
// crosses the density threshold (only if promotion is enabled — typically gated on top-level
|
|
|
|
|
// aggregation).
|
|
|
|
|
//
|
|
|
|
|
// The trait lets `SegmentCardinalityCollector` be generic over the choice
|
|
|
|
|
// so the hot collect() loop monomorphizes to a direct call (no enum
|
|
|
|
|
// dispatch per insert).
|
|
|
|
|
// =================================================================
|
|
|
|
|
pub(crate) trait TermOrdAccumulator: Sized {
|
|
|
|
|
/// Construct an empty accumulator.
|
|
|
|
|
/// `max_term_ord_inclusive` is the largest term_ord that may be
|
|
|
|
|
/// inserted (used to size pre-allocated bitsets and the dense bitset
|
|
|
|
|
/// on promotion).
|
|
|
|
|
fn new(max_term_ord_inclusive: u64) -> Self;
|
|
|
|
|
fn insert(&mut self, term_ord: u64);
|
|
|
|
|
/// Bulk insert. Implementations may override to hoist any inner
|
|
|
|
|
/// dispatch outside the loop. Default loops `insert`.
|
|
|
|
|
#[inline]
|
|
|
|
|
fn extend_from_iter<I: IntoIterator<Item = u64>>(&mut self, ords: I) {
|
|
|
|
|
for ord in ords {
|
|
|
|
|
self.insert(ord);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
/// Hook called once per ingested block. Adaptive impls use this to
|
|
|
|
|
/// decide on sparse->dense promotion.
|
|
|
|
|
fn maybe_compact(&mut self) {}
|
|
|
|
|
fn len(&self) -> usize;
|
|
|
|
|
fn iter_ords(&self) -> impl Iterator<Item = u64> + '_;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl TermOrdAccumulator for BitSet {
|
|
|
|
|
#[inline]
|
|
|
|
|
fn new(max_term_ord_inclusive: u64) -> Self {
|
|
|
|
|
// `BitSet::with_max_value(M)` accepts ords in [0, M).
|
|
|
|
|
// We need ords up to and including `max_term_ord_inclusive`, plus
|
|
|
|
|
// the missing-value sentinel `column.max_value() + 1`.
|
|
|
|
|
BitSet::with_max_value((max_term_ord_inclusive + 2) as u32)
|
|
|
|
|
}
|
|
|
|
|
#[inline]
|
|
|
|
|
fn insert(&mut self, term_ord: u64) {
|
|
|
|
|
BitSet::insert(self, term_ord as u32);
|
|
|
|
|
}
|
|
|
|
|
#[inline]
|
|
|
|
|
fn len(&self) -> usize {
|
|
|
|
|
BitSet::len(self)
|
|
|
|
|
}
|
|
|
|
|
fn iter_ords(&self) -> impl Iterator<Item = u64> + '_ {
|
|
|
|
|
// `BitSet` itself doesn't expose iteration, but
|
|
|
|
|
// `BitSet::tinyset(bucket)` does. Walk per-bucket and yield each
|
|
|
|
|
// set bit. The capacity is `max_value()`; iterating to
|
|
|
|
|
// `div_ceil(64)` covers every possible ord exactly once.
|
|
|
|
|
let num_buckets = self.max_value().div_ceil(64);
|
|
|
|
|
(0..num_buckets).flat_map(move |bucket| {
|
|
|
|
|
let chunk_base = u64::from(bucket) * 64;
|
|
|
|
|
self.tinyset(bucket)
|
|
|
|
|
.into_iter()
|
|
|
|
|
.map(move |bit| chunk_base + u64::from(bit))
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// =================================================================
|
|
|
|
|
// TermOrdSet: adaptive sparse->dense accumulator.
|
|
|
|
|
//
|
|
|
|
|
// Starts as an FxHashSet (cheap when few ords are seen). When occupancy
|
|
|
|
|
// crosses `len * PROMOTION_RATIO > max_term_ord_inclusive`, drains into
|
|
|
|
|
// a `PagedBitset` and continues dense. Promotion is one-way.
|
|
|
|
|
// =================================================================
|
|
|
|
|
pub(crate) struct TermOrdSet {
|
|
|
|
|
inner: TermOrdSetInner,
|
|
|
|
|
/// Largest term_ord that may be inserted. Used for both sizing the
|
|
|
|
|
/// dense bitset on promotion and as the promotion-threshold reference.
|
|
|
|
|
max_term_ord_inclusive: u64,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
enum TermOrdSetInner {
|
|
|
|
|
Sparse(FxHashSet<u64>),
|
|
|
|
|
Dense(PagedBitset),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl TermOrdAccumulator for TermOrdSet {
|
|
|
|
|
fn new(max_term_ord_inclusive: u64) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
inner: TermOrdSetInner::Sparse(FxHashSet::default()),
|
|
|
|
|
max_term_ord_inclusive,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
|
fn insert(&mut self, term_ord: u64) {
|
|
|
|
|
match &mut self.inner {
|
|
|
|
|
TermOrdSetInner::Sparse(set) => {
|
|
|
|
|
set.insert(term_ord);
|
|
|
|
|
}
|
|
|
|
|
TermOrdSetInner::Dense(bitset) => bitset.insert(term_ord),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Hoist the Sparse/Dense match outside the per-ord loop so that a
|
|
|
|
|
/// block of inserts dispatches once.
|
|
|
|
|
fn extend_from_iter<I: IntoIterator<Item = u64>>(&mut self, ords: I) {
|
|
|
|
|
match &mut self.inner {
|
|
|
|
|
TermOrdSetInner::Sparse(set) => {
|
|
|
|
|
for ord in ords {
|
|
|
|
|
set.insert(ord);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
TermOrdSetInner::Dense(bitset) => {
|
|
|
|
|
for ord in ords {
|
|
|
|
|
bitset.insert(ord);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn maybe_compact(&mut self) {
|
|
|
|
|
let TermOrdSetInner::Sparse(set) = &mut self.inner else {
|
|
|
|
|
return;
|
|
|
|
|
};
|
|
|
|
|
if set.len() as u64 * PROMOTION_RATIO <= self.max_term_ord_inclusive {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
// Size for ord <= max_term_ord_inclusive plus the missing sentinel
|
|
|
|
|
// (column.max_value() + 1, which may equal max_term_ord_inclusive
|
|
|
|
|
// when the column references every dictionary term).
|
|
|
|
|
let mut bitset = PagedBitset::with_max_term_ord(self.max_term_ord_inclusive + 1);
|
|
|
|
|
let set = std::mem::take(set);
|
|
|
|
|
for ord in set {
|
|
|
|
|
bitset.insert(ord);
|
|
|
|
|
}
|
|
|
|
|
self.inner = TermOrdSetInner::Dense(bitset);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn len(&self) -> usize {
|
|
|
|
|
match &self.inner {
|
|
|
|
|
TermOrdSetInner::Sparse(set) => set.len(),
|
|
|
|
|
TermOrdSetInner::Dense(bitset) => bitset.len() as usize,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn iter_ords(&self) -> impl Iterator<Item = u64> + '_ {
|
|
|
|
|
match &self.inner {
|
|
|
|
|
TermOrdSetInner::Sparse(set) => itertools::Either::Left(set.iter().copied()),
|
|
|
|
|
TermOrdSetInner::Dense(bitset) => itertools::Either::Right(bitset.iter_sorted()),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub(crate) struct SegmentCardinalityCollector<S: TermOrdAccumulator> {
|
|
|
|
|
/// Buckets are Some(_) until they get consumed by into_intermediate_results().
|
|
|
|
|
buckets: Vec<Option<SegmentCardinalityCollectorBucket>>,
|
|
|
|
|
buckets: Vec<Option<SegmentCardinalityCollectorBucket<S>>>,
|
|
|
|
|
accessor_idx: usize,
|
|
|
|
|
/// The column accessor to access the fast field values.
|
|
|
|
|
accessor: Column<u64>,
|
|
|
|
|
@@ -188,9 +449,13 @@ pub(crate) struct SegmentCardinalityCollector {
|
|
|
|
|
/// The missing value normalized to the internal u64 representation of the field type.
|
|
|
|
|
missing_value_for_accessor: Option<u64>,
|
|
|
|
|
coupon_cache: Option<CouponCache>,
|
|
|
|
|
/// Largest term_ord that may be inserted into a bucket. For str columns
|
|
|
|
|
/// this is `accessor.max_value()`; for non-str columns this is unused
|
|
|
|
|
/// (no inserts go into `entries`) and set to 0.
|
|
|
|
|
max_term_ord_inclusive: u64,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Debug for SegmentCardinalityCollector {
|
|
|
|
|
impl<S: TermOrdAccumulator> Debug for SegmentCardinalityCollector<S> {
|
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
|
|
|
|
f.debug_struct("SegmentCardinalityCollector")
|
|
|
|
|
.field("column_type", &self.column_type)
|
|
|
|
|
@@ -202,16 +467,21 @@ impl Debug for SegmentCardinalityCollector {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub(crate) struct SegmentCardinalityCollectorBucket {
|
|
|
|
|
cardinality: CardinalityCollector,
|
|
|
|
|
entries: FxHashSet<u64>,
|
|
|
|
|
/// Per-bucket state. Shape depends on column kind: str columns dedup
|
|
|
|
|
/// term ords and only build the HLL sketch at finalization (saves the
|
|
|
|
|
/// ~96 B `CardinalityCollector` per bucket during collect); numeric/IpAddr
|
|
|
|
|
/// columns feed the sketch directly during collect.
|
|
|
|
|
pub(crate) enum SegmentCardinalityCollectorBucket<S: TermOrdAccumulator> {
|
|
|
|
|
Str(S),
|
|
|
|
|
Numeric(CardinalityCollector),
|
|
|
|
|
}
|
|
|
|
|
impl SegmentCardinalityCollectorBucket {
|
|
|
|
|
impl<S: TermOrdAccumulator> SegmentCardinalityCollectorBucket<S> {
|
|
|
|
|
#[inline(always)]
|
|
|
|
|
pub fn new(column_type: ColumnType) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
cardinality: CardinalityCollector::new(column_type as u8),
|
|
|
|
|
entries: FxHashSet::default(),
|
|
|
|
|
pub fn new(column_type: ColumnType, max_term_ord_inclusive: u64) -> Self {
|
|
|
|
|
if column_type == ColumnType::Str {
|
|
|
|
|
Self::Str(S::new(max_term_ord_inclusive))
|
|
|
|
|
} else {
|
|
|
|
|
Self::Numeric(CardinalityCollector::new(column_type as u8))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -222,37 +492,57 @@ impl SegmentCardinalityCollectorBucket {
|
|
|
|
|
//
|
|
|
|
|
// If the column is str, then the values are dictionary encoded
|
|
|
|
|
// and have not been added to the sketch yet.
|
|
|
|
|
// We need to resolves the term ords accumulated in self.entries
|
|
|
|
|
// with the coupon cache, and append the results to the sketch.
|
|
|
|
|
// We need to resolves the term ords accumulated in the str entries
|
|
|
|
|
// with the coupon cache, and append the results to a fresh sketch.
|
|
|
|
|
fn into_intermediate_metric_result(
|
|
|
|
|
mut self,
|
|
|
|
|
self,
|
|
|
|
|
coupon_cache_opt: Option<&CouponCache>,
|
|
|
|
|
) -> crate::Result<IntermediateMetricResult> {
|
|
|
|
|
if let Some(coupon_cache) = coupon_cache_opt {
|
|
|
|
|
assert!(self.cardinality.sketch.is_empty());
|
|
|
|
|
append_to_sketch(&self.entries, coupon_cache, &mut self.cardinality);
|
|
|
|
|
}
|
|
|
|
|
Ok(IntermediateMetricResult::Cardinality(self.cardinality))
|
|
|
|
|
let cardinality = match self {
|
|
|
|
|
Self::Str(entries) => {
|
|
|
|
|
let mut cardinality = CardinalityCollector::new(ColumnType::Str as u8);
|
|
|
|
|
if let Some(coupon_cache) = coupon_cache_opt {
|
|
|
|
|
// Sketch must be empty for str columns: coupons are appended here
|
|
|
|
|
// from the term_ord set (and not directly during collection).
|
|
|
|
|
assert!(cardinality.sketch.is_empty());
|
|
|
|
|
append_to_sketch(&entries, coupon_cache, &mut cardinality);
|
|
|
|
|
}
|
|
|
|
|
cardinality
|
|
|
|
|
}
|
|
|
|
|
Self::Numeric(cardinality) => cardinality,
|
|
|
|
|
};
|
|
|
|
|
Ok(IntermediateMetricResult::Cardinality(cardinality))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Builds a coupon cache from the given buckets, dictionary, and optional missing value.
|
|
|
|
|
/// Returns a mapping from term_ord to the hash (coupon) of the associated term.
|
|
|
|
|
fn build_coupon_cache(
|
|
|
|
|
buckets: &[Option<SegmentCardinalityCollectorBucket>],
|
|
|
|
|
fn build_coupon_cache<S: TermOrdAccumulator>(
|
|
|
|
|
buckets: &[Option<SegmentCardinalityCollectorBucket<S>>],
|
|
|
|
|
dictionary: &Dictionary,
|
|
|
|
|
missing_value_opt: Option<&Key>,
|
|
|
|
|
) -> io::Result<CouponCache> {
|
|
|
|
|
let term_ords_capacity: usize = buckets
|
|
|
|
|
.iter()
|
|
|
|
|
.flatten()
|
|
|
|
|
.map(|bucket| bucket.entries.len())
|
|
|
|
|
.max()
|
|
|
|
|
.unwrap_or(0)
|
|
|
|
|
* 2;
|
|
|
|
|
let mut term_ords_set = FxHashSet::with_capacity_and_hasher(term_ords_capacity, FxBuildHasher);
|
|
|
|
|
// Caller restricts this to str cardinality collectors, so every
|
|
|
|
|
// present bucket must be the `Str` variant. Pass 1 validates and
|
|
|
|
|
// computes the capacity hint; pass 2 inserts.
|
|
|
|
|
let mut max_bucket_len = 0usize;
|
|
|
|
|
for bucket in buckets.iter().flatten() {
|
|
|
|
|
term_ords_set.extend(bucket.entries.iter().copied());
|
|
|
|
|
match bucket {
|
|
|
|
|
SegmentCardinalityCollectorBucket::Str(entries) => {
|
|
|
|
|
max_bucket_len = max_bucket_len.max(entries.len());
|
|
|
|
|
}
|
|
|
|
|
SegmentCardinalityCollectorBucket::Numeric(_) => {
|
|
|
|
|
return Err(io::Error::other(
|
|
|
|
|
"build_coupon_cache invoked with a non-str bucket",
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
let mut term_ords_set = FxHashSet::with_capacity_and_hasher(max_bucket_len * 2, FxBuildHasher);
|
|
|
|
|
for bucket in buckets.iter().flatten() {
|
|
|
|
|
if let SegmentCardinalityCollectorBucket::Str(entries) = bucket {
|
|
|
|
|
term_ords_set.extend(entries.iter_ords());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
let mut term_ords: Vec<u64> = term_ords_set.into_iter().collect();
|
|
|
|
|
term_ords.sort_unstable();
|
|
|
|
|
@@ -284,8 +574,8 @@ fn build_coupon_cache(
|
|
|
|
|
Ok(CouponCache::new(term_ords, coupons, missing_coupon_opt))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn append_to_sketch(
|
|
|
|
|
term_ords: &FxHashSet<u64>,
|
|
|
|
|
fn append_to_sketch<S: TermOrdAccumulator>(
|
|
|
|
|
term_ords: &S,
|
|
|
|
|
coupon_cache: &CouponCache,
|
|
|
|
|
sketch: &mut CardinalityCollector,
|
|
|
|
|
) {
|
|
|
|
|
@@ -294,7 +584,7 @@ fn append_to_sketch(
|
|
|
|
|
coupon_map,
|
|
|
|
|
missing_coupon_opt,
|
|
|
|
|
} => {
|
|
|
|
|
for &term_ord in term_ords {
|
|
|
|
|
for term_ord in term_ords.iter_ords() {
|
|
|
|
|
if let Some(coupon) = coupon_map
|
|
|
|
|
.get(term_ord as usize)
|
|
|
|
|
.copied()
|
|
|
|
|
@@ -308,8 +598,8 @@ fn append_to_sketch(
|
|
|
|
|
coupon_map,
|
|
|
|
|
missing_coupon_opt,
|
|
|
|
|
} => {
|
|
|
|
|
for term_ord in term_ords {
|
|
|
|
|
if let Some(coupon) = coupon_map.get(term_ord).copied().or(*missing_coupon_opt) {
|
|
|
|
|
for term_ord in term_ords.iter_ords() {
|
|
|
|
|
if let Some(coupon) = coupon_map.get(&term_ord).copied().or(*missing_coupon_opt) {
|
|
|
|
|
sketch.insert_coupon(coupon);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -317,12 +607,13 @@ fn append_to_sketch(
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl SegmentCardinalityCollector {
|
|
|
|
|
impl<S: TermOrdAccumulator> SegmentCardinalityCollector<S> {
|
|
|
|
|
pub fn from_req(
|
|
|
|
|
column_type: ColumnType,
|
|
|
|
|
accessor_idx: usize,
|
|
|
|
|
accessor: Column<u64>,
|
|
|
|
|
missing_value_for_accessor: Option<u64>,
|
|
|
|
|
max_term_ord_inclusive: u64,
|
|
|
|
|
) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
buckets: Vec::new(),
|
|
|
|
|
@@ -331,6 +622,7 @@ impl SegmentCardinalityCollector {
|
|
|
|
|
accessor,
|
|
|
|
|
missing_value_for_accessor,
|
|
|
|
|
coupon_cache: None,
|
|
|
|
|
max_term_ord_inclusive,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -347,7 +639,9 @@ impl SegmentCardinalityCollector {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
|
|
|
|
impl<S: TermOrdAccumulator + 'static> SegmentAggregationCollector
|
|
|
|
|
for SegmentCardinalityCollector<S>
|
|
|
|
|
{
|
|
|
|
|
fn add_intermediate_aggregation_result(
|
|
|
|
|
&mut self,
|
|
|
|
|
agg_data: &AggregationsSegmentCtx,
|
|
|
|
|
@@ -402,31 +696,41 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
|
|
|
|
));
|
|
|
|
|
};
|
|
|
|
|
let col_block_accessor = &agg_data.column_block_accessor;
|
|
|
|
|
if self.column_type == ColumnType::Str {
|
|
|
|
|
for term_ord in col_block_accessor.iter_vals() {
|
|
|
|
|
bucket.entries.insert(term_ord);
|
|
|
|
|
match bucket {
|
|
|
|
|
SegmentCardinalityCollectorBucket::Str(entries) => {
|
|
|
|
|
// Promotion check runs on the pre-block state: the first call
|
|
|
|
|
// sees an empty set (no-op), and the last block of inserts
|
|
|
|
|
// doesn't trigger a promotion of a set we won't grow further.
|
|
|
|
|
// The trait dispatches once per block (via `extend_from_iter`)
|
|
|
|
|
// for adaptive variants and inlines to a tight loop for the
|
|
|
|
|
// BitSet path.
|
|
|
|
|
entries.maybe_compact();
|
|
|
|
|
entries.extend_from_iter(col_block_accessor.iter_vals());
|
|
|
|
|
}
|
|
|
|
|
} else if self.column_type == ColumnType::IpAddr {
|
|
|
|
|
let compact_space_accessor = self
|
|
|
|
|
.accessor
|
|
|
|
|
.values
|
|
|
|
|
.clone()
|
|
|
|
|
.downcast_arc::<CompactSpaceU64Accessor>()
|
|
|
|
|
.map_err(|_| {
|
|
|
|
|
TantivyError::AggregationError(
|
|
|
|
|
crate::aggregation::AggregationError::InternalError(
|
|
|
|
|
"Type mismatch: Could not downcast to CompactSpaceU64Accessor"
|
|
|
|
|
.to_string(),
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
})?;
|
|
|
|
|
for val in col_block_accessor.iter_vals() {
|
|
|
|
|
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
|
|
|
|
|
bucket.cardinality.insert(val);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
for val in col_block_accessor.iter_vals() {
|
|
|
|
|
bucket.cardinality.insert(val);
|
|
|
|
|
SegmentCardinalityCollectorBucket::Numeric(cardinality) => {
|
|
|
|
|
if self.column_type == ColumnType::IpAddr {
|
|
|
|
|
let compact_space_accessor = self
|
|
|
|
|
.accessor
|
|
|
|
|
.values
|
|
|
|
|
.clone()
|
|
|
|
|
.downcast_arc::<CompactSpaceU64Accessor>()
|
|
|
|
|
.map_err(|_| {
|
|
|
|
|
TantivyError::AggregationError(
|
|
|
|
|
crate::aggregation::AggregationError::InternalError(
|
|
|
|
|
"Type mismatch: Could not downcast to CompactSpaceU64Accessor"
|
|
|
|
|
.to_string(),
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
})?;
|
|
|
|
|
for val in col_block_accessor.iter_vals() {
|
|
|
|
|
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
|
|
|
|
|
cardinality.insert(val);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
for val in col_block_accessor.iter_vals() {
|
|
|
|
|
cardinality.insert(val);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -439,8 +743,13 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
|
|
|
|
_agg_data: &AggregationsSegmentCtx,
|
|
|
|
|
) -> crate::Result<()> {
|
|
|
|
|
if max_bucket as usize >= self.buckets.len() {
|
|
|
|
|
let column_type = self.column_type;
|
|
|
|
|
let max_term_ord_inclusive = self.max_term_ord_inclusive;
|
|
|
|
|
self.buckets.resize_with(max_bucket as usize + 1, || {
|
|
|
|
|
Some(SegmentCardinalityCollectorBucket::new(self.column_type))
|
|
|
|
|
Some(SegmentCardinalityCollectorBucket::<S>::new(
|
|
|
|
|
column_type,
|
|
|
|
|
max_term_ord_inclusive,
|
|
|
|
|
))
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
@@ -458,13 +767,14 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
|
|
|
|
return None;
|
|
|
|
|
}
|
|
|
|
|
let bucket = self.buckets.get(bucket_id as usize)?.as_ref()?;
|
|
|
|
|
// For string columns the HLL sketch is empty until materialization; entries holds
|
|
|
|
|
// the deduplicated term ordinals seen, which is the exact distinct count.
|
|
|
|
|
// For numeric columns the sketch is populated during collect.
|
|
|
|
|
if self.column_type == ColumnType::Str {
|
|
|
|
|
Some(bucket.entries.len() as f64)
|
|
|
|
|
} else {
|
|
|
|
|
Some(bucket.cardinality.sketch.estimate().trunc())
|
|
|
|
|
// For string columns the sketch isn't built until finalization; the
|
|
|
|
|
// term_ord set's len is the exact distinct count. For numeric columns
|
|
|
|
|
// the sketch is populated during collect.
|
|
|
|
|
match bucket {
|
|
|
|
|
SegmentCardinalityCollectorBucket::Str(entries) => Some(entries.len() as f64),
|
|
|
|
|
SegmentCardinalityCollectorBucket::Numeric(cardinality) => {
|
|
|
|
|
Some(cardinality.sketch.estimate().trunc())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -614,6 +924,134 @@ mod tests {
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Build a single-segment string-cardinality index with 32 unique terms.
|
|
|
|
|
/// `column.max_value() = 31` is well below `BITSET_MAX_TERM_ORD`,
|
|
|
|
|
/// so the bucket exercises the `BitSet` path end to end.
|
|
|
|
|
#[test]
|
|
|
|
|
fn cardinality_aggregation_test_str_bitset() -> crate::Result<()> {
|
|
|
|
|
let terms: Vec<String> = (0..32).map(|i| format!("term_{i}")).collect();
|
|
|
|
|
let term_refs: Vec<Vec<&str>> = terms.iter().map(|t| vec![t.as_str()]).collect::<Vec<_>>();
|
|
|
|
|
// single segment so we have a single dictionary of 32 terms.
|
|
|
|
|
let index = get_test_index_from_terms(true, &term_refs)?;
|
|
|
|
|
|
|
|
|
|
let agg_req: Aggregations = serde_json::from_value(json!({
|
|
|
|
|
"cardinality": {
|
|
|
|
|
"cardinality": { "field": "string_id" }
|
|
|
|
|
},
|
|
|
|
|
}))
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let res = exec_request(agg_req, &index)?;
|
|
|
|
|
assert_eq!(res["cardinality"]["value"], 32.0);
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// `BitSet` path with a `missing` parameter: the column-level missing
|
|
|
|
|
/// sentinel (`column.max_value() + 1`) flows into the bitset, the
|
|
|
|
|
/// dict lookup filter at finalization drops it, and the missing
|
|
|
|
|
/// coupon is applied separately.
|
|
|
|
|
#[test]
|
|
|
|
|
fn cardinality_aggregation_test_str_bitset_with_missing() {
|
|
|
|
|
let mut schema_builder = Schema::builder();
|
|
|
|
|
let name_field = schema_builder.add_text_field("name", STRING | FAST);
|
|
|
|
|
let index = Index::create_in_ram(schema_builder.build());
|
|
|
|
|
let mut writer = index.writer_for_tests().unwrap();
|
|
|
|
|
for i in 0..16 {
|
|
|
|
|
let term = format!("t{i:02}");
|
|
|
|
|
writer.add_document(doc!(name_field => term)).unwrap();
|
|
|
|
|
}
|
|
|
|
|
// One empty doc, exercising the missing sentinel.
|
|
|
|
|
writer.add_document(doc!()).unwrap();
|
|
|
|
|
writer.commit().unwrap();
|
|
|
|
|
|
|
|
|
|
let agg_req: Aggregations = serde_json::from_value(json!({
|
|
|
|
|
"cardinality": {
|
|
|
|
|
"cardinality": {
|
|
|
|
|
"field": "name",
|
|
|
|
|
"missing": "MISSING_SENTINEL_KEY",
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
}))
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let res = exec_request(agg_req, &index).unwrap();
|
|
|
|
|
// 16 distinct real terms + 1 distinct "missing" value = 17.
|
|
|
|
|
assert_eq!(res["cardinality"]["value"], 17.0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Unit-test the PagedBitset itself: cross-page inserts produce sorted
|
|
|
|
|
/// iteration, len() matches the inserted set, and duplicates are
|
|
|
|
|
/// idempotent.
|
|
|
|
|
#[test]
|
|
|
|
|
fn paged_bitset_basic() {
|
|
|
|
|
use super::PagedBitset;
|
|
|
|
|
// Span several pages: BITSET_PAGE_BITS = 1024, so ords > 1024 land
|
|
|
|
|
// on the second page, > 2048 on the third, etc.
|
|
|
|
|
let ords = [0u64, 1, 63, 64, 1023, 1024, 1025, 4096, 4097, 9999, 10_000];
|
|
|
|
|
let max_ord = *ords.iter().max().unwrap();
|
|
|
|
|
let mut bitset = PagedBitset::with_max_term_ord(max_ord);
|
|
|
|
|
for &ord in &ords {
|
|
|
|
|
bitset.insert(ord);
|
|
|
|
|
// Idempotent: inserting again must not increase count.
|
|
|
|
|
bitset.insert(ord);
|
|
|
|
|
}
|
|
|
|
|
assert_eq!(bitset.len(), ords.len() as u64);
|
|
|
|
|
let collected: Vec<u64> = bitset.iter_sorted().collect();
|
|
|
|
|
let mut expected: Vec<u64> = ords.to_vec();
|
|
|
|
|
expected.sort_unstable();
|
|
|
|
|
assert_eq!(collected, expected);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Unit-test `TermOrdSet`: starts Sparse, promotes to Dense on
|
|
|
|
|
/// `maybe_compact` once the density threshold is crossed, and
|
|
|
|
|
/// `iter_ords()` yields the same set in either state. Ords spanning
|
|
|
|
|
/// multiple paged-bitset pages exercise the Dense iter ordering.
|
|
|
|
|
#[test]
|
|
|
|
|
fn term_ord_set_promotes_on_maybe_compact() {
|
|
|
|
|
use super::{TermOrdAccumulator, TermOrdSet, PROMOTION_RATIO};
|
|
|
|
|
// Pick max so promotion needs few inserts: len * RATIO > max with
|
|
|
|
|
// RATIO=32 and max=64 trips at len=3 (3*32=96 > 64).
|
|
|
|
|
let max_term_ord = 64u64;
|
|
|
|
|
let mut set = <TermOrdSet as TermOrdAccumulator>::new(max_term_ord);
|
|
|
|
|
// Two inserts: should stay Sparse after maybe_compact (2 * RATIO = 64, not > 64).
|
|
|
|
|
set.insert(0);
|
|
|
|
|
set.insert(7);
|
|
|
|
|
set.maybe_compact();
|
|
|
|
|
assert_eq!(set.len(), 2);
|
|
|
|
|
|
|
|
|
|
// Third insert promotes on next maybe_compact.
|
|
|
|
|
set.insert(20);
|
|
|
|
|
assert_eq!(set.len(), 3);
|
|
|
|
|
// Sanity check: at len=3, 3 * PROMOTION_RATIO = 96 > 64.
|
|
|
|
|
assert!(3u64 * PROMOTION_RATIO > max_term_ord);
|
|
|
|
|
set.maybe_compact();
|
|
|
|
|
|
|
|
|
|
// Post-promotion: extending continues to work.
|
|
|
|
|
set.insert(15);
|
|
|
|
|
set.insert(15); // dup
|
|
|
|
|
assert_eq!(set.len(), 4);
|
|
|
|
|
|
|
|
|
|
let mut collected: Vec<u64> = set.iter_ords().collect();
|
|
|
|
|
collected.sort_unstable();
|
|
|
|
|
assert_eq!(collected, vec![0, 7, 15, 20]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Unit-test the `BitSet` impl of `TermOrdAccumulator`: insert,
|
|
|
|
|
/// dedup, and iter_ords order.
|
|
|
|
|
#[test]
|
|
|
|
|
fn bitset_accumulator_basic() {
|
|
|
|
|
use common::BitSet;
|
|
|
|
|
|
|
|
|
|
use super::TermOrdAccumulator;
|
|
|
|
|
let mut set = <BitSet as TermOrdAccumulator>::new(255);
|
|
|
|
|
for ord in [0u64, 1, 63, 64, 65, 128, 200, 200, 0] {
|
|
|
|
|
<BitSet as TermOrdAccumulator>::insert(&mut set, ord);
|
|
|
|
|
}
|
|
|
|
|
assert_eq!(<BitSet as TermOrdAccumulator>::len(&set), 7);
|
|
|
|
|
let collected: Vec<u64> = set.iter_ords().collect();
|
|
|
|
|
assert_eq!(collected, vec![0, 1, 63, 64, 65, 128, 200]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn cardinality_aggregation_u64() -> crate::Result<()> {
|
|
|
|
|
let mut schema_builder = Schema::builder();
|
|
|
|
|
@@ -705,6 +1143,42 @@ mod tests {
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// A JSON path that resolves to both a Str column and a numeric column
|
|
|
|
|
/// produces two collector instances per segment — one with `Str` buckets
|
|
|
|
|
/// and one with `Numeric` buckets. Their `IntermediateMetricResult`s must
|
|
|
|
|
/// merge into the union cardinality.
|
|
|
|
|
#[test]
|
|
|
|
|
fn cardinality_aggregation_json_str_and_numeric() -> crate::Result<()> {
|
|
|
|
|
let mut schema_builder = Schema::builder();
|
|
|
|
|
let field = schema_builder.add_json_field("json", FAST);
|
|
|
|
|
let index = Index::create_in_ram(schema_builder.build());
|
|
|
|
|
{
|
|
|
|
|
let mut writer = index.writer_for_tests()?;
|
|
|
|
|
writer.add_document(doc!(field => json!({"value": "hello"})))?;
|
|
|
|
|
writer.add_document(doc!(field => json!({"value": "world"})))?;
|
|
|
|
|
writer.add_document(doc!(field => json!({"value": "hello"})))?; // dup str
|
|
|
|
|
writer.add_document(doc!(field => json!({"value": i64::from_u64(7u64)})))?;
|
|
|
|
|
writer.add_document(doc!(field => json!({"value": i64::from_u64(42u64)})))?;
|
|
|
|
|
writer.add_document(doc!(field => json!({"value": i64::from_u64(7u64)})))?; // dup num
|
|
|
|
|
writer.commit()?;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let agg_req: Aggregations = serde_json::from_value(json!({
|
|
|
|
|
"cardinality": {
|
|
|
|
|
"cardinality": {
|
|
|
|
|
"field": "json.value"
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}))
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let res = exec_request(agg_req, &index)?;
|
|
|
|
|
// 4 distinct values: "hello", "world", 7, 42.
|
|
|
|
|
assert_eq!(res["cardinality"]["value"], 4.0);
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn cardinality_collector_serde_roundtrip() {
|
|
|
|
|
use super::CardinalityCollector;
|
|
|
|
|
|