get iter for blocks of sstable matching automaton

This commit is contained in:
trinity-1686a
2024-07-13 18:39:15 +02:00
parent 3c30a41c14
commit 7e901f523b
3 changed files with 327 additions and 6 deletions

View File

@@ -150,13 +150,13 @@ fn match_range_end<S, A: Automaton<State = S>>(
}
#[cfg(test)]
mod tests {
pub(crate) mod tests {
use proptest::prelude::*;
use tantivy_fst::Automaton;
use super::*;
struct EqBuffer(Vec<u8>);
pub(crate) struct EqBuffer(pub Vec<u8>);
impl Automaton for EqBuffer {
type State = Option<usize>;
@@ -185,7 +185,6 @@ mod tests {
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(1_000_000_000))]
#[test]
fn test_proptest_automaton_match_block(start in any::<Vec<u8>>(), end in any::<Vec<u8>>(), key in any::<Vec<u8>>()) {
// inverted keys are *not* supported and can return bogus results

View File

@@ -1,10 +1,12 @@
use common::OwnedBytes;
use tantivy_fst::Automaton;
use crate::block_match_automaton::block_match_automaton;
use crate::{BlockAddr, SSTable, SSTableDataCorruption, TermOrdinal};
#[derive(Default, Debug, Clone)]
pub struct SSTableIndex {
blocks: Vec<BlockMeta>,
pub(crate) blocks: Vec<BlockMeta>,
}
impl SSTableIndex {
@@ -74,6 +76,27 @@ impl SSTableIndex {
// locate_with_ord always returns an index within range
self.get_block(self.locate_with_ord(ord)).unwrap()
}
pub(crate) fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (usize, BlockAddr)> + 'a {
std::iter::once((None, &self.blocks[0]))
.chain(self.blocks.windows(2).map(|window| {
let [prev, curr] = window else {
unreachable!();
};
(Some(&*prev.last_key_or_greater), curr)
}))
.enumerate()
.filter_map(move |(pos, (prev_key, current_block))| {
if block_match_automaton(prev_key, &current_block.last_key_or_greater, automaton) {
Some((pos, current_block.block_addr.clone()))
} else {
None
}
})
}
}
#[derive(Debug, Clone)]
@@ -99,3 +122,106 @@ impl SSTable for IndexSSTable {
type ValueWriter = crate::value::index::IndexValueWriter;
}
#[cfg(test)]
mod tests {
use super::*;
use crate::block_match_automaton::tests::EqBuffer;
#[test]
fn test_get_block_for_automaton() {
let sstable = SSTableIndex {
blocks: vec![
BlockMeta {
last_key_or_greater: vec![0, 1, 2],
block_addr: BlockAddr {
first_ordinal: 0,
byte_range: 0..10,
},
},
BlockMeta {
last_key_or_greater: vec![0, 2, 2],
block_addr: BlockAddr {
first_ordinal: 5,
byte_range: 10..20,
},
},
BlockMeta {
last_key_or_greater: vec![0, 3, 2],
block_addr: BlockAddr {
first_ordinal: 10,
byte_range: 20..30,
},
},
],
};
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 1, 1]))
.collect::<Vec<_>>();
assert_eq!(
res,
vec![(
0,
BlockAddr {
first_ordinal: 0,
byte_range: 0..10
}
)]
);
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 2, 1]))
.collect::<Vec<_>>();
assert_eq!(
res,
vec![(
1,
BlockAddr {
first_ordinal: 5,
byte_range: 10..20
}
)]
);
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 3, 1]))
.collect::<Vec<_>>();
assert_eq!(
res,
vec![(
2,
BlockAddr {
first_ordinal: 10,
byte_range: 20..30
}
)]
);
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 4, 1]))
.collect::<Vec<_>>();
assert!(res.is_empty());
let complex_automaton = EqBuffer(vec![0, 1, 1]).union(EqBuffer(vec![0, 3, 1]));
let res = sstable
.get_block_for_automaton(&complex_automaton)
.collect::<Vec<_>>();
assert_eq!(
res,
vec![
(
0,
BlockAddr {
first_ordinal: 0,
byte_range: 0..10
}
),
(
2,
BlockAddr {
first_ordinal: 10,
byte_range: 20..30
}
)
]
);
}
}

View File

