diff --git a/sstable/src/block_match_automaton.rs b/sstable/src/block_match_automaton.rs index 9a481a04e..3bb75b883 100644 --- a/sstable/src/block_match_automaton.rs +++ b/sstable/src/block_match_automaton.rs @@ -150,13 +150,13 @@ fn match_range_end>( } #[cfg(test)] -mod tests { +pub(crate) mod tests { use proptest::prelude::*; use tantivy_fst::Automaton; use super::*; - struct EqBuffer(Vec); + pub(crate) struct EqBuffer(pub Vec); impl Automaton for EqBuffer { type State = Option; @@ -185,7 +185,6 @@ mod tests { } proptest! { - #![proptest_config(ProptestConfig::with_cases(1_000_000_000))] #[test] fn test_proptest_automaton_match_block(start in any::>(), end in any::>(), key in any::>()) { // inverted keys are *not* supported and can return bogus results diff --git a/sstable/src/sstable_index_v2.rs b/sstable/src/sstable_index_v2.rs index d7c97c13a..a10731fc0 100644 --- a/sstable/src/sstable_index_v2.rs +++ b/sstable/src/sstable_index_v2.rs @@ -1,10 +1,12 @@ use common::OwnedBytes; +use tantivy_fst::Automaton; +use crate::block_match_automaton::block_match_automaton; use crate::{BlockAddr, SSTable, SSTableDataCorruption, TermOrdinal}; #[derive(Default, Debug, Clone)] pub struct SSTableIndex { - blocks: Vec, + pub(crate) blocks: Vec, } impl SSTableIndex { @@ -74,6 +76,27 @@ impl SSTableIndex { // locate_with_ord always returns an index within range self.get_block(self.locate_with_ord(ord)).unwrap() } + + pub(crate) fn get_block_for_automaton<'a>( + &'a self, + automaton: &'a impl Automaton, + ) -> impl Iterator + 'a { + std::iter::once((None, &self.blocks[0])) + .chain(self.blocks.windows(2).map(|window| { + let [prev, curr] = window else { + unreachable!(); + }; + (Some(&*prev.last_key_or_greater), curr) + })) + .enumerate() + .filter_map(move |(pos, (prev_key, current_block))| { + if block_match_automaton(prev_key, ¤t_block.last_key_or_greater, automaton) { + Some((pos, current_block.block_addr.clone())) + } else { + None + } + }) + } } #[derive(Debug, Clone)] @@ -99,3 +122,106 @@ impl SSTable for IndexSSTable { type ValueWriter = crate::value::index::IndexValueWriter; } + +#[cfg(test)] +mod tests { + use super::*; + use crate::block_match_automaton::tests::EqBuffer; + + #[test] + fn test_get_block_for_automaton() { + let sstable = SSTableIndex { + blocks: vec![ + BlockMeta { + last_key_or_greater: vec![0, 1, 2], + block_addr: BlockAddr { + first_ordinal: 0, + byte_range: 0..10, + }, + }, + BlockMeta { + last_key_or_greater: vec![0, 2, 2], + block_addr: BlockAddr { + first_ordinal: 5, + byte_range: 10..20, + }, + }, + BlockMeta { + last_key_or_greater: vec![0, 3, 2], + block_addr: BlockAddr { + first_ordinal: 10, + byte_range: 20..30, + }, + }, + ], + }; + + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 1, 1])) + .collect::>(); + assert_eq!( + res, + vec![( + 0, + BlockAddr { + first_ordinal: 0, + byte_range: 0..10 + } + )] + ); + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 2, 1])) + .collect::>(); + assert_eq!( + res, + vec![( + 1, + BlockAddr { + first_ordinal: 5, + byte_range: 10..20 + } + )] + ); + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 3, 1])) + .collect::>(); + assert_eq!( + res, + vec![( + 2, + BlockAddr { + first_ordinal: 10, + byte_range: 20..30 + } + )] + ); + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 4, 1])) + .collect::>(); + assert!(res.is_empty()); + + let complex_automaton = EqBuffer(vec![0, 1, 1]).union(EqBuffer(vec![0, 3, 1])); + let res = sstable + .get_block_for_automaton(&complex_automaton) + .collect::>(); + assert_eq!( + res, + vec![ + ( + 0, + BlockAddr { + first_ordinal: 0, + byte_range: 0..10 + } + ), + ( + 2, + BlockAddr { + first_ordinal: 10, + byte_range: 20..30 + } + ) + ] + ); + } +} diff --git a/sstable/src/sstable_index_v3.rs b/sstable/src/sstable_index_v3.rs index 8206ab242..a8f728bd5 100644 --- a/sstable/src/sstable_index_v3.rs +++ b/sstable/src/sstable_index_v3.rs @@ -5,8 +5,9 @@ use std::sync::Arc; use common::{BinarySerializable, FixedSize, OwnedBytes}; use tantivy_bitpacker::{compute_num_bits, BitPacker}; use tantivy_fst::raw::Fst; -use tantivy_fst::{IntoStreamer, Map, MapBuilder, Streamer}; +use tantivy_fst::{Automaton, IntoStreamer, Map, MapBuilder, Streamer}; +use crate::block_match_automaton::block_match_automaton; use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal}; #[derive(Debug, Clone)] @@ -64,6 +65,41 @@ impl SSTableIndex { SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord), } } + + pub fn get_block_for_automaton<'a>( + &'a self, + automaton: &'a impl Automaton, + ) -> impl Iterator + 'a { + match self { + SSTableIndex::V2(v2_index) => { + BlockIter::V2(v2_index.get_block_for_automaton(automaton)) + } + SSTableIndex::V3(v3_index) => { + BlockIter::V3(v3_index.get_block_for_automaton(automaton)) + } + SSTableIndex::V3Empty(v3_empty) => { + BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone()))) + } + } + } +} + +enum BlockIter { + V2(V2), + V3(V3), + V3Empty(std::iter::Once), +} + +impl, V3: Iterator, T> Iterator for BlockIter { + type Item = T; + + fn next(&mut self) -> Option { + match self { + BlockIter::V2(v2) => v2.next(), + BlockIter::V3(v3) => v3.next(), + BlockIter::V3Empty(once) => once.next(), + } + } } #[derive(Debug, Clone)] @@ -123,6 +159,61 @@ impl SSTableIndexV3 { pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr { self.block_addr_store.binary_search_ord(ord).1 } + + pub(crate) fn get_block_for_automaton<'a>( + &'a self, + automaton: &'a impl Automaton, + ) -> impl Iterator + 'a { + // this is more complicated than other index formats: we don't have a ready made list of + // blocks, and instead need to stream-decode the sstable. + + GetBlockForAutomaton { + streamer: self.fst_index.stream(), + block_addr_store: &self.block_addr_store, + prev_key: None, + automaton, + } + } +} + +struct GetBlockForAutomaton<'a, A: Automaton> { + streamer: tantivy_fst::map::Stream<'a>, + // TODO we could be more efficient by streaming the store + block_addr_store: &'a BlockAddrStore, + prev_key: Option>, + automaton: &'a A, +} + +impl<'a, A: Automaton> Iterator for GetBlockForAutomaton<'a, A> { + type Item = (usize, BlockAddr); + + fn next(&mut self) -> Option { + while let Some((new_key, block_id)) = self.streamer.next() { + if let Some(prev_key) = self.prev_key.as_mut() { + if block_match_automaton(Some(prev_key), new_key, self.automaton) { + prev_key.clear(); + prev_key.extend_from_slice(new_key); + return Some(( + block_id as usize, + self.block_addr_store.get(block_id).unwrap(), + )); + } + // actually we could not write here, and it would still be correct, but it might + // lead to checking more keys than necessary which in itself can be a slowdown. + prev_key.clear(); + prev_key.extend_from_slice(new_key); + } else { + self.prev_key = Some(new_key.to_owned()); + if block_match_automaton(None, new_key, self.automaton) { + return Some(( + block_id as usize, + self.block_addr_store.get(block_id).unwrap(), + )); + } + } + } + None + } } #[derive(Debug, Clone)] @@ -734,7 +825,8 @@ fn find_best_slope(elements: impl Iterator + Clone) -> (u32 mod tests { use common::OwnedBytes; - use super::{BlockAddr, SSTableIndexBuilder, SSTableIndexV3}; + use super::*; + use crate::block_match_automaton::tests::EqBuffer; use crate::SSTableDataCorruption; #[test] @@ -823,4 +915,108 @@ mod tests { (12345, 1) ); } + + #[test] + fn test_get_block_for_automaton() { + let sstable_index_builder = SSTableIndexBuilder { + blocks: vec![ + BlockMeta { + last_key_or_greater: vec![0, 1, 2], + block_addr: BlockAddr { + first_ordinal: 0, + byte_range: 0..10, + }, + }, + BlockMeta { + last_key_or_greater: vec![0, 2, 2], + block_addr: BlockAddr { + first_ordinal: 5, + byte_range: 10..20, + }, + }, + BlockMeta { + last_key_or_greater: vec![0, 3, 2], + block_addr: BlockAddr { + first_ordinal: 10, + byte_range: 20..30, + }, + }, + ], + }; + + let mut sstable_index_bytes = Vec::new(); + let fst_len = sstable_index_builder + .serialize(&mut sstable_index_bytes) + .unwrap(); + + let sstable = SSTableIndexV3::load(OwnedBytes::new(sstable_index_bytes), fst_len).unwrap(); + + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 1, 1])) + .collect::>(); + assert_eq!( + res, + vec![( + 0, + BlockAddr { + first_ordinal: 0, + byte_range: 0..10 + } + )] + ); + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 2, 1])) + .collect::>(); + assert_eq!( + res, + vec![( + 1, + BlockAddr { + first_ordinal: 5, + byte_range: 10..20 + } + )] + ); + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 3, 1])) + .collect::>(); + assert_eq!( + res, + vec![( + 2, + BlockAddr { + first_ordinal: 10, + byte_range: 20..30 + } + )] + ); + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 4, 1])) + .collect::>(); + assert!(res.is_empty()); + + let complex_automaton = EqBuffer(vec![0, 1, 1]).union(EqBuffer(vec![0, 3, 1])); + let res = sstable + .get_block_for_automaton(&complex_automaton) + .collect::>(); + assert_eq!( + res, + vec![ + ( + 0, + BlockAddr { + first_ordinal: 0, + byte_range: 0..10 + } + ), + ( + 2, + BlockAddr { + first_ordinal: 10, + byte_range: 20..30 + } + ) + ] + ); + } }