Compare commits

...

14 Commits

Author SHA1 Message Date
Paul Masurel
46b3fb9ed3 Relying on upstream version of datasketch and stop using HLL 4. (#2936)
We were relying on a fork for:

a bugfix in LIST serialization
a better API exposing a new Coupon type, required for caching coupons.
We also stop using HLL8 in hope to fix
https://datadoghq.atlassian.net/browse/CLOUDPREM-625

Co-authored-by: Paul Masurel <paul.masurel@datadoghq.com>
2026-05-19 13:29:35 +02:00
trinity-1686a
fbe620b9b4 Merge pull request #2933 from quickwit-oss/1686a/sstable-opt
optimise sstable index access pattern
2026-05-19 11:43:17 +02:00
trinity-1686a
95d8a3989a cr 2026-05-19 11:38:48 +02:00
trinity-1686a
ea61a68db4 skip sstable index binary search when ordinal is in same block 2026-05-16 11:35:38 +02:00
trinity-1686a
c367df37c1 refactor sstable index 2026-05-16 11:30:02 +02:00
Mohammad Dashti
d99a5d4e91 Rename validate_aggregation_fields to validate_aggregation_fields_exist
Applies @PSeitz's review suggestion to make the function name more
descriptive of what it checks. Also adds a doc note clarifying why
validation is opt-in rather than enforced by default.
2026-05-16 15:45:20 +08:00
Mohammad Dashti
2de6f075ce Fixed the example 2026-05-16 15:45:20 +08:00
Mohammad Dashti
18080067c7 Applied PR comment:
I would move it outside of the aggregation. You can fetch the fields from the aggregation request and do a validation in a helper function
2026-05-16 15:45:20 +08:00
Mohammad Dashti
95db7d2e5c Revert "Revert all impl."
This reverts commit d5e0991549a05bf80f19f853f7689ad69f96e7e5.
2026-05-16 15:45:20 +08:00
Mohammad Dashti
fc017c4c74 Applied PR comments. 2026-05-16 15:45:20 +08:00
Mohammad Dashti
141c91d028 Added a flag: strict_validation 2026-05-16 15:45:20 +08:00
Mohammad Dashti
36a83e7c1a Fixed agg validation 2026-05-16 15:45:20 +08:00
jinhelin
be11f8a6a1 Fix opening positions file error 2026-05-14 15:55:59 +08:00
dependabot[bot]
4305e4029e Update binggan requirement from 0.16.1 to 0.17.0
Updates the requirements on [binggan](https://github.com/pseitz/binggan) to permit the latest version.
- [Changelog](https://github.com/PSeitz/binggan/blob/main/CHANGELOG.md)
- [Commits](https://github.com/pseitz/binggan/commits)

---
updated-dependencies:
- dependency-name: binggan
  dependency-version: 0.17.0
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-05-12 15:10:20 +08:00
13 changed files with 491 additions and 329 deletions

View File

@@ -65,7 +65,7 @@ tantivy-bitpacker = { version = "0.10", path = "./bitpacker" }
common = { version = "0.11", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.7", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
sketches-ddsketch = { version = "0.4", features = ["use_serde"] }
datasketches = { git = "https://github.com/fulmicoton-dd/datasketches-rust", rev = "7635fb8" }
datasketches = { version = "0.3.0", features = ["hll"] }
futures-util = { version = "0.3.28", optional = true }
futures-channel = { version = "0.3.28", optional = true }
fnv = "1.0.7"
@@ -75,7 +75,7 @@ typetag = "0.2.21"
winapi = "0.3.9"
[dev-dependencies]
binggan = "0.16.1"
binggan = "0.17.0"
rand = "0.9"
maplit = "1.0.2"
matches = "0.1.9"

View File

@@ -23,7 +23,7 @@ downcast-rs = "2.0.1"
proptest = "1"
more-asserts = "0.3.1"
rand = "0.9"
binggan = "0.16.1"
binggan = "0.17.0"
[[bench]]
name = "bench_merge"

View File

@@ -19,6 +19,6 @@ time = { version = "0.3.47", features = ["serde-well-known"] }
serde = { version = "1.0.136", features = ["derive"] }
[dev-dependencies]
binggan = "0.16.1"
binggan = "0.17.0"
proptest = "1.0.0"
rand = "0.9"

View File

@@ -115,6 +115,71 @@ pub fn get_fast_field_names(aggs: &Aggregations) -> HashSet<String> {
fast_field_names
}
/// Validates that all fields referenced in the aggregation request exist in the schema
/// and are configured as fast fields.
///
/// This is a convenience function for upfront validation before executing aggregations.
/// Returns an error if any field doesn't exist or is not a fast field.
///
/// Validation is intentionally opt-in rather than baked into aggregation execution: the
/// default lenient behavior (returning empty results for missing fields) supports
/// schema evolution and federated queries where the same request runs against segments
/// or indices with different schemas.
///
/// # Example
/// ```
/// use tantivy::aggregation::agg_req::{Aggregations, validate_aggregation_fields_exist};
/// use tantivy::schema::{Schema, FAST};
/// use tantivy::Index;
///
/// # fn main() -> tantivy::Result<()> {
/// // Create a simple index
/// let mut schema_builder = Schema::builder();
/// schema_builder.add_f64_field("price", FAST);
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
///
/// // Parse aggregation request
/// let agg_req: Aggregations = serde_json::from_str(r#"{
/// "avg_price": { "avg": { "field": "price" } }
/// }"#)?;
///
/// let reader = index.reader()?;
/// let searcher = reader.searcher();
///
/// // Validate fields before executing
/// for segment_reader in searcher.segment_readers() {
/// validate_aggregation_fields_exist(&agg_req, segment_reader)?;
/// }
/// # Ok(())
/// # }
/// ```
pub fn validate_aggregation_fields_exist(
aggs: &Aggregations,
reader: &crate::SegmentReader,
) -> crate::Result<()> {
let field_names = get_fast_field_names(aggs);
let schema = reader.schema();
for field_name in field_names {
// Check if the field is either directly in the schema or could be part of a json field
// present in the schema, and verify it's a fast field.
if let Some((field, _path)) = schema.find_field(&field_name) {
let field_type = schema.get_field_entry(field).field_type();
if !field_type.is_fast() {
return Err(crate::TantivyError::SchemaError(format!(
"Field '{}' is not a fast field. Aggregations require fast fields.",
field_name
)));
}
} else {
return Err(crate::TantivyError::FieldNotFound(field_name));
}
}
Ok(())
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// All aggregation types.
pub enum AggregationVariants {

View File

@@ -1436,3 +1436,46 @@ fn test_aggregation_on_json_object_mixed_numerical_segments() {
)
);
}
#[test]
fn test_aggregation_field_validation_helper() {
// Test the standalone validation helper function for field validation
let index = get_test_index_2_segments(false).unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0);
// Test with invalid field
let agg_req: Aggregations = serde_json::from_str(
r#"{
"avg_test": {
"avg": { "field": "nonexistent_field" }
}
}"#,
)
.unwrap();
let result =
crate::aggregation::agg_req::validate_aggregation_fields_exist(&agg_req, segment_reader);
assert!(result.is_err());
match result {
Err(crate::TantivyError::FieldNotFound(field_name)) => {
assert_eq!(field_name, "nonexistent_field");
}
_ => panic!("Expected FieldNotFound error, got: {:?}", result),
}
// Test with valid field
let agg_req: Aggregations = serde_json::from_str(
r#"{
"avg_test": {
"avg": { "field": "score" }
}
}"#,
)
.unwrap();
let result =
crate::aggregation::agg_req::validate_aggregation_fields_exist(&agg_req, segment_reader);
assert!(result.is_ok());
}

View File

@@ -166,8 +166,12 @@ impl CouponCache {
let should_use_dense =
highest_term_ord < 1_000_000u64 || highest_term_ord < num_terms as u64 * 3u64;
if should_use_dense {
let mut coupon_map: Vec<Coupon> = vec![Coupon::EMPTY; highest_term_ord as usize + 1];
for (term_ord, coupon) in term_ords.into_iter().zip(coupons.into_iter()) {
// We don't really care about the value here. We will populate all the values we will
// read anyway.
let uninitialized_coupon = Coupon::from_hash(0);
let mut coupon_map: Vec<Coupon> =
vec![uninitialized_coupon; highest_term_ord as usize + 1];
for (term_ord, coupon) in term_ords.into_iter().zip(coupons) {
coupon_map[term_ord as usize] = coupon;
}
CouponCache::Dense {
@@ -821,7 +825,7 @@ impl<'de> Deserialize<'de> for CardinalityCollector {
impl CardinalityCollector {
fn new(salt: u8) -> Self {
Self {
sketch: HllSketch::new(LG_K, HllType::Hll4),
sketch: HllSketch::new(LG_K, HllType::Hll8),
salt,
}
}
@@ -852,7 +856,7 @@ impl CardinalityCollector {
let mut union = HllUnion::new(LG_K);
union.update(&self.sketch);
union.update(&right.sketch);
self.sketch = union.to_sketch(HllType::Hll4);
self.sketch = union.to_sketch(HllType::Hll8);
Ok(())
}
}

View File

@@ -6,6 +6,7 @@ use common::{ByteCount, HasLen};
use fnv::FnvHashMap;
use itertools::Itertools;
use crate::directory::error::OpenReadError;
use crate::directory::{CompositeFile, FileSlice};
use crate::error::DataCorruption;
use crate::fastfield::{intersect_alive_bitsets, AliveBitSet, FacetReader, FastFieldReaders};
@@ -159,12 +160,10 @@ impl SegmentReader {
let postings_file = segment.open_read(SegmentComponent::Postings)?;
let postings_composite = CompositeFile::open(&postings_file)?;
let positions_composite = {
if let Ok(positions_file) = segment.open_read(SegmentComponent::Positions) {
CompositeFile::open(&positions_file)?
} else {
CompositeFile::empty()
}
let positions_composite = match segment.open_read(SegmentComponent::Positions) {
Ok(positions_file) => CompositeFile::open(&positions_file)?,
Err(OpenReadError::FileDoesNotExist(_)) => CompositeFile::empty(),
Err(open_read_error) => return Err(open_read_error.into()),
};
let schema = segment.schema();

View File

@@ -14,11 +14,8 @@ use itertools::Itertools;
use tantivy_fst::Automaton;
use tantivy_fst::automaton::AlwaysMatch;
use crate::sstable_index_v3::SSTableIndexV3Empty;
use crate::streamer::{Streamer, StreamerBuilder};
use crate::{
BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, SSTableIndexV3, TermOrdinal, VoidSSTable,
};
use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal, VoidSSTable};
/// An SSTable is a sorted map that associates sorted `&[u8]` keys
/// to any kind of typed values.
@@ -288,33 +285,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
let sstable_index_bytes = index_slice.read_bytes()?;
let sstable_index = match version {
2 => SSTableIndex::V2(
crate::sstable_index_v2::SSTableIndex::load(sstable_index_bytes).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?,
),
3 => {
let (sstable_index_bytes, mut footerv3_len_bytes) = sstable_index_bytes.rsplit(8);
let store_offset = u64::deserialize(&mut footerv3_len_bytes)?;
if store_offset != 0 {
SSTableIndex::V3(
SSTableIndexV3::load(sstable_index_bytes, store_offset).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?,
)
} else {
// if store_offset is zero, there is no index, so we build a pseudo-index
// assuming a single block of sstable covering everything.
SSTableIndex::V3Empty(SSTableIndexV3Empty::load(index_offset as usize))
}
}
_ => {
return Err(io::Error::other(format!(
"Unsupported sstable version, expected one of [2, 3], found {version}"
)));
}
};
let sstable_index = SSTableIndex::open(version, index_offset, sstable_index_bytes)?;
Ok(Dictionary {
sstable_slice,
@@ -525,10 +496,15 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
// Open the block for the first ordinal.
let mut bytes = Vec::new();
let mut current_block_addr = self.sstable_index.get_block_with_ord(ord);
let (mut current_block_addr, block_id) = self.sstable_index.get_and_locate_with_ord(ord);
let mut current_sstable_delta_reader =
self.sstable_delta_reader_block(current_block_addr.clone())?;
let mut current_block_ordinal = current_block_addr.first_ordinal;
let mut current_block_end_bound = self
.sstable_index
.get_block(block_id + 1)
.map(|block_addr| block_addr.first_ordinal)
.unwrap_or(u64::MAX);
loop {
// move to the ord inside the current block
@@ -557,17 +533,19 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
}
};
// TODO optimization: it is silly to do a binary search to get the block every single
// time.
//
// Check if block changed for new term_ord
let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
if new_block_addr != current_block_addr {
if next_ord >= current_block_end_bound {
let (new_block_addr, block_id) =
self.sstable_index.get_and_locate_with_ord(next_ord);
current_block_addr = new_block_addr;
current_block_ordinal = current_block_addr.first_ordinal;
current_sstable_delta_reader =
self.sstable_delta_reader_block(current_block_addr.clone())?;
bytes.clear();
current_block_end_bound = self
.sstable_index
.get_block(block_id + 1)
.map(|block_addr| block_addr.first_ordinal)
.unwrap_or(u64::MAX)
}
ord = next_ord;
}

319
sstable/src/index/mod.rs Normal file
View File

@@ -0,0 +1,319 @@
pub(crate) mod v2;
pub(crate) mod v3;
use std::io::{self, Read, Write};
use std::ops::Range;
use common::{BinarySerializable, FixedSize, OwnedBytes};
use tantivy_fst::{Automaton, MapBuilder};
use crate::{TermOrdinal, common_prefix_len};
#[derive(Debug, Clone)]
pub enum SSTableIndex {
V2(v2::SSTableIndex),
V3(v3::SSTableIndexV3),
V3Empty(v3::SSTableIndexV3Empty),
}
impl SSTableIndex {
pub(crate) fn open(
version: u32,
index_offset: u64,
index_bytes: OwnedBytes,
) -> io::Result<Self> {
let index = match version {
2 => {
SSTableIndex::V2(v2::SSTableIndex::load(index_bytes).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?)
}
3 => {
let (index_bytes, mut footerv3_len_bytes) = index_bytes.rsplit(8);
let store_offset = u64::deserialize(&mut footerv3_len_bytes)?;
if store_offset != 0 {
SSTableIndex::V3(v3::SSTableIndexV3::load(index_bytes, store_offset).map_err(
|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"),
)?)
} else {
// if store_offset is zero, there is no index, so we build a pseudo-index
// assuming a single block of sstable covering everything.
SSTableIndex::V3Empty(v3::SSTableIndexV3Empty::load(index_offset as usize))
}
}
_ => {
return Err(io::Error::other(format!(
"Unsupported sstable version, expected one of [2, 3], found {version}"
)));
}
};
Ok(index)
}
/// Get the [`BlockAddr`] of the requested block.
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block(block_id as usize),
SSTableIndex::V3(v3_index) => v3_index.get_block(block_id),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block(block_id),
}
}
/// Get the block id of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<u64> {
match self {
SSTableIndex::V2(v2_index) => v2_index.locate_with_key(key).map(|i| i as u64),
SSTableIndex::V3(v3_index) => v3_index.locate_with_key(key),
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_key(key),
}
}
/// Get the [`BlockAddr`] of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block_with_key(key),
SSTableIndex::V3(v3_index) => v3_index.get_block_with_key(key),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_key(key),
}
}
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 {
match self {
SSTableIndex::V2(v2_index) => v2_index.locate_with_ord(ord) as u64,
SSTableIndex::V3(v3_index) => v3_index.locate_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_ord(ord),
}
}
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block_with_ord(ord),
SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
}
}
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_and_locate_with_ord(ord),
SSTableIndex::V3(v3_index) => v3_index.get_and_locate_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_and_locate_with_ord(ord),
}
}
pub fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
match self {
SSTableIndex::V2(v2_index) => {
BlockIter::V2(v2_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3(v3_index) => {
BlockIter::V3(v3_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3Empty(v3_empty) => {
BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone())))
}
}
}
}
enum BlockIter<V2, V3, T> {
V2(V2),
V3(V3),
V3Empty(std::iter::Once<T>),
}
impl<V2: Iterator<Item = T>, V3: Iterator<Item = T>, T> Iterator for BlockIter<V2, V3, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
match self {
BlockIter::V2(v2) => v2.next(),
BlockIter::V3(v3) => v3.next(),
BlockIter::V3Empty(once) => once.next(),
}
}
}
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct BlockAddr {
pub first_ordinal: u64,
pub byte_range: Range<usize>,
}
impl BlockAddr {
fn to_block_start(&self) -> BlockStartAddr {
BlockStartAddr {
first_ordinal: self.first_ordinal,
byte_range_start: self.byte_range.start,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct BlockStartAddr {
first_ordinal: u64,
byte_range_start: usize,
}
impl BlockStartAddr {
fn to_block_addr(&self, byte_range_end: usize) -> BlockAddr {
BlockAddr {
first_ordinal: self.first_ordinal,
byte_range: self.byte_range_start..byte_range_end,
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct BlockMeta {
/// Any byte string that is lexicographically greater or equal to
/// the last key in the block,
/// and yet strictly smaller than the first key in the next block.
pub last_key_or_greater: Vec<u8>,
pub block_addr: BlockAddr,
}
impl BinarySerializable for BlockStartAddr {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
let start = self.byte_range_start as u64;
start.serialize(writer)?;
self.first_ordinal.serialize(writer)
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
let byte_range_start = u64::deserialize(reader)? as usize;
let first_ordinal = u64::deserialize(reader)?;
Ok(BlockStartAddr {
first_ordinal,
byte_range_start,
})
}
// Provided method
fn num_bytes(&self) -> u64 {
BlockStartAddr::SIZE_IN_BYTES as u64
}
}
impl FixedSize for BlockStartAddr {
const SIZE_IN_BYTES: usize = 2 * u64::SIZE_IN_BYTES;
}
/// Given that left < right,
/// mutates `left into a shorter byte string left'` that
/// matches `left <= left' < right`.
fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
assert!(&left[..] < right);
let common_len = common_prefix_len(left, right);
if left.len() == common_len {
return;
}
// It is possible to do one character shorter in some case,
// but it is not worth the extra complexity
for pos in (common_len + 1)..left.len() {
if left[pos] != u8::MAX {
left[pos] += 1;
left.truncate(pos + 1);
return;
}
}
}
#[derive(Default)]
pub struct SSTableIndexBuilder {
blocks: Vec<BlockMeta>,
}
impl SSTableIndexBuilder {
/// In order to make the index as light as possible, we
/// try to find a shorter alternative to the last key of the last block
/// that is still smaller than the next key.
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
if let Some(last_block) = self.blocks.last_mut() {
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
}
}
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
self.blocks.push(BlockMeta {
last_key_or_greater: last_key.to_vec(),
block_addr: BlockAddr {
byte_range,
first_ordinal,
},
})
}
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<u64> {
if self.blocks.len() <= 1 {
return Ok(0);
}
let counting_writer = common::CountingWriter::wrap(wrt);
let mut map_builder = MapBuilder::new(counting_writer).map_err(fst_error_to_io_error)?;
for (i, block) in self.blocks.iter().enumerate() {
map_builder
.insert(&block.last_key_or_greater, i as u64)
.map_err(fst_error_to_io_error)?;
}
let counting_writer = map_builder.into_inner().map_err(fst_error_to_io_error)?;
let written_bytes = counting_writer.written_bytes();
let mut wrt = counting_writer.finish();
let mut block_store_writer = v3::BlockAddrStoreWriter::new();
for block in &self.blocks {
block_store_writer.write_block_meta(block.block_addr.clone())?;
}
block_store_writer.serialize(&mut wrt)?;
Ok(written_bytes)
}
}
fn fst_error_to_io_error(error: tantivy_fst::Error) -> io::Error {
match error {
tantivy_fst::Error::Fst(fst_error) => io::Error::other(fst_error),
tantivy_fst::Error::Io(ioerror) => ioerror,
}
}
#[cfg(test)]
mod tests {
#[track_caller]
fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
let mut left_buf = left.to_vec();
super::find_shorter_str_in_between(&mut left_buf, right);
assert!(left_buf.len() <= left.len());
assert!(left <= &left_buf);
assert!(&left_buf[..] < right);
}
#[test]
fn test_find_shorter_str_in_between() {
test_find_shorter_str_in_between_aux(b"", b"hello");
test_find_shorter_str_in_between_aux(b"abc", b"abcd");
test_find_shorter_str_in_between_aux(b"abcd", b"abd");
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
}
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
#[test]
fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
if left < right {
test_find_shorter_str_in_between_aux(&left, &right);
}
}
}
}