@@ -5,8 +5,9 @@ use std::sync::Arc;
use common::{BinarySerializable, FixedSize, OwnedBytes};
use tantivy_bitpacker::{compute_num_bits, BitPacker};
use tantivy_fst::raw::Fst;
use tantivy_fst::{IntoStreamer, Map, MapBuilder, Streamer};
use tantivy_fst::{Automaton, IntoStreamer, Map, MapBuilder, Streamer};
use crate::block_match_automaton::block_match_automaton;
use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
#[derive(Debug, Clone)]
@@ -64,6 +65,41 @@ impl SSTableIndex {
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
}
}
pub fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (usize, BlockAddr)> + 'a {
match self {
SSTableIndex::V2(v2_index) => {
BlockIter::V2(v2_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3(v3_index) => {
BlockIter::V3(v3_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3Empty(v3_empty) => {
BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone())))
}
}
}
}
enum BlockIter<V2, V3, T> {
V2(V2),
V3(V3),
V3Empty(std::iter::Once<T>),
}
impl<V2: Iterator<Item = T>, V3: Iterator<Item = T>, T> Iterator for BlockIter<V2, V3, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
match self {
BlockIter::V2(v2) => v2.next(),
BlockIter::V3(v3) => v3.next(),
BlockIter::V3Empty(once) => once.next(),
}
}
}
#[derive(Debug, Clone)]
@@ -123,6 +159,61 @@ impl SSTableIndexV3 {
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
self.block_addr_store.binary_search_ord(ord).1
}
pub(crate) fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (usize, BlockAddr)> + 'a {
// this is more complicated than other index formats: we don't have a ready made list of
// blocks, and instead need to stream-decode the sstable.
GetBlockForAutomaton {
streamer: self.fst_index.stream(),
block_addr_store: &self.block_addr_store,
prev_key: None,
automaton,
}
}
}
struct GetBlockForAutomaton<'a, A: Automaton> {
streamer: tantivy_fst::map::Stream<'a>,
// TODO we could be more efficient by streaming the store
block_addr_store: &'a BlockAddrStore,
prev_key: Option<Vec<u8>>,
automaton: &'a A,
}
impl<'a, A: Automaton> Iterator for GetBlockForAutomaton<'a, A> {
type Item = (usize, BlockAddr);
fn next(&mut self) -> Option<Self::Item> {
while let Some((new_key, block_id)) = self.streamer.next() {
if let Some(prev_key) = self.prev_key.as_mut() {
if block_match_automaton(Some(prev_key), new_key, self.automaton) {
prev_key.clear();
prev_key.extend_from_slice(new_key);
return Some((
block_id as usize,
self.block_addr_store.get(block_id).unwrap(),
));
}
// actually we could not write here, and it would still be correct, but it might
// lead to checking more keys than necessary which in itself can be a slowdown.
prev_key.clear();
prev_key.extend_from_slice(new_key);
} else {
self.prev_key = Some(new_key.to_owned());
if block_match_automaton(None, new_key, self.automaton) {
return Some((
block_id as usize,
self.block_addr_store.get(block_id).unwrap(),
));
}
}
}
None
}
}
#[derive(Debug, Clone)]
@@ -734,7 +825,8 @@ fn find_best_slope(elements: impl Iterator<Item = (usize, u64)> + Clone) -> (u32
mod tests {
use common::OwnedBytes;
use super::{BlockAddr, SSTableIndexBuilder, SSTableIndexV3};
use super::*;
use crate::block_match_automaton::tests::EqBuffer;
use crate::SSTableDataCorruption;
#[test]
@@ -823,4 +915,108 @@ mod tests {
(12345, 1)
);
}
#[test]
fn test_get_block_for_automaton() {
let sstable_index_builder = SSTableIndexBuilder {
blocks: vec![
BlockMeta {
last_key_or_greater: vec![0, 1, 2],
block_addr: BlockAddr {
first_ordinal: 0,
byte_range: 0..10,
},
},
BlockMeta {
last_key_or_greater: vec![0, 2, 2],
block_addr: BlockAddr {
first_ordinal: 5,
byte_range: 10..20,
},
},
BlockMeta {
last_key_or_greater: vec![0, 3, 2],
block_addr: BlockAddr {
first_ordinal: 10,
byte_range: 20..30,
},
},
],
};
let mut sstable_index_bytes = Vec::new();
let fst_len = sstable_index_builder
.serialize(&mut sstable_index_bytes)
.unwrap();
let sstable = SSTableIndexV3::load(OwnedBytes::new(sstable_index_bytes), fst_len).unwrap();
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 1, 1]))
.collect::<Vec<_>>();
assert_eq!(
res,
vec![(
0,
BlockAddr {
first_ordinal: 0,
byte_range: 0..10
}
)]
);
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 2, 1]))
.collect::<Vec<_>>();
assert_eq!(
res,
vec![(
1,
BlockAddr {
first_ordinal: 5,
byte_range: 10..20
}
)]
);
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 3, 1]))
.collect::<Vec<_>>();
assert_eq!(
res,
vec![(
2,
BlockAddr {
first_ordinal: 10,
byte_range: 20..30
}
)]
);
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 4, 1]))
.collect::<Vec<_>>();
assert!(res.is_empty());
let complex_automaton = EqBuffer(vec![0, 1, 1]).union(EqBuffer(vec![0, 3, 1]));
let res = sstable
.get_block_for_automaton(&complex_automaton)
.collect::<Vec<_>>();
assert_eq!(
res,
vec![
(
0,
BlockAddr {
first_ordinal: 0,
byte_range: 0..10
}
),
(
2,
BlockAddr {
first_ordinal: 10,
byte_range: 20..30
}
)
]
);
}
}