mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-01 16:10:42 +00:00
Compare commits
19 Commits
dependabot
...
larger-col
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
468850e9f4 | ||
|
|
a27c64998f | ||
|
|
46b3fb9ed3 | ||
|
|
fbe620b9b4 | ||
|
|
95d8a3989a | ||
|
|
ea61a68db4 | ||
|
|
c367df37c1 | ||
|
|
d99a5d4e91 | ||
|
|
2de6f075ce | ||
|
|
18080067c7 | ||
|
|
95db7d2e5c | ||
|
|
fc017c4c74 | ||
|
|
141c91d028 | ||
|
|
36a83e7c1a | ||
|
|
be11f8a6a1 | ||
|
|
4305e4029e | ||
|
|
edfb02b47e | ||
|
|
d0fad88bac | ||
|
|
351280c0b4 |
@@ -11,7 +11,7 @@ repository = "https://github.com/quickwit-oss/tantivy"
|
||||
readme = "README.md"
|
||||
keywords = ["search", "information", "retrieval"]
|
||||
edition = "2021"
|
||||
rust-version = "1.86"
|
||||
rust-version = "1.92"
|
||||
exclude = ["benches/*.json", "benches/*.txt"]
|
||||
|
||||
[dependencies]
|
||||
@@ -65,7 +65,7 @@ tantivy-bitpacker = { version = "0.10", path = "./bitpacker" }
|
||||
common = { version = "0.11", path = "./common/", package = "tantivy-common" }
|
||||
tokenizer-api = { version = "0.7", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
|
||||
sketches-ddsketch = { version = "0.4", features = ["use_serde"] }
|
||||
datasketches = { git = "https://github.com/fulmicoton-dd/datasketches-rust", rev = "7635fb8" }
|
||||
datasketches = { version = "0.3.0", features = ["hll"] }
|
||||
futures-util = { version = "0.3.28", optional = true }
|
||||
futures-channel = { version = "0.3.28", optional = true }
|
||||
fnv = "1.0.7"
|
||||
@@ -75,7 +75,7 @@ typetag = "0.2.21"
|
||||
winapi = "0.3.9"
|
||||
|
||||
[dev-dependencies]
|
||||
binggan = "0.16.1"
|
||||
binggan = "0.17.0"
|
||||
rand = "0.9"
|
||||
maplit = "1.0.2"
|
||||
matches = "0.1.9"
|
||||
|
||||
@@ -79,13 +79,12 @@ fn bench_agg(mut group: InputGroup<Index>) {
|
||||
register!(group, composite_histogram_calendar);
|
||||
|
||||
register!(group, cardinality_agg);
|
||||
register!(group, cardinality_agg_high_card);
|
||||
register!(group, cardinality_agg_low_card);
|
||||
register!(group, terms_status_with_cardinality_agg);
|
||||
register!(group, terms_100_buckets_with_cardinality_agg);
|
||||
register!(group, terms_many_with_single_term_order_by_cardinality_agg);
|
||||
register!(
|
||||
group,
|
||||
terms_many_with_nested_terms_double_order_by_cardinality_agg
|
||||
);
|
||||
register!(group, terms_many_with_single_term_order_by_card);
|
||||
register!(group, terms_many_with_single_term_2_order_by_card);
|
||||
|
||||
register!(group, range_agg);
|
||||
register!(group, range_agg_with_avg_sub_agg);
|
||||
@@ -173,6 +172,32 @@ fn cardinality_agg(index: &Index) {
|
||||
});
|
||||
execute_agg(index, agg_req);
|
||||
}
|
||||
// Full-scan cardinality on a near-1M-cardinality string field.
|
||||
// Hits the dense (PagedBitset) path: every doc has a unique term,
|
||||
// so the bucket promotes from FxHashSet shortly into the scan.
|
||||
fn cardinality_agg_high_card(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"cardinality": {
|
||||
"cardinality": {
|
||||
"field": "text_all_unique_terms"
|
||||
},
|
||||
}
|
||||
});
|
||||
execute_agg(index, agg_req);
|
||||
}
|
||||
// Full-scan cardinality on a tiny-cardinality string field (7 distinct
|
||||
// values). Stays on the FxHashSet path — the promotion threshold is
|
||||
// never crossed. Validates no regression on the sparse path.
|
||||
fn cardinality_agg_low_card(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"cardinality": {
|
||||
"cardinality": {
|
||||
"field": "text_few_terms_status"
|
||||
},
|
||||
}
|
||||
});
|
||||
execute_agg(index, agg_req);
|
||||
}
|
||||
fn terms_status_with_cardinality_agg(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"my_texts": {
|
||||
@@ -205,7 +230,7 @@ fn terms_100_buckets_with_cardinality_agg(index: &Index) {
|
||||
execute_agg(index, agg_req);
|
||||
}
|
||||
|
||||
fn terms_many_with_single_term_order_by_cardinality_agg(index: &Index) {
|
||||
fn terms_many_with_single_term_order_by_card(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"my_texts": {
|
||||
"terms": { "field": "text_many_terms" },
|
||||
@@ -217,7 +242,7 @@ fn terms_many_with_single_term_order_by_cardinality_agg(index: &Index) {
|
||||
},
|
||||
"aggs": {
|
||||
"cardinality": {
|
||||
"cardinality": { "field": "text_many_terms" }
|
||||
"cardinality": { "field": "text_few_terms" }
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -230,22 +255,20 @@ fn terms_many_with_single_term_order_by_cardinality_agg(index: &Index) {
|
||||
// Two-level terms ordered by cardinality at each level: a high-card outer terms
|
||||
// (text_many_terms) ordered by a cardinality sub-agg, with a nested low-card terms
|
||||
// (text_few_terms_status) also ordered by a cardinality sub-agg, plus an avg.
|
||||
fn terms_many_with_nested_terms_double_order_by_cardinality_agg(index: &Index) {
|
||||
fn terms_many_with_single_term_2_order_by_card(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"by_ip": {
|
||||
"terms": {
|
||||
"field": "text_many_terms",
|
||||
"size": 50,
|
||||
"order": { "distinct_path": "desc" }
|
||||
"order": { "card_few_terms": "desc" }
|
||||
},
|
||||
"aggs": {
|
||||
"distinct_path": {
|
||||
"card_few_terms": {
|
||||
"cardinality": { "field": "text_few_terms" }
|
||||
},
|
||||
"by_asn": {
|
||||
"nested_terms": {
|
||||
"terms": {
|
||||
"field": " single_term",
|
||||
"size": 10,
|
||||
"order": { "distinct_path2": "desc" }
|
||||
},
|
||||
"aggs": {
|
||||
|
||||
@@ -23,7 +23,7 @@ downcast-rs = "2.0.1"
|
||||
proptest = "1"
|
||||
more-asserts = "0.3.1"
|
||||
rand = "0.9"
|
||||
binggan = "0.16.1"
|
||||
binggan = "0.17.0"
|
||||
|
||||
[[bench]]
|
||||
name = "bench_merge"
|
||||
|
||||
@@ -19,6 +19,6 @@ time = { version = "0.3.47", features = ["serde-well-known"] }
|
||||
serde = { version = "1.0.136", features = ["derive"] }
|
||||
|
||||
[dev-dependencies]
|
||||
binggan = "0.16.1"
|
||||
binggan = "0.17.0"
|
||||
proptest = "1.0.0"
|
||||
rand = "0.9"
|
||||
|
||||
@@ -121,7 +121,7 @@ pub struct FileSlice {
|
||||
|
||||
impl fmt::Debug for FileSlice {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "FileSlice({:?}, {:?})", &self.data, self.range)
|
||||
write!(f, "FileSlice({:?}, {:?})", self.data, self.range)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,8 +20,8 @@ use crate::aggregation::metric::{
|
||||
build_segment_stats_collector, AverageAggregation, CardinalityAggReqData,
|
||||
CardinalityAggregationReq, CountAggregation, ExtendedStatsAggregation, MaxAggregation,
|
||||
MetricAggReqData, MinAggregation, SegmentCardinalityCollector, SegmentExtendedStatsCollector,
|
||||
SegmentPercentilesCollector, StatsAggregation, StatsType, SumAggregation, TopHitsAggReqData,
|
||||
TopHitsSegmentCollector,
|
||||
SegmentPercentilesCollector, StatsAggregation, StatsType, SumAggregation, TermOrdSet,
|
||||
TopHitsAggReqData, TopHitsSegmentCollector, BITSET_MAX_TERM_ORD,
|
||||
};
|
||||
use crate::aggregation::segment_agg_result::{
|
||||
GenericSegmentAggregationResultsCollector, SegmentAggregationCollector,
|
||||
@@ -413,12 +413,38 @@ pub(crate) fn build_segment_agg_collector(
|
||||
}
|
||||
AggKind::Cardinality => {
|
||||
let req_data = &mut req.get_cardinality_req_data_mut(node.idx_in_req_data);
|
||||
Ok(Box::new(SegmentCardinalityCollector::from_req(
|
||||
req_data.column_type,
|
||||
node.idx_in_req_data,
|
||||
req_data.accessor.clone(),
|
||||
req_data.missing_value_for_accessor,
|
||||
)))
|
||||
// For str columns, choose the per-bucket entries representation
|
||||
// based on the segment's column.max_value():
|
||||
// * small (< BITSET_MAX_TERM_ORD): `BitSet`, pre-allocated, no promotion machinery.
|
||||
// * large: `TermOrdSet` (sparse FxHashSet that promotes to a paged bitset).
|
||||
// For non-str columns the `entries` field is unused (values go
|
||||
// straight into the HLL sketch); we still pick `TermOrdSet`
|
||||
// because its empty Sparse(FxHashSet) costs nothing.
|
||||
let is_str = req_data.column_type == ColumnType::Str;
|
||||
let max_term_ord_inclusive = if is_str {
|
||||
req_data.accessor.max_value()
|
||||
} else {
|
||||
0
|
||||
};
|
||||
let collector: Box<dyn SegmentAggregationCollector> =
|
||||
if is_str && max_term_ord_inclusive < BITSET_MAX_TERM_ORD {
|
||||
Box::new(SegmentCardinalityCollector::<BitSet>::from_req(
|
||||
req_data.column_type,
|
||||
node.idx_in_req_data,
|
||||
req_data.accessor.clone(),
|
||||
req_data.missing_value_for_accessor,
|
||||
max_term_ord_inclusive,
|
||||
))
|
||||
} else {
|
||||
Box::new(SegmentCardinalityCollector::<TermOrdSet>::from_req(
|
||||
req_data.column_type,
|
||||
node.idx_in_req_data,
|
||||
req_data.accessor.clone(),
|
||||
req_data.missing_value_for_accessor,
|
||||
max_term_ord_inclusive,
|
||||
))
|
||||
};
|
||||
Ok(collector)
|
||||
}
|
||||
AggKind::StatsKind(stats_type) => {
|
||||
let req_data = &mut req.per_request.stats_metric_req_data[node.idx_in_req_data];
|
||||
@@ -1006,10 +1032,20 @@ fn build_terms_or_cardinality_nodes(
|
||||
(idx_in_req_data, AggKind::Terms)
|
||||
}
|
||||
TermsOrCardinalityRequest::Cardinality(ref req) => {
|
||||
// `str_dict_column` is computed once per field; for JSON paths
|
||||
// with mixed types it's `Some` even on the numeric req_data.
|
||||
// Cardinality only consults it for the str column path, so
|
||||
// gate by column_type to avoid driving non-str collectors
|
||||
// through the coupon-cache path.
|
||||
let str_dict_column_for_req = if column_type == ColumnType::Str {
|
||||
str_dict_column.clone()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let idx_in_req_data = data.push_cardinality_req_data(CardinalityAggReqData {
|
||||
accessor,
|
||||
column_type,
|
||||
str_dict_column: str_dict_column.clone(),
|
||||
str_dict_column: str_dict_column_for_req,
|
||||
missing_value_for_accessor,
|
||||
name: agg_name.to_string(),
|
||||
req: req.clone(),
|
||||
|
||||
@@ -115,6 +115,71 @@ pub fn get_fast_field_names(aggs: &Aggregations) -> HashSet<String> {
|
||||
fast_field_names
|
||||
}
|
||||
|
||||
/// Validates that all fields referenced in the aggregation request exist in the schema
|
||||
/// and are configured as fast fields.
|
||||
///
|
||||
/// This is a convenience function for upfront validation before executing aggregations.
|
||||
/// Returns an error if any field doesn't exist or is not a fast field.
|
||||
///
|
||||
/// Validation is intentionally opt-in rather than baked into aggregation execution: the
|
||||
/// default lenient behavior (returning empty results for missing fields) supports
|
||||
/// schema evolution and federated queries where the same request runs against segments
|
||||
/// or indices with different schemas.
|
||||
///
|
||||
/// # Example
|
||||
/// ```
|
||||
/// use tantivy::aggregation::agg_req::{Aggregations, validate_aggregation_fields_exist};
|
||||
/// use tantivy::schema::{Schema, FAST};
|
||||
/// use tantivy::Index;
|
||||
///
|
||||
/// # fn main() -> tantivy::Result<()> {
|
||||
/// // Create a simple index
|
||||
/// let mut schema_builder = Schema::builder();
|
||||
/// schema_builder.add_f64_field("price", FAST);
|
||||
/// let schema = schema_builder.build();
|
||||
/// let index = Index::create_in_ram(schema);
|
||||
///
|
||||
/// // Parse aggregation request
|
||||
/// let agg_req: Aggregations = serde_json::from_str(r#"{
|
||||
/// "avg_price": { "avg": { "field": "price" } }
|
||||
/// }"#)?;
|
||||
///
|
||||
/// let reader = index.reader()?;
|
||||
/// let searcher = reader.searcher();
|
||||
///
|
||||
/// // Validate fields before executing
|
||||
/// for segment_reader in searcher.segment_readers() {
|
||||
/// validate_aggregation_fields_exist(&agg_req, segment_reader)?;
|
||||
/// }
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn validate_aggregation_fields_exist(
|
||||
aggs: &Aggregations,
|
||||
reader: &crate::SegmentReader,
|
||||
) -> crate::Result<()> {
|
||||
let field_names = get_fast_field_names(aggs);
|
||||
let schema = reader.schema();
|
||||
|
||||
for field_name in field_names {
|
||||
// Check if the field is either directly in the schema or could be part of a json field
|
||||
// present in the schema, and verify it's a fast field.
|
||||
if let Some((field, _path)) = schema.find_field(&field_name) {
|
||||
let field_type = schema.get_field_entry(field).field_type();
|
||||
if !field_type.is_fast() {
|
||||
return Err(crate::TantivyError::SchemaError(format!(
|
||||
"Field '{}' is not a fast field. Aggregations require fast fields.",
|
||||
field_name
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
return Err(crate::TantivyError::FieldNotFound(field_name));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
/// All aggregation types.
|
||||
pub enum AggregationVariants {
|
||||
|
||||
@@ -1436,3 +1436,46 @@ fn test_aggregation_on_json_object_mixed_numerical_segments() {
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_aggregation_field_validation_helper() {
|
||||
// Test the standalone validation helper function for field validation
|
||||
let index = get_test_index_2_segments(false).unwrap();
|
||||
let reader = index.reader().unwrap();
|
||||
let searcher = reader.searcher();
|
||||
let segment_reader = searcher.segment_reader(0);
|
||||
|
||||
// Test with invalid field
|
||||
let agg_req: Aggregations = serde_json::from_str(
|
||||
r#"{
|
||||
"avg_test": {
|
||||
"avg": { "field": "nonexistent_field" }
|
||||
}
|
||||
}"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let result =
|
||||
crate::aggregation::agg_req::validate_aggregation_fields_exist(&agg_req, segment_reader);
|
||||
assert!(result.is_err());
|
||||
match result {
|
||||
Err(crate::TantivyError::FieldNotFound(field_name)) => {
|
||||
assert_eq!(field_name, "nonexistent_field");
|
||||
}
|
||||
_ => panic!("Expected FieldNotFound error, got: {:?}", result),
|
||||
}
|
||||
|
||||
// Test with valid field
|
||||
let agg_req: Aggregations = serde_json::from_str(
|
||||
r#"{
|
||||
"avg_test": {
|
||||
"avg": { "field": "score" }
|
||||
}
|
||||
}"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let result =
|
||||
crate::aggregation::agg_req::validate_aggregation_fields_exist(&agg_req, segment_reader);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::io;
|
||||
|
||||
use columnar::column_values::CompactSpaceU64Accessor;
|
||||
use columnar::{Column, ColumnType, Dictionary, StrColumn};
|
||||
use common::{BitSet, TinySet};
|
||||
use datasketches::hll::{Coupon, HllSketch, HllType, HllUnion};
|
||||
use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
@@ -20,6 +21,12 @@ use crate::TantivyError;
|
||||
/// 2^11 = 2048 registers, giving ~2.3% relative error and ~1KB per sketch (Hll4).
|
||||
const LG_K: u8 = 11;
|
||||
|
||||
/// Promote FxHashSet<u64> -> PagedBitset at ~3% density (`len * 32 >
|
||||
/// dict_num_terms`). Past this point the bitset (~`dict_num_terms / 7.5`
|
||||
/// bytes) is smaller than the hashset (~10 B/entry minimum) and avoids
|
||||
/// the per-insert hash.
|
||||
const PROMOTION_RATIO: u64 = 32;
|
||||
|
||||
/// # Cardinality
|
||||
///
|
||||
/// The cardinality aggregation allows for computing an estimate
|
||||
@@ -159,8 +166,12 @@ impl CouponCache {
|
||||
let should_use_dense =
|
||||
highest_term_ord < 1_000_000u64 || highest_term_ord < num_terms as u64 * 3u64;
|
||||
if should_use_dense {
|
||||
let mut coupon_map: Vec<Coupon> = vec![Coupon::EMPTY; highest_term_ord as usize + 1];
|
||||
for (term_ord, coupon) in term_ords.into_iter().zip(coupons.into_iter()) {
|
||||
// We don't really care about the value here. We will populate all the values we will
|
||||
// read anyway.
|
||||
let uninitialized_coupon = Coupon::from_hash(0);
|
||||
let mut coupon_map: Vec<Coupon> =
|
||||
vec![uninitialized_coupon; highest_term_ord as usize + 1];
|
||||
for (term_ord, coupon) in term_ords.into_iter().zip(coupons) {
|
||||
coupon_map[term_ord as usize] = coupon;
|
||||
}
|
||||
CouponCache::Dense {
|
||||
@@ -177,9 +188,263 @@ impl CouponCache {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct SegmentCardinalityCollector {
|
||||
// =================================================================
|
||||
// PagedBitset: a sparse bitset indexed by term_ord.
|
||||
//
|
||||
// Used as the dense alternative to FxHashSet<u64> once a string
|
||||
// cardinality bucket has accumulated enough unique term ordinals.
|
||||
// Memory is bounded to (touched pages) * (page bytes), not
|
||||
// (max_term_ord / 8).
|
||||
//
|
||||
// Page geometry mirrors `PagedTermMap` in `term_agg.rs`: 1024 ords
|
||||
// per page, lazy `Vec<Option<Box<Page>>>` directory.
|
||||
// =================================================================
|
||||
const BITSET_PAGE_SHIFT: u32 = 10;
|
||||
const BITSET_PAGE_BITS: u64 = 1u64 << BITSET_PAGE_SHIFT; // 1024
|
||||
const BITSET_PAGE_MASK: u64 = BITSET_PAGE_BITS - 1;
|
||||
const BITSET_WORDS_PER_PAGE: usize = (BITSET_PAGE_BITS / 64) as usize; // 16
|
||||
|
||||
#[derive(Clone)]
|
||||
struct PagedBitsetPage {
|
||||
words: [TinySet; BITSET_WORDS_PER_PAGE],
|
||||
}
|
||||
|
||||
impl PagedBitsetPage {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
words: [TinySet::empty(); BITSET_WORDS_PER_PAGE],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct PagedBitset {
|
||||
pages: Vec<Option<Box<PagedBitsetPage>>>,
|
||||
/// Cached number of set bits, maintained on insert.
|
||||
count: u64,
|
||||
}
|
||||
|
||||
impl PagedBitset {
|
||||
/// Allocates a directory big enough to hold ords up to and including
|
||||
/// `max_term_ord`. Pages are allocated lazily on first set.
|
||||
fn with_max_term_ord(max_term_ord: u64) -> Self {
|
||||
let max_page_idx = (max_term_ord >> BITSET_PAGE_SHIFT) as usize;
|
||||
let num_pages = max_page_idx + 1;
|
||||
Self {
|
||||
pages: vec![None; num_pages],
|
||||
count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn insert(&mut self, term_ord: u64) {
|
||||
let page_idx = (term_ord >> BITSET_PAGE_SHIFT) as usize;
|
||||
let intra = term_ord & BITSET_PAGE_MASK;
|
||||
let word_idx = (intra >> 6) as usize;
|
||||
let bit_idx = (intra & 63) as u32;
|
||||
|
||||
let page = match &mut self.pages[page_idx] {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
self.pages[page_idx] = Some(Box::new(PagedBitsetPage::new()));
|
||||
self.pages[page_idx].as_mut().unwrap()
|
||||
}
|
||||
};
|
||||
if page.words[word_idx].insert_mut(bit_idx) {
|
||||
self.count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
/// Number of set bits. O(1).
|
||||
#[inline]
|
||||
fn len(&self) -> u64 {
|
||||
self.count
|
||||
}
|
||||
|
||||
/// Iterate set ords in ascending order.
|
||||
fn iter_sorted(&self) -> impl Iterator<Item = u64> + '_ {
|
||||
self.pages
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(page_idx, page_opt)| page_opt.as_ref().map(|p| (page_idx, p)))
|
||||
.flat_map(|(page_idx, page)| {
|
||||
let page_base_ord = (page_idx as u64) << BITSET_PAGE_SHIFT;
|
||||
page.words
|
||||
.iter()
|
||||
.enumerate()
|
||||
.flat_map(move |(word_idx, &word)| {
|
||||
let word_base_ord = page_base_ord + (word_idx as u64) * 64;
|
||||
word.into_iter()
|
||||
.map(move |bit| word_base_ord + u64::from(bit))
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Threshold below which we use `BitSet` instead of `TermOrdSet`.
|
||||
///
|
||||
/// Both `BitSet` and `FxHashSet<u64>` have the same 32-byte struct, so the comparison is heap only:
|
||||
/// * `BitSet` at T=256: 5 `TinySet` words covering 258 bits (with the missing-value sentinel) =
|
||||
/// 40 bytes.
|
||||
/// * `FxHashSet<u64>` after one insert: 4-bucket hashbrown table ≈ 56 bytes
|
||||
pub(crate) const BITSET_MAX_TERM_ORD: u64 = 256;
|
||||
|
||||
// =================================================================
|
||||
// TermOrdAccumulator: per-bucket abstraction over the entries set.
|
||||
//
|
||||
// Implementations:
|
||||
// - `BitSet` (from `common`): used when `column.max_value()` is small (< BITSET_MAX_TERM_ORD).
|
||||
// Pre-allocated, no promotion.
|
||||
// - `TermOrdSet`: adaptive, starts as FxHashSet and promotes to a paged bitset when occupancy
|
||||
// crosses the density threshold (only if promotion is enabled — typically gated on top-level
|
||||
// aggregation).
|
||||
//
|
||||
// The trait lets `SegmentCardinalityCollector` be generic over the choice
|
||||
// so the hot collect() loop monomorphizes to a direct call (no enum
|
||||
// dispatch per insert).
|
||||
// =================================================================
|
||||
pub(crate) trait TermOrdAccumulator: Sized {
|
||||
/// Construct an empty accumulator.
|
||||
/// `max_term_ord_inclusive` is the largest term_ord that may be
|
||||
/// inserted (used to size pre-allocated bitsets and the dense bitset
|
||||
/// on promotion).
|
||||
fn new(max_term_ord_inclusive: u64) -> Self;
|
||||
fn insert(&mut self, term_ord: u64);
|
||||
/// Bulk insert. Implementations may override to hoist any inner
|
||||
/// dispatch outside the loop. Default loops `insert`.
|
||||
#[inline]
|
||||
fn extend_from_iter<I: IntoIterator<Item = u64>>(&mut self, ords: I) {
|
||||
for ord in ords {
|
||||
self.insert(ord);
|
||||
}
|
||||
}
|
||||
/// Hook called once per ingested block. Adaptive impls use this to
|
||||
/// decide on sparse->dense promotion.
|
||||
fn maybe_compact(&mut self) {}
|
||||
fn len(&self) -> usize;
|
||||
fn iter_ords(&self) -> impl Iterator<Item = u64> + '_;
|
||||
}
|
||||
|
||||
impl TermOrdAccumulator for BitSet {
|
||||
#[inline]
|
||||
fn new(max_term_ord_inclusive: u64) -> Self {
|
||||
// `BitSet::with_max_value(M)` accepts ords in [0, M).
|
||||
// We need ords up to and including `max_term_ord_inclusive`, plus
|
||||
// the missing-value sentinel `column.max_value() + 1`.
|
||||
BitSet::with_max_value((max_term_ord_inclusive + 2) as u32)
|
||||
}
|
||||
#[inline]
|
||||
fn insert(&mut self, term_ord: u64) {
|
||||
BitSet::insert(self, term_ord as u32);
|
||||
}
|
||||
#[inline]
|
||||
fn len(&self) -> usize {
|
||||
BitSet::len(self)
|
||||
}
|
||||
fn iter_ords(&self) -> impl Iterator<Item = u64> + '_ {
|
||||
// `BitSet` itself doesn't expose iteration, but
|
||||
// `BitSet::tinyset(bucket)` does. Walk per-bucket and yield each
|
||||
// set bit. The capacity is `max_value()`; iterating to
|
||||
// `div_ceil(64)` covers every possible ord exactly once.
|
||||
let num_buckets = self.max_value().div_ceil(64);
|
||||
(0..num_buckets).flat_map(move |bucket| {
|
||||
let chunk_base = u64::from(bucket) * 64;
|
||||
self.tinyset(bucket)
|
||||
.into_iter()
|
||||
.map(move |bit| chunk_base + u64::from(bit))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// =================================================================
|
||||
// TermOrdSet: adaptive sparse->dense accumulator.
|
||||
//
|
||||
// Starts as an FxHashSet (cheap when few ords are seen). When occupancy
|
||||
// crosses `len * PROMOTION_RATIO > max_term_ord_inclusive`, drains into
|
||||
// a `PagedBitset` and continues dense. Promotion is one-way.
|
||||
// =================================================================
|
||||
pub(crate) struct TermOrdSet {
|
||||
inner: TermOrdSetInner,
|
||||
/// Largest term_ord that may be inserted. Used for both sizing the
|
||||
/// dense bitset on promotion and as the promotion-threshold reference.
|
||||
max_term_ord_inclusive: u64,
|
||||
}
|
||||
|
||||
enum TermOrdSetInner {
|
||||
Sparse(FxHashSet<u64>),
|
||||
Dense(PagedBitset),
|
||||
}
|
||||
|
||||
impl TermOrdAccumulator for TermOrdSet {
|
||||
fn new(max_term_ord_inclusive: u64) -> Self {
|
||||
Self {
|
||||
inner: TermOrdSetInner::Sparse(FxHashSet::default()),
|
||||
max_term_ord_inclusive,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn insert(&mut self, term_ord: u64) {
|
||||
match &mut self.inner {
|
||||
TermOrdSetInner::Sparse(set) => {
|
||||
set.insert(term_ord);
|
||||
}
|
||||
TermOrdSetInner::Dense(bitset) => bitset.insert(term_ord),
|
||||
}
|
||||
}
|
||||
|
||||
/// Hoist the Sparse/Dense match outside the per-ord loop so that a
|
||||
/// block of inserts dispatches once.
|
||||
fn extend_from_iter<I: IntoIterator<Item = u64>>(&mut self, ords: I) {
|
||||
match &mut self.inner {
|
||||
TermOrdSetInner::Sparse(set) => {
|
||||
for ord in ords {
|
||||
set.insert(ord);
|
||||
}
|
||||
}
|
||||
TermOrdSetInner::Dense(bitset) => {
|
||||
for ord in ords {
|
||||
bitset.insert(ord);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_compact(&mut self) {
|
||||
let TermOrdSetInner::Sparse(set) = &mut self.inner else {
|
||||
return;
|
||||
};
|
||||
if set.len() as u64 * PROMOTION_RATIO <= self.max_term_ord_inclusive {
|
||||
return;
|
||||
}
|
||||
// Size for ord <= max_term_ord_inclusive plus the missing sentinel
|
||||
// (column.max_value() + 1, which may equal max_term_ord_inclusive
|
||||
// when the column references every dictionary term).
|
||||
let mut bitset = PagedBitset::with_max_term_ord(self.max_term_ord_inclusive + 1);
|
||||
let set = std::mem::take(set);
|
||||
for ord in set {
|
||||
bitset.insert(ord);
|
||||
}
|
||||
self.inner = TermOrdSetInner::Dense(bitset);
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
match &self.inner {
|
||||
TermOrdSetInner::Sparse(set) => set.len(),
|
||||
TermOrdSetInner::Dense(bitset) => bitset.len() as usize,
|
||||
}
|
||||
}
|
||||
|
||||
fn iter_ords(&self) -> impl Iterator<Item = u64> + '_ {
|
||||
match &self.inner {
|
||||
TermOrdSetInner::Sparse(set) => itertools::Either::Left(set.iter().copied()),
|
||||
TermOrdSetInner::Dense(bitset) => itertools::Either::Right(bitset.iter_sorted()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct SegmentCardinalityCollector<S: TermOrdAccumulator> {
|
||||
/// Buckets are Some(_) until they get consumed by into_intermediate_results().
|
||||
buckets: Vec<Option<SegmentCardinalityCollectorBucket>>,
|
||||
buckets: Vec<Option<SegmentCardinalityCollectorBucket<S>>>,
|
||||
accessor_idx: usize,
|
||||
/// The column accessor to access the fast field values.
|
||||
accessor: Column<u64>,
|
||||
@@ -188,9 +453,13 @@ pub(crate) struct SegmentCardinalityCollector {
|
||||
/// The missing value normalized to the internal u64 representation of the field type.
|
||||
missing_value_for_accessor: Option<u64>,
|
||||
coupon_cache: Option<CouponCache>,
|
||||
/// Largest term_ord that may be inserted into a bucket. For str columns
|
||||
/// this is `accessor.max_value()`; for non-str columns this is unused
|
||||
/// (no inserts go into `entries`) and set to 0.
|
||||
max_term_ord_inclusive: u64,
|
||||
}
|
||||
|
||||
impl Debug for SegmentCardinalityCollector {
|
||||
impl<S: TermOrdAccumulator> Debug for SegmentCardinalityCollector<S> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
f.debug_struct("SegmentCardinalityCollector")
|
||||
.field("column_type", &self.column_type)
|
||||
@@ -202,16 +471,21 @@ impl Debug for SegmentCardinalityCollector {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct SegmentCardinalityCollectorBucket {
|
||||
cardinality: CardinalityCollector,
|
||||
entries: FxHashSet<u64>,
|
||||
/// Per-bucket state. Shape depends on column kind: str columns dedup
|
||||
/// term ords and only build the HLL sketch at finalization (saves the
|
||||
/// ~96 B `CardinalityCollector` per bucket during collect); numeric/IpAddr
|
||||
/// columns feed the sketch directly during collect.
|
||||
pub(crate) enum SegmentCardinalityCollectorBucket<S: TermOrdAccumulator> {
|
||||
Str(S),
|
||||
Numeric(CardinalityCollector),
|
||||
}
|
||||
impl SegmentCardinalityCollectorBucket {
|
||||
impl<S: TermOrdAccumulator> SegmentCardinalityCollectorBucket<S> {
|
||||
#[inline(always)]
|
||||
pub fn new(column_type: ColumnType) -> Self {
|
||||
Self {
|
||||
cardinality: CardinalityCollector::new(column_type as u8),
|
||||
entries: FxHashSet::default(),
|
||||
pub fn new(column_type: ColumnType, max_term_ord_inclusive: u64) -> Self {
|
||||
if column_type == ColumnType::Str {
|
||||
Self::Str(S::new(max_term_ord_inclusive))
|
||||
} else {
|
||||
Self::Numeric(CardinalityCollector::new(column_type as u8))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -222,37 +496,57 @@ impl SegmentCardinalityCollectorBucket {
|
||||
//
|
||||
// If the column is str, then the values are dictionary encoded
|
||||
// and have not been added to the sketch yet.
|
||||
// We need to resolves the term ords accumulated in self.entries
|
||||
// with the coupon cache, and append the results to the sketch.
|
||||
// We need to resolves the term ords accumulated in the str entries
|
||||
// with the coupon cache, and append the results to a fresh sketch.
|
||||
fn into_intermediate_metric_result(
|
||||
mut self,
|
||||
self,
|
||||
coupon_cache_opt: Option<&CouponCache>,
|
||||
) -> crate::Result<IntermediateMetricResult> {
|
||||
if let Some(coupon_cache) = coupon_cache_opt {
|
||||
assert!(self.cardinality.sketch.is_empty());
|
||||
append_to_sketch(&self.entries, coupon_cache, &mut self.cardinality);
|
||||
}
|
||||
Ok(IntermediateMetricResult::Cardinality(self.cardinality))
|
||||
let cardinality = match self {
|
||||
Self::Str(entries) => {
|
||||
let mut cardinality = CardinalityCollector::new(ColumnType::Str as u8);
|
||||
if let Some(coupon_cache) = coupon_cache_opt {
|
||||
// Sketch must be empty for str columns: coupons are appended here
|
||||
// from the term_ord set (and not directly during collection).
|
||||
assert!(cardinality.sketch.is_empty());
|
||||
append_to_sketch(&entries, coupon_cache, &mut cardinality);
|
||||
}
|
||||
cardinality
|
||||
}
|
||||
Self::Numeric(cardinality) => cardinality,
|
||||
};
|
||||
Ok(IntermediateMetricResult::Cardinality(cardinality))
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a coupon cache from the given buckets, dictionary, and optional missing value.
|
||||
/// Returns a mapping from term_ord to the hash (coupon) of the associated term.
|
||||
fn build_coupon_cache(
|
||||
buckets: &[Option<SegmentCardinalityCollectorBucket>],
|
||||
fn build_coupon_cache<S: TermOrdAccumulator>(
|
||||
buckets: &[Option<SegmentCardinalityCollectorBucket<S>>],
|
||||
dictionary: &Dictionary,
|
||||
missing_value_opt: Option<&Key>,
|
||||
) -> io::Result<CouponCache> {
|
||||
let term_ords_capacity: usize = buckets
|
||||
.iter()
|
||||
.flatten()
|
||||
.map(|bucket| bucket.entries.len())
|
||||
.max()
|
||||
.unwrap_or(0)
|
||||
* 2;
|
||||
let mut term_ords_set = FxHashSet::with_capacity_and_hasher(term_ords_capacity, FxBuildHasher);
|
||||
// Caller restricts this to str cardinality collectors, so every
|
||||
// present bucket must be the `Str` variant. Pass 1 validates and
|
||||
// computes the capacity hint; pass 2 inserts.
|
||||
let mut max_bucket_len = 0usize;
|
||||
for bucket in buckets.iter().flatten() {
|
||||
term_ords_set.extend(bucket.entries.iter().copied());
|
||||
match bucket {
|
||||
SegmentCardinalityCollectorBucket::Str(entries) => {
|
||||
max_bucket_len = max_bucket_len.max(entries.len());
|
||||
}
|
||||
SegmentCardinalityCollectorBucket::Numeric(_) => {
|
||||
return Err(io::Error::other(
|
||||
"build_coupon_cache invoked with a non-str bucket",
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut term_ords_set = FxHashSet::with_capacity_and_hasher(max_bucket_len * 2, FxBuildHasher);
|
||||
for bucket in buckets.iter().flatten() {
|
||||
if let SegmentCardinalityCollectorBucket::Str(entries) = bucket {
|
||||
term_ords_set.extend(entries.iter_ords());
|
||||
}
|
||||
}
|
||||
let mut term_ords: Vec<u64> = term_ords_set.into_iter().collect();
|
||||
term_ords.sort_unstable();
|
||||
@@ -284,8 +578,8 @@ fn build_coupon_cache(
|
||||
Ok(CouponCache::new(term_ords, coupons, missing_coupon_opt))
|
||||
}
|
||||
|
||||
fn append_to_sketch(
|
||||
term_ords: &FxHashSet<u64>,
|
||||
fn append_to_sketch<S: TermOrdAccumulator>(
|
||||
term_ords: &S,
|
||||
coupon_cache: &CouponCache,
|
||||
sketch: &mut CardinalityCollector,
|
||||
) {
|
||||
@@ -294,7 +588,7 @@ fn append_to_sketch(
|
||||
coupon_map,
|
||||
missing_coupon_opt,
|
||||
} => {
|
||||
for &term_ord in term_ords {
|
||||
for term_ord in term_ords.iter_ords() {
|
||||
if let Some(coupon) = coupon_map
|
||||
.get(term_ord as usize)
|
||||
.copied()
|
||||
@@ -308,8 +602,8 @@ fn append_to_sketch(
|
||||
coupon_map,
|
||||
missing_coupon_opt,
|
||||
} => {
|
||||
for term_ord in term_ords {
|
||||
if let Some(coupon) = coupon_map.get(term_ord).copied().or(*missing_coupon_opt) {
|
||||
for term_ord in term_ords.iter_ords() {
|
||||
if let Some(coupon) = coupon_map.get(&term_ord).copied().or(*missing_coupon_opt) {
|
||||
sketch.insert_coupon(coupon);
|
||||
}
|
||||
}
|
||||
@@ -317,12 +611,13 @@ fn append_to_sketch(
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentCardinalityCollector {
|
||||
impl<S: TermOrdAccumulator> SegmentCardinalityCollector<S> {
|
||||
pub fn from_req(
|
||||
column_type: ColumnType,
|
||||
accessor_idx: usize,
|
||||
accessor: Column<u64>,
|
||||
missing_value_for_accessor: Option<u64>,
|
||||
max_term_ord_inclusive: u64,
|
||||
) -> Self {
|
||||
Self {
|
||||
buckets: Vec::new(),
|
||||
@@ -331,6 +626,7 @@ impl SegmentCardinalityCollector {
|
||||
accessor,
|
||||
missing_value_for_accessor,
|
||||
coupon_cache: None,
|
||||
max_term_ord_inclusive,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -347,7 +643,9 @@ impl SegmentCardinalityCollector {
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
||||
impl<S: TermOrdAccumulator + 'static> SegmentAggregationCollector
|
||||
for SegmentCardinalityCollector<S>
|
||||
{
|
||||
fn add_intermediate_aggregation_result(
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
@@ -402,31 +700,41 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
||||
));
|
||||
};
|
||||
let col_block_accessor = &agg_data.column_block_accessor;
|
||||
if self.column_type == ColumnType::Str {
|
||||
for term_ord in col_block_accessor.iter_vals() {
|
||||
bucket.entries.insert(term_ord);
|
||||
match bucket {
|
||||
SegmentCardinalityCollectorBucket::Str(entries) => {
|
||||
// Promotion check runs on the pre-block state: the first call
|
||||
// sees an empty set (no-op), and the last block of inserts
|
||||
// doesn't trigger a promotion of a set we won't grow further.
|
||||
// The trait dispatches once per block (via `extend_from_iter`)
|
||||
// for adaptive variants and inlines to a tight loop for the
|
||||
// BitSet path.
|
||||
entries.maybe_compact();
|
||||
entries.extend_from_iter(col_block_accessor.iter_vals());
|
||||
}
|
||||
} else if self.column_type == ColumnType::IpAddr {
|
||||
let compact_space_accessor = self
|
||||
.accessor
|
||||
.values
|
||||
.clone()
|
||||
.downcast_arc::<CompactSpaceU64Accessor>()
|
||||
.map_err(|_| {
|
||||
TantivyError::AggregationError(
|
||||
crate::aggregation::AggregationError::InternalError(
|
||||
"Type mismatch: Could not downcast to CompactSpaceU64Accessor"
|
||||
.to_string(),
|
||||
),
|
||||
)
|
||||
})?;
|
||||
for val in col_block_accessor.iter_vals() {
|
||||
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
|
||||
bucket.cardinality.insert(val);
|
||||
}
|
||||
} else {
|
||||
for val in col_block_accessor.iter_vals() {
|
||||
bucket.cardinality.insert(val);
|
||||
SegmentCardinalityCollectorBucket::Numeric(cardinality) => {
|
||||
if self.column_type == ColumnType::IpAddr {
|
||||
let compact_space_accessor = self
|
||||
.accessor
|
||||
.values
|
||||
.clone()
|
||||
.downcast_arc::<CompactSpaceU64Accessor>()
|
||||
.map_err(|_| {
|
||||
TantivyError::AggregationError(
|
||||
crate::aggregation::AggregationError::InternalError(
|
||||
"Type mismatch: Could not downcast to CompactSpaceU64Accessor"
|
||||
.to_string(),
|
||||
),
|
||||
)
|
||||
})?;
|
||||
for val in col_block_accessor.iter_vals() {
|
||||
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
|
||||
cardinality.insert(val);
|
||||
}
|
||||
} else {
|
||||
for val in col_block_accessor.iter_vals() {
|
||||
cardinality.insert(val);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -439,8 +747,13 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
||||
_agg_data: &AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
if max_bucket as usize >= self.buckets.len() {
|
||||
let column_type = self.column_type;
|
||||
let max_term_ord_inclusive = self.max_term_ord_inclusive;
|
||||
self.buckets.resize_with(max_bucket as usize + 1, || {
|
||||
Some(SegmentCardinalityCollectorBucket::new(self.column_type))
|
||||
Some(SegmentCardinalityCollectorBucket::<S>::new(
|
||||
column_type,
|
||||
max_term_ord_inclusive,
|
||||
))
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
@@ -458,13 +771,14 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
||||
return None;
|
||||
}
|
||||
let bucket = self.buckets.get(bucket_id as usize)?.as_ref()?;
|
||||
// For string columns the HLL sketch is empty until materialization; entries holds
|
||||
// the deduplicated term ordinals seen, which is the exact distinct count.
|
||||
// For numeric columns the sketch is populated during collect.
|
||||
if self.column_type == ColumnType::Str {
|
||||
Some(bucket.entries.len() as f64)
|
||||
} else {
|
||||
Some(bucket.cardinality.sketch.estimate().trunc())
|
||||
// For string columns the sketch isn't built until finalization; the
|
||||
// term_ord set's len is the exact distinct count. For numeric columns
|
||||
// the sketch is populated during collect.
|
||||
match bucket {
|
||||
SegmentCardinalityCollectorBucket::Str(entries) => Some(entries.len() as f64),
|
||||
SegmentCardinalityCollectorBucket::Numeric(cardinality) => {
|
||||
Some(cardinality.sketch.estimate().trunc())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -511,7 +825,7 @@ impl<'de> Deserialize<'de> for CardinalityCollector {
|
||||
impl CardinalityCollector {
|
||||
fn new(salt: u8) -> Self {
|
||||
Self {
|
||||
sketch: HllSketch::new(LG_K, HllType::Hll4),
|
||||
sketch: HllSketch::new(LG_K, HllType::Hll8),
|
||||
salt,
|
||||
}
|
||||
}
|
||||
@@ -542,7 +856,7 @@ impl CardinalityCollector {
|
||||
let mut union = HllUnion::new(LG_K);
|
||||
union.update(&self.sketch);
|
||||
union.update(&right.sketch);
|
||||
self.sketch = union.to_sketch(HllType::Hll4);
|
||||
self.sketch = union.to_sketch(HllType::Hll8);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -614,6 +928,134 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Build a single-segment string-cardinality index with 32 unique terms.
|
||||
/// `column.max_value() = 31` is well below `BITSET_MAX_TERM_ORD`,
|
||||
/// so the bucket exercises the `BitSet` path end to end.
|
||||
#[test]
|
||||
fn cardinality_aggregation_test_str_bitset() -> crate::Result<()> {
|
||||
let terms: Vec<String> = (0..32).map(|i| format!("term_{i}")).collect();
|
||||
let term_refs: Vec<Vec<&str>> = terms.iter().map(|t| vec![t.as_str()]).collect::<Vec<_>>();
|
||||
// single segment so we have a single dictionary of 32 terms.
|
||||
let index = get_test_index_from_terms(true, &term_refs)?;
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_value(json!({
|
||||
"cardinality": {
|
||||
"cardinality": { "field": "string_id" }
|
||||
},
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
let res = exec_request(agg_req, &index)?;
|
||||
assert_eq!(res["cardinality"]["value"], 32.0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// `BitSet` path with a `missing` parameter: the column-level missing
|
||||
/// sentinel (`column.max_value() + 1`) flows into the bitset, the
|
||||
/// dict lookup filter at finalization drops it, and the missing
|
||||
/// coupon is applied separately.
|
||||
#[test]
|
||||
fn cardinality_aggregation_test_str_bitset_with_missing() {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let name_field = schema_builder.add_text_field("name", STRING | FAST);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut writer = index.writer_for_tests().unwrap();
|
||||
for i in 0..16 {
|
||||
let term = format!("t{i:02}");
|
||||
writer.add_document(doc!(name_field => term)).unwrap();
|
||||
}
|
||||
// One empty doc, exercising the missing sentinel.
|
||||
writer.add_document(doc!()).unwrap();
|
||||
writer.commit().unwrap();
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_value(json!({
|
||||
"cardinality": {
|
||||
"cardinality": {
|
||||
"field": "name",
|
||||
"missing": "MISSING_SENTINEL_KEY",
|
||||
}
|
||||
},
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
let res = exec_request(agg_req, &index).unwrap();
|
||||
// 16 distinct real terms + 1 distinct "missing" value = 17.
|
||||
assert_eq!(res["cardinality"]["value"], 17.0);
|
||||
}
|
||||
|
||||
/// Unit-test the PagedBitset itself: cross-page inserts produce sorted
|
||||
/// iteration, len() matches the inserted set, and duplicates are
|
||||
/// idempotent.
|
||||
#[test]
|
||||
fn paged_bitset_basic() {
|
||||
use super::PagedBitset;
|
||||
// Span several pages: BITSET_PAGE_BITS = 1024, so ords > 1024 land
|
||||
// on the second page, > 2048 on the third, etc.
|
||||
let ords = [0u64, 1, 63, 64, 1023, 1024, 1025, 4096, 4097, 9999, 10_000];
|
||||
let max_ord = *ords.iter().max().unwrap();
|
||||
let mut bitset = PagedBitset::with_max_term_ord(max_ord);
|
||||
for &ord in &ords {
|
||||
bitset.insert(ord);
|
||||
// Idempotent: inserting again must not increase count.
|
||||
bitset.insert(ord);
|
||||
}
|
||||
assert_eq!(bitset.len(), ords.len() as u64);
|
||||
let collected: Vec<u64> = bitset.iter_sorted().collect();
|
||||
let mut expected: Vec<u64> = ords.to_vec();
|
||||
expected.sort_unstable();
|
||||
assert_eq!(collected, expected);
|
||||
}
|
||||
|
||||
/// Unit-test `TermOrdSet`: starts Sparse, promotes to Dense on
|
||||
/// `maybe_compact` once the density threshold is crossed, and
|
||||
/// `iter_ords()` yields the same set in either state. Ords spanning
|
||||
/// multiple paged-bitset pages exercise the Dense iter ordering.
|
||||
#[test]
|
||||
fn term_ord_set_promotes_on_maybe_compact() {
|
||||
use super::{TermOrdAccumulator, TermOrdSet, PROMOTION_RATIO};
|
||||
// Pick max so promotion needs few inserts: len * RATIO > max with
|
||||
// RATIO=32 and max=64 trips at len=3 (3*32=96 > 64).
|
||||
let max_term_ord = 64u64;
|
||||
let mut set = <TermOrdSet as TermOrdAccumulator>::new(max_term_ord);
|
||||
// Two inserts: should stay Sparse after maybe_compact (2 * RATIO = 64, not > 64).
|
||||
set.insert(0);
|
||||
set.insert(7);
|
||||
set.maybe_compact();
|
||||
assert_eq!(set.len(), 2);
|
||||
|
||||
// Third insert promotes on next maybe_compact.
|
||||
set.insert(20);
|
||||
assert_eq!(set.len(), 3);
|
||||
// Sanity check: at len=3, 3 * PROMOTION_RATIO = 96 > 64.
|
||||
assert!(3u64 * PROMOTION_RATIO > max_term_ord);
|
||||
set.maybe_compact();
|
||||
|
||||
// Post-promotion: extending continues to work.
|
||||
set.insert(15);
|
||||
set.insert(15); // dup
|
||||
assert_eq!(set.len(), 4);
|
||||
|
||||
let mut collected: Vec<u64> = set.iter_ords().collect();
|
||||
collected.sort_unstable();
|
||||
assert_eq!(collected, vec![0, 7, 15, 20]);
|
||||
}
|
||||
|
||||
/// Unit-test the `BitSet` impl of `TermOrdAccumulator`: insert,
|
||||
/// dedup, and iter_ords order.
|
||||
#[test]
|
||||
fn bitset_accumulator_basic() {
|
||||
use common::BitSet;
|
||||
|
||||
use super::TermOrdAccumulator;
|
||||
let mut set = <BitSet as TermOrdAccumulator>::new(255);
|
||||
for ord in [0u64, 1, 63, 64, 65, 128, 200, 200, 0] {
|
||||
<BitSet as TermOrdAccumulator>::insert(&mut set, ord);
|
||||
}
|
||||
assert_eq!(<BitSet as TermOrdAccumulator>::len(&set), 7);
|
||||
let collected: Vec<u64> = set.iter_ords().collect();
|
||||
assert_eq!(collected, vec![0, 1, 63, 64, 65, 128, 200]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_aggregation_u64() -> crate::Result<()> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
@@ -705,6 +1147,42 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// A JSON path that resolves to both a Str column and a numeric column
|
||||
/// produces two collector instances per segment — one with `Str` buckets
|
||||
/// and one with `Numeric` buckets. Their `IntermediateMetricResult`s must
|
||||
/// merge into the union cardinality.
|
||||
#[test]
|
||||
fn cardinality_aggregation_json_str_and_numeric() -> crate::Result<()> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let field = schema_builder.add_json_field("json", FAST);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
{
|
||||
let mut writer = index.writer_for_tests()?;
|
||||
writer.add_document(doc!(field => json!({"value": "hello"})))?;
|
||||
writer.add_document(doc!(field => json!({"value": "world"})))?;
|
||||
writer.add_document(doc!(field => json!({"value": "hello"})))?; // dup str
|
||||
writer.add_document(doc!(field => json!({"value": i64::from_u64(7u64)})))?;
|
||||
writer.add_document(doc!(field => json!({"value": i64::from_u64(42u64)})))?;
|
||||
writer.add_document(doc!(field => json!({"value": i64::from_u64(7u64)})))?; // dup num
|
||||
writer.commit()?;
|
||||
}
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_value(json!({
|
||||
"cardinality": {
|
||||
"cardinality": {
|
||||
"field": "json.value"
|
||||
},
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
let res = exec_request(agg_req, &index)?;
|
||||
// 4 distinct values: "hello", "world", 7, 42.
|
||||
assert_eq!(res["cardinality"]["value"], 4.0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_serde_roundtrip() {
|
||||
use super::CardinalityCollector;
|
||||
|
||||
@@ -301,11 +301,14 @@ pub trait SegmentCollector: 'static {
|
||||
/// The query pushes the scored document to the collector via this method.
|
||||
fn collect(&mut self, doc: DocId, score: Score);
|
||||
|
||||
/// The query pushes the scored document to the collector via this method.
|
||||
/// The query pushes the matched documents to the collector via this method.
|
||||
/// This method is used when the collector does not require scoring.
|
||||
///
|
||||
/// See [`COLLECT_BLOCK_BUFFER_LEN`](crate::COLLECT_BLOCK_BUFFER_LEN) for the
|
||||
/// buffer size passed to the collector.
|
||||
/// `docs` is a block of matched doc ids. Doc ids are produced in increasing
|
||||
/// order, in windows of [`COLLECT_BLOCK_BUFFER_LEN`](crate::COLLECT_BLOCK_BUFFER_LEN),
|
||||
/// but several windows are accumulated before being flushed here, so the
|
||||
/// block may be larger than `COLLECT_BLOCK_BUFFER_LEN`. Implementations must
|
||||
/// not assume any particular maximum length.
|
||||
fn collect_block(&mut self, docs: &[DocId]) {
|
||||
for doc in docs {
|
||||
self.collect(*doc, 0.0);
|
||||
|
||||
@@ -52,7 +52,7 @@ impl<T: FastValue> SortKeyComputer for SortByStaticFastValue<T> {
|
||||
if schema_type != T::to_type() {
|
||||
return Err(crate::TantivyError::SchemaError(format!(
|
||||
"Field `{}` is of type {schema_type:?}, not of the type {:?}.",
|
||||
&self.field,
|
||||
self.field,
|
||||
T::to_type()
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -11,9 +11,14 @@ use crate::DocId;
|
||||
/// to compare `[u32; 4]`.
|
||||
pub const TERMINATED: DocId = i32::MAX as u32;
|
||||
|
||||
/// The collect_block method on `SegmentCollector` uses a buffer of this size.
|
||||
/// Passed results to `collect_block` will not exceed this size and will be
|
||||
/// exactly this size as long as we can fill the buffer.
|
||||
/// Window size used by [`DocSet::fill_buffer`]: a single `fill_buffer` call
|
||||
/// writes at most this many doc ids, and exactly this many as long as the
|
||||
/// `DocSet` is not exhausted.
|
||||
///
|
||||
/// Note that this is *not* the maximum length of the slice passed to
|
||||
/// `SegmentCollector::collect_block`: the collection loop accumulates several
|
||||
/// such windows into a larger buffer before flushing it, so `collect_block`
|
||||
/// may receive a block larger than `COLLECT_BLOCK_BUFFER_LEN`.
|
||||
pub const COLLECT_BLOCK_BUFFER_LEN: usize = 64;
|
||||
|
||||
/// Number of `TinySet` (64-bit) buckets in a block used by [`DocSet::fill_bitset_block`].
|
||||
|
||||
@@ -6,6 +6,7 @@ use common::{ByteCount, HasLen};
|
||||
use fnv::FnvHashMap;
|
||||
use itertools::Itertools;
|
||||
|
||||
use crate::directory::error::OpenReadError;
|
||||
use crate::directory::{CompositeFile, FileSlice};
|
||||
use crate::error::DataCorruption;
|
||||
use crate::fastfield::{intersect_alive_bitsets, AliveBitSet, FacetReader, FastFieldReaders};
|
||||
@@ -159,12 +160,10 @@ impl SegmentReader {
|
||||
let postings_file = segment.open_read(SegmentComponent::Postings)?;
|
||||
let postings_composite = CompositeFile::open(&postings_file)?;
|
||||
|
||||
let positions_composite = {
|
||||
if let Ok(positions_file) = segment.open_read(SegmentComponent::Positions) {
|
||||
CompositeFile::open(&positions_file)?
|
||||
} else {
|
||||
CompositeFile::empty()
|
||||
}
|
||||
let positions_composite = match segment.open_read(SegmentComponent::Positions) {
|
||||
Ok(positions_file) => CompositeFile::open(&positions_file)?,
|
||||
Err(OpenReadError::FileDoesNotExist(_)) => CompositeFile::empty(),
|
||||
Err(open_read_error) => return Err(open_read_error.into()),
|
||||
};
|
||||
|
||||
let schema = segment.schema();
|
||||
@@ -323,7 +322,7 @@ impl SegmentReader {
|
||||
// Without expand dots enabled dots need to be escaped.
|
||||
let escaped_json_path = json_path.replace('.', "\\.");
|
||||
let full_path = format!("{field_name}.{escaped_json_path}");
|
||||
let full_path_unescaped = format!("{}.{}", field_name, &json_path);
|
||||
let full_path_unescaped = format!("{}.{}", field_name, json_path);
|
||||
map_to_canonical.insert(full_path_unescaped, full_path.to_string());
|
||||
full_path
|
||||
} else {
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
|
||||
use crate::index::SegmentReader;
|
||||
use crate::postings::FreqReadingOption;
|
||||
use crate::query::disjunction::Disjunction;
|
||||
@@ -531,13 +530,12 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
|
||||
) -> crate::Result<()> {
|
||||
let scorer = self.complex_scorer(reader, 1.0, || DoNothingCombiner)?;
|
||||
let num_docs = reader.num_docs();
|
||||
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
|
||||
|
||||
match scorer {
|
||||
SpecializedScorer::TermUnion(term_scorers) => {
|
||||
let mut union_scorer =
|
||||
BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn, num_docs);
|
||||
for_each_docset_buffered(&mut union_scorer, &mut buffer, callback);
|
||||
for_each_docset_buffered(&mut union_scorer, callback);
|
||||
}
|
||||
SpecializedScorer::TermIntersection(term_scorers) => {
|
||||
let boxed_scorers: Vec<Box<dyn Scorer>> = term_scorers
|
||||
@@ -545,10 +543,10 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
|
||||
.map(|term_scorer| Box::new(term_scorer) as Box<dyn Scorer>)
|
||||
.collect();
|
||||
let mut intersection = intersect_scorers(boxed_scorers, num_docs);
|
||||
for_each_docset_buffered(intersection.as_mut(), &mut buffer, callback);
|
||||
for_each_docset_buffered(intersection.as_mut(), callback);
|
||||
}
|
||||
SpecializedScorer::Other(mut scorer) => {
|
||||
for_each_docset_buffered(scorer.as_mut(), &mut buffer, callback);
|
||||
for_each_docset_buffered(scorer.as_mut(), callback);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use super::term_scorer::TermScorer;
|
||||
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
|
||||
use crate::docset::DocSet;
|
||||
use crate::fieldnorm::FieldNormReader;
|
||||
use crate::index::SegmentReader;
|
||||
use crate::postings::SegmentPostings;
|
||||
@@ -92,13 +92,11 @@ impl Weight for TermWeight {
|
||||
) -> crate::Result<()> {
|
||||
match self.specialized_scorer(reader, 1.0)? {
|
||||
TermOrEmptyOrAllScorer::TermScorer(mut term_scorer) => {
|
||||
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
|
||||
for_each_docset_buffered(&mut term_scorer, &mut buffer, callback);
|
||||
for_each_docset_buffered(&mut term_scorer, callback);
|
||||
}
|
||||
TermOrEmptyOrAllScorer::Empty => {}
|
||||
TermOrEmptyOrAllScorer::AllMatch(mut all_scorer) => {
|
||||
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
|
||||
for_each_docset_buffered(&mut all_scorer, &mut buffer, callback);
|
||||
for_each_docset_buffered(&mut all_scorer, callback);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -17,18 +17,56 @@ pub(crate) fn for_each_scorer<TScorer: Scorer + ?Sized>(
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterates through all of the documents matched by the DocSet
|
||||
/// `DocSet`.
|
||||
/// Number of `COLLECT_BLOCK_BUFFER_LEN`-sized windows accumulated into the large
|
||||
/// buffer before it is flushed to the collector via `collect_block`.
|
||||
const NUM_WINDOWS_PER_BLOCK: usize = 32;
|
||||
/// Size of the buffer accumulated before invoking the callback (2_048 = 32 * 64).
|
||||
/// `fill_buffer` keeps writing `COLLECT_BLOCK_BUFFER_LEN`-sized windows; this only
|
||||
/// changes how much we accumulate before flushing.
|
||||
const LARGE_COLLECT_BUFFER_LEN: usize = COLLECT_BLOCK_BUFFER_LEN * NUM_WINDOWS_PER_BLOCK;
|
||||
|
||||
/// Iterates through all of the documents matched by the `DocSet`, flushing
|
||||
/// blocks of up to `LARGE_COLLECT_BUFFER_LEN` doc ids to `callback`.
|
||||
///
|
||||
/// `fill_buffer` only ever writes `COLLECT_BLOCK_BUFFER_LEN` doc ids at a time,
|
||||
/// so we accumulate several such windows into a single larger buffer before
|
||||
/// handing it to the collector. This amortizes the per-`collect_block` overhead
|
||||
/// (virtual dispatch, aggregation setup) over more documents.
|
||||
#[inline]
|
||||
pub(crate) fn for_each_docset_buffered<T: DocSet + ?Sized>(
|
||||
docset: &mut T,
|
||||
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
mut callback: impl FnMut(&[DocId]),
|
||||
) {
|
||||
// Heap-allocated once per call (i.e. once per segment in the no-score path).
|
||||
// `new_zeroed_slice` zeroes directly on the heap, avoiding a 2_048-element
|
||||
// stack temporary.
|
||||
// SAFETY: an all-zero bit pattern is a valid value for every `DocId` (u32),
|
||||
// so the zeroed slice is fully initialized.
|
||||
let mut buffer: Box<[DocId]> =
|
||||
unsafe { Box::new_zeroed_slice(LARGE_COLLECT_BUFFER_LEN).assume_init() };
|
||||
loop {
|
||||
let num_items = docset.fill_buffer(buffer);
|
||||
callback(&buffer[..num_items]);
|
||||
if num_items != buffer.len() {
|
||||
let mut filled = 0;
|
||||
let mut reached_end = false;
|
||||
// Fill the large buffer one `COLLECT_BLOCK_BUFFER_LEN` window at a time.
|
||||
// `chunks_exact_mut` yields windows of exactly `COLLECT_BLOCK_BUFFER_LEN`
|
||||
// because `LARGE_COLLECT_BUFFER_LEN` is a multiple of it (empty remainder).
|
||||
// The windows are contiguous and filled in order, so the doc ids always
|
||||
// occupy the contiguous prefix `buffer[..filled]`.
|
||||
for window in buffer.chunks_exact_mut(COLLECT_BLOCK_BUFFER_LEN) {
|
||||
// SAFETY: each `window` is a slice of exactly `COLLECT_BLOCK_BUFFER_LEN`
|
||||
// elements, so reinterpreting its start pointer as a fixed-size array
|
||||
// reference of that length is valid.
|
||||
let window: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN] =
|
||||
unsafe { &mut *window.as_mut_ptr().cast::<[DocId; COLLECT_BLOCK_BUFFER_LEN]>() };
|
||||
let num_items = docset.fill_buffer(window);
|
||||
filled += num_items;
|
||||
if num_items != COLLECT_BLOCK_BUFFER_LEN {
|
||||
reached_end = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
callback(&buffer[..filled]);
|
||||
if reached_end {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -104,9 +142,7 @@ pub trait Weight: Send + Sync + 'static {
|
||||
callback: &mut dyn FnMut(&[DocId]),
|
||||
) -> crate::Result<()> {
|
||||
let mut docset = self.scorer(reader, 1.0)?;
|
||||
|
||||
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
|
||||
for_each_docset_buffered(&mut docset, &mut buffer, callback);
|
||||
for_each_docset_buffered(&mut docset, callback);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -94,13 +94,7 @@ impl SkipIndex {
|
||||
byte_range: 0..first_layer_len,
|
||||
};
|
||||
for layer in &self.layers {
|
||||
if let Some(checkpoint) =
|
||||
layer.seek_start_at_offset(target, cur_checkpoint.byte_range.start)
|
||||
{
|
||||
cur_checkpoint = checkpoint;
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
cur_checkpoint = layer.seek_start_at_offset(target, cur_checkpoint.byte_range.start)?;
|
||||
}
|
||||
Some(cur_checkpoint)
|
||||
}
|
||||
|
||||
@@ -14,11 +14,8 @@ use itertools::Itertools;
|
||||
use tantivy_fst::Automaton;
|
||||
use tantivy_fst::automaton::AlwaysMatch;
|
||||
|
||||
use crate::sstable_index_v3::SSTableIndexV3Empty;
|
||||
use crate::streamer::{Streamer, StreamerBuilder};
|
||||
use crate::{
|
||||
BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, SSTableIndexV3, TermOrdinal, VoidSSTable,
|
||||
};
|
||||
use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal, VoidSSTable};
|
||||
|
||||
/// An SSTable is a sorted map that associates sorted `&[u8]` keys
|
||||
/// to any kind of typed values.
|
||||
@@ -288,33 +285,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
|
||||
let sstable_index_bytes = index_slice.read_bytes()?;
|
||||
|
||||
let sstable_index = match version {
|
||||
2 => SSTableIndex::V2(
|
||||
crate::sstable_index_v2::SSTableIndex::load(sstable_index_bytes).map_err(|_| {
|
||||
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
|
||||
})?,
|
||||
),
|
||||
3 => {
|
||||
let (sstable_index_bytes, mut footerv3_len_bytes) = sstable_index_bytes.rsplit(8);
|
||||
let store_offset = u64::deserialize(&mut footerv3_len_bytes)?;
|
||||
if store_offset != 0 {
|
||||
SSTableIndex::V3(
|
||||
SSTableIndexV3::load(sstable_index_bytes, store_offset).map_err(|_| {
|
||||
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
|
||||
})?,
|
||||
)
|
||||
} else {
|
||||
// if store_offset is zero, there is no index, so we build a pseudo-index
|
||||
// assuming a single block of sstable covering everything.
|
||||
SSTableIndex::V3Empty(SSTableIndexV3Empty::load(index_offset as usize))
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
return Err(io::Error::other(format!(
|
||||
"Unsupported sstable version, expected one of [2, 3], found {version}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
let sstable_index = SSTableIndex::open(version, index_offset, sstable_index_bytes)?;
|
||||
|
||||
Ok(Dictionary {
|
||||
sstable_slice,
|
||||
@@ -525,10 +496,15 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
|
||||
// Open the block for the first ordinal.
|
||||
let mut bytes = Vec::new();
|
||||
let mut current_block_addr = self.sstable_index.get_block_with_ord(ord);
|
||||
let (mut current_block_addr, block_id) = self.sstable_index.get_and_locate_with_ord(ord);
|
||||
let mut current_sstable_delta_reader =
|
||||
self.sstable_delta_reader_block(current_block_addr.clone())?;
|
||||
let mut current_block_ordinal = current_block_addr.first_ordinal;
|
||||
let mut current_block_end_bound = self
|
||||
.sstable_index
|
||||
.get_block(block_id + 1)
|
||||
.map(|block_addr| block_addr.first_ordinal)
|
||||
.unwrap_or(u64::MAX);
|
||||
|
||||
loop {
|
||||
// move to the ord inside the current block
|
||||
@@ -557,17 +533,19 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
}
|
||||
};
|
||||
|
||||
// TODO optimization: it is silly to do a binary search to get the block every single
|
||||
// time.
|
||||
//
|
||||
// Check if block changed for new term_ord
|
||||
let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
|
||||
if new_block_addr != current_block_addr {
|
||||
if next_ord >= current_block_end_bound {
|
||||
let (new_block_addr, block_id) =
|
||||
self.sstable_index.get_and_locate_with_ord(next_ord);
|
||||
current_block_addr = new_block_addr;
|
||||
current_block_ordinal = current_block_addr.first_ordinal;
|
||||
current_sstable_delta_reader =
|
||||
self.sstable_delta_reader_block(current_block_addr.clone())?;
|
||||
bytes.clear();
|
||||
current_block_end_bound = self
|
||||
.sstable_index
|
||||
.get_block(block_id + 1)
|
||||
.map(|block_addr| block_addr.first_ordinal)
|
||||
.unwrap_or(u64::MAX)
|
||||
}
|
||||
ord = next_ord;
|
||||
}
|
||||
|
||||
319
sstable/src/index/mod.rs
Normal file
319
sstable/src/index/mod.rs
Normal file
@@ -0,0 +1,319 @@
|
||||
pub(crate) mod v2;
|
||||
pub(crate) mod v3;
|
||||
|
||||
use std::io::{self, Read, Write};
|
||||
use std::ops::Range;
|
||||
|
||||
use common::{BinarySerializable, FixedSize, OwnedBytes};
|
||||
use tantivy_fst::{Automaton, MapBuilder};
|
||||
|
||||
use crate::{TermOrdinal, common_prefix_len};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum SSTableIndex {
|
||||
V2(v2::SSTableIndex),
|
||||
V3(v3::SSTableIndexV3),
|
||||
V3Empty(v3::SSTableIndexV3Empty),
|
||||
}
|
||||
|
||||
impl SSTableIndex {
|
||||
pub(crate) fn open(
|
||||
version: u32,
|
||||
index_offset: u64,
|
||||
index_bytes: OwnedBytes,
|
||||
) -> io::Result<Self> {
|
||||
let index = match version {
|
||||
2 => {
|
||||
SSTableIndex::V2(v2::SSTableIndex::load(index_bytes).map_err(|_| {
|
||||
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
|
||||
})?)
|
||||
}
|
||||
3 => {
|
||||
let (index_bytes, mut footerv3_len_bytes) = index_bytes.rsplit(8);
|
||||
let store_offset = u64::deserialize(&mut footerv3_len_bytes)?;
|
||||
if store_offset != 0 {
|
||||
SSTableIndex::V3(v3::SSTableIndexV3::load(index_bytes, store_offset).map_err(
|
||||
|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"),
|
||||
)?)
|
||||
} else {
|
||||
// if store_offset is zero, there is no index, so we build a pseudo-index
|
||||
// assuming a single block of sstable covering everything.
|
||||
SSTableIndex::V3Empty(v3::SSTableIndexV3Empty::load(index_offset as usize))
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
return Err(io::Error::other(format!(
|
||||
"Unsupported sstable version, expected one of [2, 3], found {version}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the requested block.
|
||||
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_block(block_id as usize),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_block(block_id),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block(block_id),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the block id of the block that would contain `key`.
|
||||
///
|
||||
/// Returns None if `key` is lexicographically after the last key recorded.
|
||||
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<u64> {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.locate_with_key(key).map(|i| i as u64),
|
||||
SSTableIndex::V3(v3_index) => v3_index.locate_with_key(key),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_key(key),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block that would contain `key`.
|
||||
///
|
||||
/// Returns None if `key` is lexicographically after the last key recorded.
|
||||
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_block_with_key(key),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_block_with_key(key),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_key(key),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.locate_with_ord(ord) as u64,
|
||||
SSTableIndex::V3(v3_index) => v3_index.locate_with_ord(ord),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_ord(ord),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
|
||||
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_block_with_ord(ord),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord(ord),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_and_locate_with_ord(ord),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_and_locate_with_ord(ord),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_and_locate_with_ord(ord),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_block_for_automaton<'a>(
|
||||
&'a self,
|
||||
automaton: &'a impl Automaton,
|
||||
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => {
|
||||
BlockIter::V2(v2_index.get_block_for_automaton(automaton))
|
||||
}
|
||||
SSTableIndex::V3(v3_index) => {
|
||||
BlockIter::V3(v3_index.get_block_for_automaton(automaton))
|
||||
}
|
||||
SSTableIndex::V3Empty(v3_empty) => {
|
||||
BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone())))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum BlockIter<V2, V3, T> {
|
||||
V2(V2),
|
||||
V3(V3),
|
||||
V3Empty(std::iter::Once<T>),
|
||||
}
|
||||
|
||||
impl<V2: Iterator<Item = T>, V3: Iterator<Item = T>, T> Iterator for BlockIter<V2, V3, T> {
|
||||
type Item = T;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
match self {
|
||||
BlockIter::V2(v2) => v2.next(),
|
||||
BlockIter::V3(v3) => v3.next(),
|
||||
BlockIter::V3Empty(once) => once.next(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Eq, PartialEq, Debug)]
|
||||
pub struct BlockAddr {
|
||||
pub first_ordinal: u64,
|
||||
pub byte_range: Range<usize>,
|
||||
}
|
||||
|
||||
impl BlockAddr {
|
||||
fn to_block_start(&self) -> BlockStartAddr {
|
||||
BlockStartAddr {
|
||||
first_ordinal: self.first_ordinal,
|
||||
byte_range_start: self.byte_range.start,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct BlockStartAddr {
|
||||
first_ordinal: u64,
|
||||
byte_range_start: usize,
|
||||
}
|
||||
|
||||
impl BlockStartAddr {
|
||||
fn to_block_addr(&self, byte_range_end: usize) -> BlockAddr {
|
||||
BlockAddr {
|
||||
first_ordinal: self.first_ordinal,
|
||||
byte_range: self.byte_range_start..byte_range_end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct BlockMeta {
|
||||
/// Any byte string that is lexicographically greater or equal to
|
||||
/// the last key in the block,
|
||||
/// and yet strictly smaller than the first key in the next block.
|
||||
pub last_key_or_greater: Vec<u8>,
|
||||
pub block_addr: BlockAddr,
|
||||
}
|
||||
|
||||
impl BinarySerializable for BlockStartAddr {
|
||||
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
|
||||
let start = self.byte_range_start as u64;
|
||||
start.serialize(writer)?;
|
||||
self.first_ordinal.serialize(writer)
|
||||
}
|
||||
|
||||
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
|
||||
let byte_range_start = u64::deserialize(reader)? as usize;
|
||||
let first_ordinal = u64::deserialize(reader)?;
|
||||
Ok(BlockStartAddr {
|
||||
first_ordinal,
|
||||
byte_range_start,
|
||||
})
|
||||
}
|
||||
|
||||
// Provided method
|
||||
fn num_bytes(&self) -> u64 {
|
||||
BlockStartAddr::SIZE_IN_BYTES as u64
|
||||
}
|
||||
}
|
||||
|
||||
impl FixedSize for BlockStartAddr {
|
||||
const SIZE_IN_BYTES: usize = 2 * u64::SIZE_IN_BYTES;
|
||||
}
|
||||
|
||||
/// Given that left < right,
|
||||
/// mutates `left into a shorter byte string left'` that
|
||||
/// matches `left <= left' < right`.
|
||||
fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
|
||||
assert!(&left[..] < right);
|
||||
let common_len = common_prefix_len(left, right);
|
||||
if left.len() == common_len {
|
||||
return;
|
||||
}
|
||||
// It is possible to do one character shorter in some case,
|
||||
// but it is not worth the extra complexity
|
||||
for pos in (common_len + 1)..left.len() {
|
||||
if left[pos] != u8::MAX {
|
||||
left[pos] += 1;
|
||||
left.truncate(pos + 1);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct SSTableIndexBuilder {
|
||||
blocks: Vec<BlockMeta>,
|
||||
}
|
||||
|
||||
impl SSTableIndexBuilder {
|
||||
/// In order to make the index as light as possible, we
|
||||
/// try to find a shorter alternative to the last key of the last block
|
||||
/// that is still smaller than the next key.
|
||||
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
|
||||
if let Some(last_block) = self.blocks.last_mut() {
|
||||
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
|
||||
self.blocks.push(BlockMeta {
|
||||
last_key_or_greater: last_key.to_vec(),
|
||||
block_addr: BlockAddr {
|
||||
byte_range,
|
||||
first_ordinal,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<u64> {
|
||||
if self.blocks.len() <= 1 {
|
||||
return Ok(0);
|
||||
}
|
||||
let counting_writer = common::CountingWriter::wrap(wrt);
|
||||
let mut map_builder = MapBuilder::new(counting_writer).map_err(fst_error_to_io_error)?;
|
||||
for (i, block) in self.blocks.iter().enumerate() {
|
||||
map_builder
|
||||
.insert(&block.last_key_or_greater, i as u64)
|
||||
.map_err(fst_error_to_io_error)?;
|
||||
}
|
||||
let counting_writer = map_builder.into_inner().map_err(fst_error_to_io_error)?;
|
||||
let written_bytes = counting_writer.written_bytes();
|
||||
let mut wrt = counting_writer.finish();
|
||||
|
||||
let mut block_store_writer = v3::BlockAddrStoreWriter::new();
|
||||
for block in &self.blocks {
|
||||
block_store_writer.write_block_meta(block.block_addr.clone())?;
|
||||
}
|
||||
block_store_writer.serialize(&mut wrt)?;
|
||||
|
||||
Ok(written_bytes)
|
||||
}
|
||||
}
|
||||
|
||||
fn fst_error_to_io_error(error: tantivy_fst::Error) -> io::Error {
|
||||
match error {
|
||||
tantivy_fst::Error::Fst(fst_error) => io::Error::other(fst_error),
|
||||
tantivy_fst::Error::Io(ioerror) => ioerror,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[track_caller]
|
||||
fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
|
||||
let mut left_buf = left.to_vec();
|
||||
super::find_shorter_str_in_between(&mut left_buf, right);
|
||||
assert!(left_buf.len() <= left.len());
|
||||
assert!(left <= &left_buf);
|
||||
assert!(&left_buf[..] < right);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_shorter_str_in_between() {
|
||||
test_find_shorter_str_in_between_aux(b"", b"hello");
|
||||
test_find_shorter_str_in_between_aux(b"abc", b"abcd");
|
||||
test_find_shorter_str_in_between_aux(b"abcd", b"abd");
|
||||
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
|
||||
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
|
||||
test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
|
||||
}
|
||||
|
||||
use proptest::prelude::*;
|
||||
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(100))]
|
||||
#[test]
|
||||
fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
|
||||
if left < right {
|
||||
test_find_shorter_str_in_between_aux(&left, &right);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -77,6 +77,13 @@ impl SSTableIndex {
|
||||
self.get_block(self.locate_with_ord(ord)).unwrap()
|
||||
}
|
||||
|
||||
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
|
||||
let location = self.locate_with_ord(ord);
|
||||
// locate_with_ord always returns an index within range
|
||||
let block_addr = self.get_block(location).unwrap();
|
||||
(block_addr, location as u64)
|
||||
}
|
||||
|
||||
pub(crate) fn get_block_for_automaton<'a>(
|
||||
&'a self,
|
||||
automaton: &'a impl Automaton,
|
||||
@@ -1,106 +1,14 @@
|
||||
use std::io::{self, Read, Write};
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::{BinarySerializable, FixedSize, OwnedBytes};
|
||||
use tantivy_bitpacker::{BitPacker, compute_num_bits};
|
||||
use tantivy_fst::raw::Fst;
|
||||
use tantivy_fst::{Automaton, IntoStreamer, Map, MapBuilder, Streamer};
|
||||
use tantivy_fst::{Automaton, IntoStreamer, Map, Streamer};
|
||||
|
||||
use super::{BlockAddr, BlockStartAddr};
|
||||
use crate::block_match_automaton::can_block_match_automaton;
|
||||
use crate::{SSTableDataCorruption, TermOrdinal, common_prefix_len};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum SSTableIndex {
|
||||
V2(crate::sstable_index_v2::SSTableIndex),
|
||||
V3(SSTableIndexV3),
|
||||
V3Empty(SSTableIndexV3Empty),
|
||||
}
|
||||
|
||||
impl SSTableIndex {
|
||||
/// Get the [`BlockAddr`] of the requested block.
|
||||
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_block(block_id as usize),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_block(block_id),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block(block_id),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the block id of the block that would contain `key`.
|
||||
///
|
||||
/// Returns None if `key` is lexicographically after the last key recorded.
|
||||
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<u64> {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.locate_with_key(key).map(|i| i as u64),
|
||||
SSTableIndex::V3(v3_index) => v3_index.locate_with_key(key),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_key(key),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block that would contain `key`.
|
||||
///
|
||||
/// Returns None if `key` is lexicographically after the last key recorded.
|
||||
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_block_with_key(key),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_block_with_key(key),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_key(key),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.locate_with_ord(ord) as u64,
|
||||
SSTableIndex::V3(v3_index) => v3_index.locate_with_ord(ord),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_ord(ord),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
|
||||
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_block_with_ord(ord),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord(ord),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_block_for_automaton<'a>(
|
||||
&'a self,
|
||||
automaton: &'a impl Automaton,
|
||||
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => {
|
||||
BlockIter::V2(v2_index.get_block_for_automaton(automaton))
|
||||
}
|
||||
SSTableIndex::V3(v3_index) => {
|
||||
BlockIter::V3(v3_index.get_block_for_automaton(automaton))
|
||||
}
|
||||
SSTableIndex::V3Empty(v3_empty) => {
|
||||
BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone())))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum BlockIter<V2, V3, T> {
|
||||
V2(V2),
|
||||
V3(V3),
|
||||
V3Empty(std::iter::Once<T>),
|
||||
}
|
||||
|
||||
impl<V2: Iterator<Item = T>, V3: Iterator<Item = T>, T> Iterator for BlockIter<V2, V3, T> {
|
||||
type Item = T;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
match self {
|
||||
BlockIter::V2(v2) => v2.next(),
|
||||
BlockIter::V3(v3) => v3.next(),
|
||||
BlockIter::V3Empty(once) => once.next(),
|
||||
}
|
||||
}
|
||||
}
|
||||
use crate::{SSTableDataCorruption, TermOrdinal};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SSTableIndexV3 {
|
||||
@@ -160,6 +68,11 @@ impl SSTableIndexV3 {
|
||||
self.block_addr_store.binary_search_ord(ord).1
|
||||
}
|
||||
|
||||
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
|
||||
let (location, block_addr) = self.block_addr_store.binary_search_ord(ord);
|
||||
(block_addr, location)
|
||||
}
|
||||
|
||||
pub(crate) fn get_block_for_automaton<'a>(
|
||||
&'a self,
|
||||
automaton: &'a impl Automaton,
|
||||
@@ -216,7 +129,7 @@ impl<A: Automaton> Iterator for GetBlockForAutomaton<'_, A> {
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SSTableIndexV3Empty {
|
||||
block_addr: BlockAddr,
|
||||
pub block_addr: BlockAddr,
|
||||
}
|
||||
|
||||
impl SSTableIndexV3Empty {
|
||||
@@ -230,8 +143,8 @@ impl SSTableIndexV3Empty {
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the requested block.
|
||||
pub(crate) fn get_block(&self, _block_id: u64) -> Option<BlockAddr> {
|
||||
Some(self.block_addr.clone())
|
||||
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
|
||||
(block_id == 0).then(|| self.block_addr.clone())
|
||||
}
|
||||
|
||||
/// Get the block id of the block that would contain `key`.
|
||||
@@ -256,146 +169,9 @@ impl SSTableIndexV3Empty {
|
||||
pub(crate) fn get_block_with_ord(&self, _ord: TermOrdinal) -> BlockAddr {
|
||||
self.block_addr.clone()
|
||||
}
|
||||
}
|
||||
#[derive(Clone, Eq, PartialEq, Debug)]
|
||||
pub struct BlockAddr {
|
||||
pub first_ordinal: u64,
|
||||
pub byte_range: Range<usize>,
|
||||
}
|
||||
|
||||
impl BlockAddr {
|
||||
fn to_block_start(&self) -> BlockStartAddr {
|
||||
BlockStartAddr {
|
||||
first_ordinal: self.first_ordinal,
|
||||
byte_range_start: self.byte_range.start,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct BlockStartAddr {
|
||||
first_ordinal: u64,
|
||||
byte_range_start: usize,
|
||||
}
|
||||
|
||||
impl BlockStartAddr {
|
||||
fn to_block_addr(&self, byte_range_end: usize) -> BlockAddr {
|
||||
BlockAddr {
|
||||
first_ordinal: self.first_ordinal,
|
||||
byte_range: self.byte_range_start..byte_range_end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct BlockMeta {
|
||||
/// Any byte string that is lexicographically greater or equal to
|
||||
/// the last key in the block,
|
||||
/// and yet strictly smaller than the first key in the next block.
|
||||
pub last_key_or_greater: Vec<u8>,
|
||||
pub block_addr: BlockAddr,
|
||||
}
|
||||
|
||||
impl BinarySerializable for BlockStartAddr {
|
||||
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
|
||||
let start = self.byte_range_start as u64;
|
||||
start.serialize(writer)?;
|
||||
self.first_ordinal.serialize(writer)
|
||||
}
|
||||
|
||||
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
|
||||
let byte_range_start = u64::deserialize(reader)? as usize;
|
||||
let first_ordinal = u64::deserialize(reader)?;
|
||||
Ok(BlockStartAddr {
|
||||
first_ordinal,
|
||||
byte_range_start,
|
||||
})
|
||||
}
|
||||
|
||||
// Provided method
|
||||
fn num_bytes(&self) -> u64 {
|
||||
BlockStartAddr::SIZE_IN_BYTES as u64
|
||||
}
|
||||
}
|
||||
|
||||
impl FixedSize for BlockStartAddr {
|
||||
const SIZE_IN_BYTES: usize = 2 * u64::SIZE_IN_BYTES;
|
||||
}
|
||||
|
||||
/// Given that left < right,
|
||||
/// mutates `left into a shorter byte string left'` that
|
||||
/// matches `left <= left' < right`.
|
||||
fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
|
||||
assert!(&left[..] < right);
|
||||
let common_len = common_prefix_len(left, right);
|
||||
if left.len() == common_len {
|
||||
return;
|
||||
}
|
||||
// It is possible to do one character shorter in some case,
|
||||
// but it is not worth the extra complexity
|
||||
for pos in (common_len + 1)..left.len() {
|
||||
if left[pos] != u8::MAX {
|
||||
left[pos] += 1;
|
||||
left.truncate(pos + 1);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct SSTableIndexBuilder {
|
||||
blocks: Vec<BlockMeta>,
|
||||
}
|
||||
|
||||
impl SSTableIndexBuilder {
|
||||
/// In order to make the index as light as possible, we
|
||||
/// try to find a shorter alternative to the last key of the last block
|
||||
/// that is still smaller than the next key.
|
||||
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
|
||||
if let Some(last_block) = self.blocks.last_mut() {
|
||||
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
|
||||
self.blocks.push(BlockMeta {
|
||||
last_key_or_greater: last_key.to_vec(),
|
||||
block_addr: BlockAddr {
|
||||
byte_range,
|
||||
first_ordinal,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<u64> {
|
||||
if self.blocks.len() <= 1 {
|
||||
return Ok(0);
|
||||
}
|
||||
let counting_writer = common::CountingWriter::wrap(wrt);
|
||||
let mut map_builder = MapBuilder::new(counting_writer).map_err(fst_error_to_io_error)?;
|
||||
for (i, block) in self.blocks.iter().enumerate() {
|
||||
map_builder
|
||||
.insert(&block.last_key_or_greater, i as u64)
|
||||
.map_err(fst_error_to_io_error)?;
|
||||
}
|
||||
let counting_writer = map_builder.into_inner().map_err(fst_error_to_io_error)?;
|
||||
let written_bytes = counting_writer.written_bytes();
|
||||
let mut wrt = counting_writer.finish();
|
||||
|
||||
let mut block_store_writer = BlockAddrStoreWriter::new();
|
||||
for block in &self.blocks {
|
||||
block_store_writer.write_block_meta(block.block_addr.clone())?;
|
||||
}
|
||||
block_store_writer.serialize(&mut wrt)?;
|
||||
|
||||
Ok(written_bytes)
|
||||
}
|
||||
}
|
||||
|
||||
fn fst_error_to_io_error(error: tantivy_fst::Error) -> io::Error {
|
||||
match error {
|
||||
tantivy_fst::Error::Fst(fst_error) => io::Error::other(fst_error),
|
||||
tantivy_fst::Error::Io(ioerror) => ioerror,
|
||||
pub(crate) fn get_and_locate_with_ord(&self, _ord: TermOrdinal) -> (BlockAddr, u64) {
|
||||
(self.block_addr.clone(), 0)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -647,14 +423,14 @@ fn binary_search(max: u64, cmp_fn: impl Fn(u64) -> std::cmp::Ordering) -> Result
|
||||
Err(left)
|
||||
}
|
||||
|
||||
struct BlockAddrStoreWriter {
|
||||
pub(crate) struct BlockAddrStoreWriter {
|
||||
buffer_block_metas: Vec<u8>,
|
||||
buffer_addrs: Vec<u8>,
|
||||
block_addrs: Vec<BlockAddr>,
|
||||
}
|
||||
|
||||
impl BlockAddrStoreWriter {
|
||||
fn new() -> Self {
|
||||
pub(crate) fn new() -> Self {
|
||||
BlockAddrStoreWriter {
|
||||
buffer_block_metas: Vec::new(),
|
||||
buffer_addrs: Vec::new(),
|
||||
@@ -662,7 +438,7 @@ impl BlockAddrStoreWriter {
|
||||
}
|
||||
}
|
||||
|
||||
fn flush_block(&mut self) -> io::Result<()> {
|
||||
pub(crate) fn flush_block(&mut self) -> io::Result<()> {
|
||||
if self.block_addrs.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
@@ -741,7 +517,7 @@ impl BlockAddrStoreWriter {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_block_meta(&mut self, block_addr: BlockAddr) -> io::Result<()> {
|
||||
pub(crate) fn write_block_meta(&mut self, block_addr: BlockAddr) -> io::Result<()> {
|
||||
self.block_addrs.push(block_addr);
|
||||
if self.block_addrs.len() >= STORE_BLOCK_LEN {
|
||||
self.flush_block()?;
|
||||
@@ -749,7 +525,7 @@ impl BlockAddrStoreWriter {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn serialize<W: std::io::Write>(&mut self, wrt: &mut W) -> io::Result<()> {
|
||||
pub(crate) fn serialize<W: std::io::Write>(&mut self, wrt: &mut W) -> io::Result<()> {
|
||||
self.flush_block()?;
|
||||
let len = self.buffer_block_metas.len() as u64;
|
||||
len.serialize(wrt)?;
|
||||
@@ -824,8 +600,9 @@ mod tests {
|
||||
use common::OwnedBytes;
|
||||
|
||||
use super::*;
|
||||
use crate::SSTableDataCorruption;
|
||||
use crate::block_match_automaton::tests::EqBuffer;
|
||||
use crate::index::BlockMeta;
|
||||
use crate::{SSTableDataCorruption, SSTableIndexBuilder};
|
||||
|
||||
#[test]
|
||||
fn test_sstable_index() {
|
||||
@@ -874,36 +651,7 @@ mod tests {
|
||||
assert!(matches!(data_corruption_err, SSTableDataCorruption));
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
|
||||
let mut left_buf = left.to_vec();
|
||||
super::find_shorter_str_in_between(&mut left_buf, right);
|
||||
assert!(left_buf.len() <= left.len());
|
||||
assert!(left <= &left_buf);
|
||||
assert!(&left_buf[..] < right);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_shorter_str_in_between() {
|
||||
test_find_shorter_str_in_between_aux(b"", b"hello");
|
||||
test_find_shorter_str_in_between_aux(b"abc", b"abcd");
|
||||
test_find_shorter_str_in_between_aux(b"abcd", b"abd");
|
||||
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
|
||||
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
|
||||
test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
|
||||
}
|
||||
|
||||
use proptest::prelude::*;
|
||||
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(100))]
|
||||
#[test]
|
||||
fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
|
||||
if left < right {
|
||||
test_find_shorter_str_in_between_aux(&left, &right);
|
||||
}
|
||||
}
|
||||
}
|
||||
// use proptest::prelude::*;
|
||||
|
||||
#[test]
|
||||
fn test_find_best_slop() {
|
||||
@@ -47,9 +47,8 @@ pub mod merge;
|
||||
mod streamer;
|
||||
pub mod value;
|
||||
|
||||
mod sstable_index_v3;
|
||||
pub use sstable_index_v3::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3};
|
||||
mod sstable_index_v2;
|
||||
mod index;
|
||||
pub use index::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
|
||||
pub(crate) mod vint;
|
||||
pub use dictionary::{Dictionary, TermOrdHit};
|
||||
pub use streamer::{Streamer, StreamerBuilder};
|
||||
|
||||
@@ -27,7 +27,7 @@ rand = "0.9"
|
||||
zipf = "7.0.0"
|
||||
rustc-hash = "2.1.0"
|
||||
proptest = "1.2.0"
|
||||
binggan = { version = "0.16.1" }
|
||||
binggan = { version = "0.17.0" }
|
||||
rand_distr = "0.5"
|
||||
|
||||
[features]
|
||||
|
||||
Reference in New Issue
Block a user