View File

@@ -77,6 +77,13 @@ impl SSTableIndex {
self.get_block(self.locate_with_ord(ord)).unwrap()
}
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
let location = self.locate_with_ord(ord);
// locate_with_ord always returns an index within range
let block_addr = self.get_block(location).unwrap();
(block_addr, location as u64)
}
pub(crate) fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,

View File

@@ -1,106 +1,14 @@
use std::io::{self, Read, Write};
use std::ops::Range;
use std::sync::Arc;
use common::{BinarySerializable, FixedSize, OwnedBytes};
use tantivy_bitpacker::{BitPacker, compute_num_bits};
use tantivy_fst::raw::Fst;
use tantivy_fst::{Automaton, IntoStreamer, Map, MapBuilder, Streamer};
use tantivy_fst::{Automaton, IntoStreamer, Map, Streamer};
use super::{BlockAddr, BlockStartAddr};
use crate::block_match_automaton::can_block_match_automaton;
use crate::{SSTableDataCorruption, TermOrdinal, common_prefix_len};
#[derive(Debug, Clone)]
pub enum SSTableIndex {
V2(crate::sstable_index_v2::SSTableIndex),
V3(SSTableIndexV3),
V3Empty(SSTableIndexV3Empty),
}
impl SSTableIndex {
/// Get the [`BlockAddr`] of the requested block.
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block(block_id as usize),
SSTableIndex::V3(v3_index) => v3_index.get_block(block_id),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block(block_id),
}
}
/// Get the block id of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<u64> {
match self {
SSTableIndex::V2(v2_index) => v2_index.locate_with_key(key).map(|i| i as u64),
SSTableIndex::V3(v3_index) => v3_index.locate_with_key(key),
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_key(key),
}
}
/// Get the [`BlockAddr`] of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block_with_key(key),
SSTableIndex::V3(v3_index) => v3_index.get_block_with_key(key),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_key(key),
}
}
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 {
match self {
SSTableIndex::V2(v2_index) => v2_index.locate_with_ord(ord) as u64,
SSTableIndex::V3(v3_index) => v3_index.locate_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_ord(ord),
}
}
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block_with_ord(ord),
SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
}
}
pub fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
match self {
SSTableIndex::V2(v2_index) => {
BlockIter::V2(v2_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3(v3_index) => {
BlockIter::V3(v3_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3Empty(v3_empty) => {
BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone())))
}
}
}
}
enum BlockIter<V2, V3, T> {
V2(V2),
V3(V3),
V3Empty(std::iter::Once<T>),
}
impl<V2: Iterator<Item = T>, V3: Iterator<Item = T>, T> Iterator for BlockIter<V2, V3, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
match self {
BlockIter::V2(v2) => v2.next(),
BlockIter::V3(v3) => v3.next(),
BlockIter::V3Empty(once) => once.next(),
}
}
}
use crate::{SSTableDataCorruption, TermOrdinal};
#[derive(Debug, Clone)]
pub struct SSTableIndexV3 {
@@ -160,6 +68,11 @@ impl SSTableIndexV3 {
self.block_addr_store.binary_search_ord(ord).1
}
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
let (location, block_addr) = self.block_addr_store.binary_search_ord(ord);
(block_addr, location)
}
pub(crate) fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
@@ -216,7 +129,7 @@ impl<A: Automaton> Iterator for GetBlockForAutomaton<'_, A> {
#[derive(Debug, Clone)]
pub struct SSTableIndexV3Empty {
block_addr: BlockAddr,
pub block_addr: BlockAddr,
}
impl SSTableIndexV3Empty {
@@ -230,8 +143,8 @@ impl SSTableIndexV3Empty {
}
/// Get the [`BlockAddr`] of the requested block.
pub(crate) fn get_block(&self, _block_id: u64) -> Option<BlockAddr> {
Some(self.block_addr.clone())
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
(block_id == 0).then(|| self.block_addr.clone())
}
/// Get the block id of the block that would contain `key`.
@@ -256,146 +169,9 @@ impl SSTableIndexV3Empty {
pub(crate) fn get_block_with_ord(&self, _ord: TermOrdinal) -> BlockAddr {
self.block_addr.clone()
}
}
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct BlockAddr {
pub first_ordinal: u64,
pub byte_range: Range<usize>,
}
impl BlockAddr {
fn to_block_start(&self) -> BlockStartAddr {
BlockStartAddr {
first_ordinal: self.first_ordinal,
byte_range_start: self.byte_range.start,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct BlockStartAddr {
first_ordinal: u64,
byte_range_start: usize,
}
impl BlockStartAddr {
fn to_block_addr(&self, byte_range_end: usize) -> BlockAddr {
BlockAddr {
first_ordinal: self.first_ordinal,
byte_range: self.byte_range_start..byte_range_end,
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct BlockMeta {
/// Any byte string that is lexicographically greater or equal to
/// the last key in the block,
/// and yet strictly smaller than the first key in the next block.
pub last_key_or_greater: Vec<u8>,
pub block_addr: BlockAddr,
}
impl BinarySerializable for BlockStartAddr {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
let start = self.byte_range_start as u64;
start.serialize(writer)?;
self.first_ordinal.serialize(writer)
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
let byte_range_start = u64::deserialize(reader)? as usize;
let first_ordinal = u64::deserialize(reader)?;
Ok(BlockStartAddr {
first_ordinal,
byte_range_start,
})
}
// Provided method
fn num_bytes(&self) -> u64 {
BlockStartAddr::SIZE_IN_BYTES as u64
}
}
impl FixedSize for BlockStartAddr {
const SIZE_IN_BYTES: usize = 2 * u64::SIZE_IN_BYTES;
}
/// Given that left < right,
/// mutates `left into a shorter byte string left'` that
/// matches `left <= left' < right`.
fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
assert!(&left[..] < right);
let common_len = common_prefix_len(left, right);
if left.len() == common_len {
return;
}
// It is possible to do one character shorter in some case,
// but it is not worth the extra complexity
for pos in (common_len + 1)..left.len() {
if left[pos] != u8::MAX {
left[pos] += 1;
left.truncate(pos + 1);
return;
}
}
}
#[derive(Default)]
pub struct SSTableIndexBuilder {
blocks: Vec<BlockMeta>,
}
impl SSTableIndexBuilder {
/// In order to make the index as light as possible, we
/// try to find a shorter alternative to the last key of the last block
/// that is still smaller than the next key.
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
if let Some(last_block) = self.blocks.last_mut() {
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
}
}
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
self.blocks.push(BlockMeta {
last_key_or_greater: last_key.to_vec(),
block_addr: BlockAddr {
byte_range,
first_ordinal,
},
})
}
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<u64> {
if self.blocks.len() <= 1 {
return Ok(0);
}
let counting_writer = common::CountingWriter::wrap(wrt);
let mut map_builder = MapBuilder::new(counting_writer).map_err(fst_error_to_io_error)?;
for (i, block) in self.blocks.iter().enumerate() {
map_builder
.insert(&block.last_key_or_greater, i as u64)
.map_err(fst_error_to_io_error)?;
}
let counting_writer = map_builder.into_inner().map_err(fst_error_to_io_error)?;
let written_bytes = counting_writer.written_bytes();
let mut wrt = counting_writer.finish();
let mut block_store_writer = BlockAddrStoreWriter::new();
for block in &self.blocks {
block_store_writer.write_block_meta(block.block_addr.clone())?;
}
block_store_writer.serialize(&mut wrt)?;
Ok(written_bytes)
}
}
fn fst_error_to_io_error(error: tantivy_fst::Error) -> io::Error {
match error {
tantivy_fst::Error::Fst(fst_error) => io::Error::other(fst_error),
tantivy_fst::Error::Io(ioerror) => ioerror,
pub(crate) fn get_and_locate_with_ord(&self, _ord: TermOrdinal) -> (BlockAddr, u64) {
(self.block_addr.clone(), 0)
}
}
@@ -647,14 +423,14 @@ fn binary_search(max: u64, cmp_fn: impl Fn(u64) -> std::cmp::Ordering) -> Result
Err(left)
}
struct BlockAddrStoreWriter {
pub(crate) struct BlockAddrStoreWriter {
buffer_block_metas: Vec<u8>,
buffer_addrs: Vec<u8>,
block_addrs: Vec<BlockAddr>,
}
impl BlockAddrStoreWriter {
fn new() -> Self {
pub(crate) fn new() -> Self {
BlockAddrStoreWriter {
buffer_block_metas: Vec::new(),
buffer_addrs: Vec::new(),
@@ -662,7 +438,7 @@ impl BlockAddrStoreWriter {
}
}
fn flush_block(&mut self) -> io::Result<()> {
pub(crate) fn flush_block(&mut self) -> io::Result<()> {
if self.block_addrs.is_empty() {
return Ok(());
}
@@ -741,7 +517,7 @@ impl BlockAddrStoreWriter {
Ok(())
}
fn write_block_meta(&mut self, block_addr: BlockAddr) -> io::Result<()> {
pub(crate) fn write_block_meta(&mut self, block_addr: BlockAddr) -> io::Result<()> {
self.block_addrs.push(block_addr);
if self.block_addrs.len() >= STORE_BLOCK_LEN {
self.flush_block()?;
@@ -749,7 +525,7 @@ impl BlockAddrStoreWriter {
Ok(())
}
fn serialize<W: std::io::Write>(&mut self, wrt: &mut W) -> io::Result<()> {
pub(crate) fn serialize<W: std::io::Write>(&mut self, wrt: &mut W) -> io::Result<()> {
self.flush_block()?;
let len = self.buffer_block_metas.len() as u64;
len.serialize(wrt)?;
@@ -824,8 +600,9 @@ mod tests {
use common::OwnedBytes;
use super::*;
use crate::SSTableDataCorruption;
use crate::block_match_automaton::tests::EqBuffer;
use crate::index::BlockMeta;
use crate::{SSTableDataCorruption, SSTableIndexBuilder};
#[test]
fn test_sstable_index() {
@@ -874,36 +651,7 @@ mod tests {
assert!(matches!(data_corruption_err, SSTableDataCorruption));
}
#[track_caller]
fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
let mut left_buf = left.to_vec();
super::find_shorter_str_in_between(&mut left_buf, right);
assert!(left_buf.len() <= left.len());
assert!(left <= &left_buf);
assert!(&left_buf[..] < right);
}
#[test]
fn test_find_shorter_str_in_between() {
test_find_shorter_str_in_between_aux(b"", b"hello");
test_find_shorter_str_in_between_aux(b"abc", b"abcd");
test_find_shorter_str_in_between_aux(b"abcd", b"abd");
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
}
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
#[test]
fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
if left < right {
test_find_shorter_str_in_between_aux(&left, &right);
}
}
}
// use proptest::prelude::*;
#[test]
fn test_find_best_slop() {

View File

@@ -47,9 +47,8 @@ pub mod merge;
mod streamer;
pub mod value;
mod sstable_index_v3;
pub use sstable_index_v3::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3};
mod sstable_index_v2;
mod index;
pub use index::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
pub(crate) mod vint;
pub use dictionary::{Dictionary, TermOrdHit};
pub use streamer::{Streamer, StreamerBuilder};

View File

@@ -27,7 +27,7 @@ rand = "0.9"
zipf = "7.0.0"
rustc-hash = "2.1.0"
proptest = "1.2.0"
binggan = { version = "0.16.1" }
binggan = { version = "0.17.0" }
rand_distr = "0.5"
[features]