mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-02-12 02:50:37 +00:00
Compare commits
1 Commits
congxie/re
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
57fe659fff |
@@ -65,7 +65,7 @@ tantivy-bitpacker = { version = "0.9", path = "./bitpacker" }
|
||||
common = { version = "0.10", path = "./common/", package = "tantivy-common" }
|
||||
tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
|
||||
sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
|
||||
datasketches = "0.2.0"
|
||||
hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
|
||||
futures-util = { version = "0.3.28", optional = true }
|
||||
futures-channel = { version = "0.3.28", optional = true }
|
||||
fnv = "1.0.7"
|
||||
|
||||
@@ -62,7 +62,9 @@ impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
|
||||
pub struct AntiCallToken(());
|
||||
|
||||
/// Trait used to indicate when no more write need to be done on a writer
|
||||
pub trait TerminatingWrite: Write + Send + Sync {
|
||||
///
|
||||
/// Thread-safety is enforced at the call sites that require it.
|
||||
pub trait TerminatingWrite: Write {
|
||||
/// Indicate that the writer will no longer be used. Internally call terminate_ref.
|
||||
fn terminate(mut self) -> io::Result<()>
|
||||
where Self: Sized {
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
use std::hash::Hash;
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::hash::{BuildHasher, Hasher};
|
||||
|
||||
use columnar::column_values::CompactSpaceU64Accessor;
|
||||
use columnar::{Column, ColumnType, Dictionary, StrColumn};
|
||||
use common::f64_to_u64;
|
||||
use datasketches::hll::{HllSketch, HllType, HllUnion};
|
||||
use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
|
||||
use rustc_hash::FxHashSet;
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::aggregation::agg_data::AggregationsSegmentCtx;
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
@@ -15,17 +16,29 @@ use crate::aggregation::segment_agg_result::SegmentAggregationCollector;
|
||||
use crate::aggregation::*;
|
||||
use crate::TantivyError;
|
||||
|
||||
/// Log2 of the number of registers. Must match the Java `Union(LOG2M)` where LOG2M=11.
|
||||
/// 2^11 = 2048 registers.
|
||||
const LG_K: u8 = 11;
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
struct BuildSaltedHasher {
|
||||
salt: u8,
|
||||
}
|
||||
|
||||
impl BuildHasher for BuildSaltedHasher {
|
||||
type Hasher = DefaultHasher;
|
||||
|
||||
fn build_hasher(&self) -> Self::Hasher {
|
||||
let mut hasher = DefaultHasher::new();
|
||||
hasher.write_u8(self.salt);
|
||||
|
||||
hasher
|
||||
}
|
||||
}
|
||||
|
||||
/// # Cardinality
|
||||
///
|
||||
/// The cardinality aggregation allows for computing an estimate
|
||||
/// of the number of different values in a data set based on the
|
||||
/// Apache DataSketches HyperLogLog algorithm. This is particularly useful for
|
||||
/// understanding the uniqueness of values in a large dataset where counting
|
||||
/// each unique value individually would be computationally expensive.
|
||||
/// HyperLogLog++ algorithm. This is particularly useful for understanding the
|
||||
/// uniqueness of values in a large dataset where counting each unique value
|
||||
/// individually would be computationally expensive.
|
||||
///
|
||||
/// For example, you might use a cardinality aggregation to estimate the number
|
||||
/// of unique visitors to a website by aggregating on a field that contains
|
||||
@@ -171,7 +184,7 @@ impl SegmentCardinalityCollectorBucket {
|
||||
|
||||
term_ids.sort_unstable();
|
||||
dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| {
|
||||
self.cardinality.insert(term);
|
||||
self.cardinality.sketch.insert_any(&term);
|
||||
Ok(())
|
||||
})?;
|
||||
if has_missing {
|
||||
@@ -182,17 +195,17 @@ impl SegmentCardinalityCollectorBucket {
|
||||
);
|
||||
match missing_key {
|
||||
Key::Str(missing) => {
|
||||
self.cardinality.insert(missing.as_str());
|
||||
self.cardinality.sketch.insert_any(&missing);
|
||||
}
|
||||
Key::F64(val) => {
|
||||
let val = f64_to_u64(*val);
|
||||
self.cardinality.insert(val);
|
||||
self.cardinality.sketch.insert_any(&val);
|
||||
}
|
||||
Key::U64(val) => {
|
||||
self.cardinality.insert(*val);
|
||||
self.cardinality.sketch.insert_any(&val);
|
||||
}
|
||||
Key::I64(val) => {
|
||||
self.cardinality.insert(*val);
|
||||
self.cardinality.sketch.insert_any(&val);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -283,11 +296,11 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
||||
})?;
|
||||
for val in col_block_accessor.iter_vals() {
|
||||
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
|
||||
bucket.cardinality.insert(val);
|
||||
bucket.cardinality.sketch.insert_any(&val);
|
||||
}
|
||||
} else {
|
||||
for val in col_block_accessor.iter_vals() {
|
||||
bucket.cardinality.insert(val);
|
||||
bucket.cardinality.sketch.insert_any(&val);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -308,17 +321,11 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
/// The cardinality collector used during segment collection and for merging results.
|
||||
/// Uses Apache DataSketches HLL (lg_k=11) for compatibility with Datadog's event query.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
/// The percentiles collector used during segment collection and for merging results.
|
||||
pub struct CardinalityCollector {
|
||||
sketch: HllSketch,
|
||||
/// Salt derived from `ColumnType`, used to differentiate values of different column types
|
||||
/// that map to the same u64 (e.g. bool `false` = 0 vs i64 `0`).
|
||||
/// Not serialized — only needed during insertion, not after sketch registers are populated.
|
||||
salt: u8,
|
||||
sketch: HyperLogLogPlus<u64, BuildSaltedHasher>,
|
||||
}
|
||||
|
||||
impl Default for CardinalityCollector {
|
||||
fn default() -> Self {
|
||||
Self::new(0)
|
||||
@@ -331,52 +338,25 @@ impl PartialEq for CardinalityCollector {
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for CardinalityCollector {
|
||||
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
|
||||
let bytes = self.sketch.serialize();
|
||||
serializer.serialize_bytes(&bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for CardinalityCollector {
|
||||
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
|
||||
let bytes: Vec<u8> = Deserialize::deserialize(deserializer)?;
|
||||
let sketch = HllSketch::deserialize(&bytes).map_err(serde::de::Error::custom)?;
|
||||
Ok(Self { sketch, salt: 0 })
|
||||
}
|
||||
}
|
||||
|
||||
impl CardinalityCollector {
|
||||
/// Compute the final cardinality estimate.
|
||||
pub fn finalize(self) -> Option<f64> {
|
||||
Some(self.sketch.clone().count().trunc())
|
||||
}
|
||||
|
||||
fn new(salt: u8) -> Self {
|
||||
Self {
|
||||
sketch: HllSketch::new(LG_K, HllType::Hll4),
|
||||
salt,
|
||||
sketch: HyperLogLogPlus::new(16, BuildSaltedHasher { salt }).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert a value into the HLL sketch, salted by the column type.
|
||||
/// The salt ensures that identical u64 values from different column types
|
||||
/// (e.g. bool `false` vs i64 `0`) are counted as distinct.
|
||||
pub(crate) fn insert<T: Hash>(&mut self, value: T) {
|
||||
self.sketch.update((self.salt, value));
|
||||
}
|
||||
|
||||
/// Compute the final cardinality estimate.
|
||||
pub fn finalize(self) -> Option<f64> {
|
||||
Some(self.sketch.estimate().trunc())
|
||||
}
|
||||
|
||||
/// Serialize the HLL sketch to its compact binary representation.
|
||||
/// This format is compatible with Apache DataSketches Java (`HllSketch.heapify()`).
|
||||
pub fn to_sketch_bytes(&self) -> Vec<u8> {
|
||||
self.sketch.serialize()
|
||||
}
|
||||
|
||||
pub(crate) fn merge_fruits(&mut self, right: CardinalityCollector) -> crate::Result<()> {
|
||||
let mut union = HllUnion::new(LG_K);
|
||||
union.update(&self.sketch);
|
||||
union.update(&right.sketch);
|
||||
self.sketch = union.get_result(HllType::Hll4);
|
||||
self.sketch.merge(&right.sketch).map_err(|err| {
|
||||
TantivyError::AggregationError(AggregationError::InternalError(format!(
|
||||
"Error while merging cardinality {err:?}"
|
||||
)))
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -538,75 +518,4 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_serde_roundtrip() {
|
||||
use super::CardinalityCollector;
|
||||
|
||||
let mut collector = CardinalityCollector::default();
|
||||
collector.insert("hello");
|
||||
collector.insert("world");
|
||||
collector.insert("hello"); // duplicate
|
||||
|
||||
let serialized = serde_json::to_vec(&collector).unwrap();
|
||||
let deserialized: CardinalityCollector = serde_json::from_slice(&serialized).unwrap();
|
||||
|
||||
let original_estimate = collector.finalize().unwrap();
|
||||
let roundtrip_estimate = deserialized.finalize().unwrap();
|
||||
assert_eq!(original_estimate, roundtrip_estimate);
|
||||
assert_eq!(original_estimate, 2.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_merge() {
|
||||
use super::CardinalityCollector;
|
||||
|
||||
let mut left = CardinalityCollector::default();
|
||||
left.insert("a");
|
||||
left.insert("b");
|
||||
|
||||
let mut right = CardinalityCollector::default();
|
||||
right.insert("b");
|
||||
right.insert("c");
|
||||
|
||||
left.merge_fruits(right).unwrap();
|
||||
let estimate = left.finalize().unwrap();
|
||||
assert_eq!(estimate, 3.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_serialize_deserialize_binary() {
|
||||
use datasketches::hll::HllSketch;
|
||||
|
||||
use super::CardinalityCollector;
|
||||
|
||||
let mut collector = CardinalityCollector::default();
|
||||
collector.insert("apple");
|
||||
collector.insert("banana");
|
||||
collector.insert("cherry");
|
||||
|
||||
let bytes = collector.to_sketch_bytes();
|
||||
let deserialized = HllSketch::deserialize(&bytes).unwrap();
|
||||
assert!((deserialized.estimate() - 3.0).abs() < 0.01);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_salt_differentiates_types() {
|
||||
use super::CardinalityCollector;
|
||||
|
||||
// Without salt, same u64 value from different column types would collide
|
||||
let mut collector_bool = CardinalityCollector::new(5); // e.g. ColumnType::Bool
|
||||
collector_bool.insert(0u64); // false
|
||||
collector_bool.insert(1u64); // true
|
||||
|
||||
let mut collector_i64 = CardinalityCollector::new(2); // e.g. ColumnType::I64
|
||||
collector_i64.insert(0u64);
|
||||
collector_i64.insert(1u64);
|
||||
|
||||
// Merge them
|
||||
collector_bool.merge_fruits(collector_i64).unwrap();
|
||||
let estimate = collector_bool.finalize().unwrap();
|
||||
// Should be 4 because salt makes (5, 0) != (2, 0) and (5, 1) != (2, 1)
|
||||
assert_eq!(estimate, 4.0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ use std::path::PathBuf;
|
||||
pub use common::file_slice::{FileHandle, FileSlice};
|
||||
pub use common::{AntiCallToken, OwnedBytes, TerminatingWrite};
|
||||
|
||||
pub(crate) use self::composite_file::{CompositeFile, CompositeWrite};
|
||||
pub use self::composite_file::{CompositeFile, CompositeWrite};
|
||||
pub use self::directory::{Directory, DirectoryClone, DirectoryLock};
|
||||
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
|
||||
pub use self::ram_directory::RamDirectory;
|
||||
@@ -52,7 +52,7 @@ pub use self::mmap_directory::MmapDirectory;
|
||||
///
|
||||
/// `WritePtr` are required to implement both Write
|
||||
/// and Seek.
|
||||
pub type WritePtr = BufWriter<Box<dyn TerminatingWrite>>;
|
||||
pub type WritePtr = BufWriter<Box<dyn TerminatingWrite + Send + Sync>>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
@@ -14,7 +14,8 @@ mod postings;
|
||||
mod postings_writer;
|
||||
mod recorder;
|
||||
mod segment_postings;
|
||||
mod serializer;
|
||||
/// Serializer module for the inverted index
|
||||
pub mod serializer;
|
||||
mod skip;
|
||||
mod term_info;
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use crate::positions::PositionSerializer;
|
||||
use crate::postings::compression::{BlockEncoder, VIntEncoder, COMPRESSION_BLOCK_SIZE};
|
||||
use crate::postings::skip::SkipSerializer;
|
||||
use crate::query::Bm25Weight;
|
||||
use crate::schema::{Field, FieldEntry, FieldType, IndexRecordOption, Schema};
|
||||
use crate::schema::{Field, FieldEntry, IndexRecordOption, Schema};
|
||||
use crate::termdict::TermDictionaryBuilder;
|
||||
use crate::{DocId, Score};
|
||||
|
||||
@@ -80,9 +80,12 @@ impl InvertedIndexSerializer {
|
||||
let term_dictionary_write = self.terms_write.for_field(field);
|
||||
let postings_write = self.postings_write.for_field(field);
|
||||
let positions_write = self.positions_write.for_field(field);
|
||||
let field_type: FieldType = (*field_entry.field_type()).clone();
|
||||
let index_record_option = field_entry
|
||||
.field_type()
|
||||
.index_record_option()
|
||||
.unwrap_or(IndexRecordOption::Basic);
|
||||
FieldSerializer::create(
|
||||
&field_type,
|
||||
index_record_option,
|
||||
total_num_tokens,
|
||||
term_dictionary_write,
|
||||
postings_write,
|
||||
@@ -102,29 +105,27 @@ impl InvertedIndexSerializer {
|
||||
|
||||
/// The field serializer is in charge of
|
||||
/// the serialization of a specific field.
|
||||
pub struct FieldSerializer<'a> {
|
||||
term_dictionary_builder: TermDictionaryBuilder<&'a mut CountingWriter<WritePtr>>,
|
||||
pub struct FieldSerializer<'a, W: Write = WritePtr> {
|
||||
term_dictionary_builder: TermDictionaryBuilder<&'a mut CountingWriter<W>>,
|
||||
postings_serializer: PostingsSerializer,
|
||||
positions_serializer_opt: Option<PositionSerializer<&'a mut CountingWriter<WritePtr>>>,
|
||||
positions_serializer_opt: Option<PositionSerializer<&'a mut CountingWriter<W>>>,
|
||||
current_term_info: TermInfo,
|
||||
term_open: bool,
|
||||
postings_write: &'a mut CountingWriter<WritePtr>,
|
||||
postings_write: &'a mut CountingWriter<W>,
|
||||
postings_start_offset: u64,
|
||||
}
|
||||
|
||||
impl<'a> FieldSerializer<'a> {
|
||||
fn create(
|
||||
field_type: &FieldType,
|
||||
impl<'a, W: Write> FieldSerializer<'a, W> {
|
||||
/// Creates a new `FieldSerializer` for the given field type.
|
||||
pub fn create(
|
||||
index_record_option: IndexRecordOption,
|
||||
total_num_tokens: u64,
|
||||
term_dictionary_write: &'a mut CountingWriter<WritePtr>,
|
||||
postings_write: &'a mut CountingWriter<WritePtr>,
|
||||
positions_write: &'a mut CountingWriter<WritePtr>,
|
||||
term_dictionary_write: &'a mut CountingWriter<W>,
|
||||
postings_write: &'a mut CountingWriter<W>,
|
||||
positions_write: &'a mut CountingWriter<W>,
|
||||
fieldnorm_reader: Option<FieldNormReader>,
|
||||
) -> io::Result<FieldSerializer<'a>> {
|
||||
) -> io::Result<FieldSerializer<'a, W>> {
|
||||
total_num_tokens.serialize(postings_write)?;
|
||||
let index_record_option = field_type
|
||||
.index_record_option()
|
||||
.unwrap_or(IndexRecordOption::Basic);
|
||||
let term_dictionary_builder = TermDictionaryBuilder::create(term_dictionary_write)?;
|
||||
let average_fieldnorm = fieldnorm_reader
|
||||
.as_ref()
|
||||
@@ -192,6 +193,11 @@ impl<'a> FieldSerializer<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Starts the postings for a new term without recording term frequencies.
|
||||
pub fn new_term_without_freq(&mut self, term: &[u8]) -> io::Result<()> {
|
||||
self.new_term(term, 0, false)
|
||||
}
|
||||
|
||||
/// Serialize the information that a document contains for the current term:
|
||||
/// its term frequency, and the position deltas.
|
||||
///
|
||||
@@ -297,6 +303,7 @@ impl Block {
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializer for postings lists.
|
||||
pub struct PostingsSerializer {
|
||||
last_doc_id_encoded: u32,
|
||||
|
||||
@@ -316,6 +323,9 @@ pub struct PostingsSerializer {
|
||||
}
|
||||
|
||||
impl PostingsSerializer {
|
||||
/// Creates a new `PostingsSerializer`.
|
||||
/// * avg_fieldnorm - average field norm for the field being serialized.
|
||||
/// * mode - indexing options for the field being serialized.
|
||||
pub fn new(
|
||||
avg_fieldnorm: Score,
|
||||
mode: IndexRecordOption,
|
||||
@@ -338,6 +348,8 @@ impl PostingsSerializer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts the serialization for a new term.
|
||||
/// * term_doc_freq - the number of documents containing the term.
|
||||
pub fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool) {
|
||||
self.bm25_weight = None;
|
||||
|
||||
@@ -377,6 +389,7 @@ impl PostingsSerializer {
|
||||
self.postings_write.extend(block_encoded);
|
||||
}
|
||||
if self.term_has_freq {
|
||||
// encode the term frequencies
|
||||
let (num_bits, block_encoded): (u8, &[u8]) = self
|
||||
.block_encoder
|
||||
.compress_block_unsorted(self.block.term_freqs(), true);
|
||||
@@ -417,6 +430,9 @@ impl PostingsSerializer {
|
||||
self.block.clear();
|
||||
}
|
||||
|
||||
/// Register that the given document contains the current term.
|
||||
/// * doc_id - the document id.
|
||||
/// * term_freq - the term frequency within the document.
|
||||
pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32) {
|
||||
self.block.append_doc(doc_id, term_freq);
|
||||
if self.block.is_full() {
|
||||
@@ -424,6 +440,7 @@ impl PostingsSerializer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Finish the serialization for this term.
|
||||
pub fn close_term(
|
||||
&mut self,
|
||||
doc_freq: u32,
|
||||
|
||||
@@ -14,7 +14,11 @@ use crate::{DocId, Score, TERMINATED};
|
||||
// (requiring a 6th bit), but the biggest doc_id we can want to encode is TERMINATED-1, which can
|
||||
// be represented on 31b without delta encoding.
|
||||
fn encode_bitwidth(bitwidth: u8, delta_1: bool) -> u8 {
|
||||
assert!(bitwidth < 32);
|
||||
assert!(
|
||||
bitwidth < 32,
|
||||
"bitwidth needs to be less than 32, but got {}",
|
||||
bitwidth
|
||||
);
|
||||
bitwidth | ((delta_1 as u8) << 6)
|
||||
}
|
||||
|
||||
|
||||
@@ -302,8 +302,9 @@ where
|
||||
|| self.previous_key[keep_len] < key[keep_len];
|
||||
assert!(
|
||||
increasing_keys,
|
||||
"Keys should be increasing. ({:?} > {key:?})",
|
||||
self.previous_key
|
||||
"Keys should be increasing. ({:?} > {:?})",
|
||||
String::from_utf8_lossy(&self.previous_key),
|
||||
String::from_utf8_lossy(key),
|
||||
);
|
||||
self.previous_key.resize(key.len(), 0u8);
|
||||
self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]);
|
||||
|
||||
Reference in New Issue
Block a user