support iterating over partially loaded sstable

This commit is contained in:
trinity-1686a
2024-07-13 20:04:05 +02:00
parent 7e901f523b
commit 1f6a8e74bb
7 changed files with 134 additions and 51 deletions

View File

@@ -11,6 +11,7 @@ description = "sstables for tantivy"
[dependencies]
common = {version= "0.7", path="../common", package="tantivy-common"}
futures-util = "0.3.30"
tantivy-bitpacker = { version= "0.6", path="../bitpacker" }
tantivy-fst = "0.5"
# experimental gives us access to Decompressor::upper_bound

View File

@@ -7,6 +7,7 @@ use zstd::bulk::Decompressor;
pub struct BlockReader {
buffer: Vec<u8>,
reader: OwnedBytes,
next_readers: std::vec::IntoIter<OwnedBytes>,
offset: usize,
}
@@ -15,6 +16,18 @@ impl BlockReader {
BlockReader {
buffer: Vec::new(),
reader,
next_readers: Vec::new().into_iter(),
offset: 0,
}
}
/// Builds a `BlockReader` that iterates over several non-contiguous
/// byte ranges, one after the other.
///
/// The first reader becomes the active one; the remaining readers are
/// picked up lazily as each one is exhausted. Passing an empty `Vec`
/// yields a reader that is immediately exhausted.
pub fn from_multiple_blocks(readers: Vec<OwnedBytes>) -> BlockReader {
    let mut next_readers = readers.into_iter();
    // Start on the first block. Fall back to an empty reader so the very
    // first read simply observes end-of-data instead of panicking.
    // (`OwnedBytes::empty` passed directly: clippy::redundant_closure.)
    let reader = next_readers.next().unwrap_or_else(OwnedBytes::empty);
    BlockReader {
        buffer: Vec::new(),
        reader,
        next_readers,
        offset: 0,
    }
}
@@ -34,42 +47,52 @@ impl BlockReader {
self.offset = 0;
self.buffer.clear();
let block_len = match self.reader.len() {
0 => return Ok(false),
1..=3 => {
loop {
let block_len = match self.reader.len() {
0 => {
// we are out of data for this block. Check if we have another block after
if let Some(new_reader) = self.next_readers.next() {
self.reader = new_reader;
continue;
} else {
return Ok(false);
}
}
1..=3 => {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"failed to read block_len",
))
}
_ => self.reader.read_u32() as usize,
};
if block_len <= 1 {
return Ok(false);
}
let compress = self.reader.read_u8();
let block_len = block_len - 1;
if self.reader.len() < block_len {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"failed to read block_len",
))
"failed to read block content",
));
}
_ => self.reader.read_u32() as usize,
};
if block_len <= 1 {
return Ok(false);
}
let compress = self.reader.read_u8();
let block_len = block_len - 1;
if compress == 1 {
let required_capacity =
Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024);
self.buffer.reserve(required_capacity);
Decompressor::new()?
.decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?;
if self.reader.len() < block_len {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"failed to read block content",
));
}
if compress == 1 {
let required_capacity =
Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024);
self.buffer.reserve(required_capacity);
Decompressor::new()?
.decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?;
self.reader.advance(block_len);
} else {
self.buffer.resize(block_len, 0u8);
self.reader.read_exact(&mut self.buffer[..])?;
}
self.reader.advance(block_len);
} else {
self.buffer.resize(block_len, 0u8);
self.reader.read_exact(&mut self.buffer[..])?;
return Ok(true);
}
Ok(true)
}
#[inline(always)]

View File

@@ -143,6 +143,16 @@ where TValueReader: value::ValueReader
}
}
/// Builds a `DeltaReader` decoding a sequence of non-contiguous blocks.
///
/// Mirrors `BlockReader::from_multiple_blocks`: the blocks are consumed
/// in order, and an empty `Vec` yields an already-exhausted reader.
/// (Parameter renamed `reader` -> `readers`: it holds several readers,
/// matching the sibling constructor; call sites are unaffected.)
pub fn from_multiple_blocks(readers: Vec<OwnedBytes>) -> Self {
    DeltaReader {
        idx: 0,
        common_prefix_len: 0,
        suffix_range: 0..0,
        value_reader: TValueReader::default(),
        block_reader: BlockReader::from_multiple_blocks(readers),
    }
}
pub fn empty() -> Self {
DeltaReader::new(OwnedBytes::empty())
}

View File

@@ -7,6 +7,7 @@ use std::sync::Arc;
use common::bounds::{transform_bound_inner_res, TransformBound};
use common::file_slice::FileSlice;
use common::{BinarySerializable, OwnedBytes};
use futures_util::{stream, StreamExt, TryStreamExt};
use tantivy_fst::automaton::AlwaysMatch;
use tantivy_fst::Automaton;
@@ -98,20 +99,46 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
&self,
key_range: impl RangeBounds<[u8]>,
limit: Option<u64>,
automaton: &impl Automaton,
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
let slice = self.file_slice_for_range(key_range, limit);
let data = slice.read_bytes_async().await?;
Ok(TSSTable::delta_reader(data))
let match_all = automaton.will_always_match(&automaton.start());
if match_all {
let slice = self.file_slice_for_range(key_range, limit);
let data = slice.read_bytes_async().await?;
Ok(TSSTable::delta_reader(data))
} else {
let blocks =
stream::iter(self.get_block_iterator_for_range_and_automaton(key_range, automaton));
let data = blocks
.map(|block_addr| {
self.sstable_slice
.read_bytes_slice_async(block_addr.byte_range)
})
.buffered(5)
.try_collect::<Vec<_>>()
.await?;
Ok(DeltaReader::from_multiple_blocks(data))
}
}
pub(crate) fn sstable_delta_reader_for_key_range(
&self,
key_range: impl RangeBounds<[u8]>,
limit: Option<u64>,
automaton: &impl Automaton,
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
let slice = self.file_slice_for_range(key_range, limit);
let data = slice.read_bytes()?;
Ok(TSSTable::delta_reader(data))
let match_all = automaton.will_always_match(&automaton.start());
if match_all {
let slice = self.file_slice_for_range(key_range, limit);
let data = slice.read_bytes()?;
Ok(TSSTable::delta_reader(data))
} else {
let blocks = self.get_block_iterator_for_range_and_automaton(key_range, automaton);
let data = blocks
.map(|block_addr| self.sstable_slice.read_bytes_slice(block_addr.byte_range))
.collect::<Result<Vec<_>, _>>()?;
Ok(DeltaReader::from_multiple_blocks(data))
}
}
pub(crate) fn sstable_delta_reader_block(
@@ -204,6 +231,31 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
self.sstable_slice.slice((start_bound, end_bound))
}
/// Returns the addresses of the blocks that may contain keys matching
/// `automaton`, restricted to the blocks overlapping `key_range`.
///
/// This powers the partially-loaded-sstable path: only these candidate
/// blocks are fetched and decoded instead of the full key range.
fn get_block_iterator_for_range_and_automaton<'a>(
&'a self,
key_range: impl RangeBounds<[u8]>,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = BlockAddr> + 'a {
// First block that can contain the start key. An `Excluded` start bound
// is handled like `Included` on purpose: the block holding the excluded
// key may still contain later keys inside the range, so it must be kept.
// NOTE(review): when the start key lies past the last block,
// `locate_with_key` presumably returns None; mapping that to u64::MAX
// makes `block_range` match no real block id — confirm this reading of
// `locate_with_key`'s None semantics.
let lower_bound = match key_range.start_bound() {
Bound::Included(key) | Bound::Excluded(key) => {
self.sstable_index.locate_with_key(key).unwrap_or(u64::MAX)
}
Bound::Unbounded => 0,
};
// Last block that can contain the end key; None maps to u64::MAX, i.e.
// "no upper restriction", which is also what `Unbounded` uses.
let upper_bound = match key_range.end_bound() {
Bound::Included(key) | Bound::Excluded(key) => {
self.sstable_index.locate_with_key(key).unwrap_or(u64::MAX)
}
Bound::Unbounded => u64::MAX,
};
let block_range = lower_bound..=upper_bound;
// Candidate blocks come from the automaton pre-filter; keep only those
// whose block id falls inside the key-range window, then drop the id.
self.sstable_index
.get_block_for_automaton(automaton)
.filter(move |(block_id, _)| block_range.contains(block_id))
.map(|(_, block_addr)| block_addr)
}
/// Opens a `TermDictionary`.
pub fn open(term_dictionary_file: FileSlice) -> io::Result<Self> {
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20);

View File

@@ -80,7 +80,7 @@ impl SSTableIndex {
pub(crate) fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (usize, BlockAddr)> + 'a {
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
std::iter::once((None, &self.blocks[0]))
.chain(self.blocks.windows(2).map(|window| {
let [prev, curr] = window else {
@@ -91,7 +91,7 @@ impl SSTableIndex {
.enumerate()
.filter_map(move |(pos, (prev_key, current_block))| {
if block_match_automaton(prev_key, &current_block.last_key_or_greater, automaton) {
Some((pos, current_block.block_addr.clone()))
Some((pos as u64, current_block.block_addr.clone()))
} else {
None
}

View File

@@ -69,7 +69,7 @@ impl SSTableIndex {
pub fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (usize, BlockAddr)> + 'a {
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
match self {
SSTableIndex::V2(v2_index) => {
BlockIter::V2(v2_index.get_block_for_automaton(automaton))
@@ -163,7 +163,7 @@ impl SSTableIndexV3 {
pub(crate) fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (usize, BlockAddr)> + 'a {
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
// this is more complicated than other index formats: we don't have a ready made list of
// blocks, and instead need to stream-decode the sstable.
@@ -185,7 +185,7 @@ struct GetBlockForAutomaton<'a, A: Automaton> {
}
impl<'a, A: Automaton> Iterator for GetBlockForAutomaton<'a, A> {
type Item = (usize, BlockAddr);
type Item = (u64, BlockAddr);
fn next(&mut self) -> Option<Self::Item> {
while let Some((new_key, block_id)) = self.streamer.next() {
@@ -193,10 +193,7 @@ impl<'a, A: Automaton> Iterator for GetBlockForAutomaton<'a, A> {
if block_match_automaton(Some(prev_key), new_key, self.automaton) {
prev_key.clear();
prev_key.extend_from_slice(new_key);
return Some((
block_id as usize,
self.block_addr_store.get(block_id).unwrap(),
));
return Some((block_id, self.block_addr_store.get(block_id).unwrap()));
}
// actually we could not write here, and it would still be correct, but it might
// lead to checking more keys than necessary which in itself can be a slowdown.
@@ -205,10 +202,7 @@ impl<'a, A: Automaton> Iterator for GetBlockForAutomaton<'a, A> {
} else {
self.prev_key = Some(new_key.to_owned());
if block_match_automaton(None, new_key, self.automaton) {
return Some((
block_id as usize,
self.block_addr_store.get(block_id).unwrap(),
));
return Some((block_id, self.block_addr_store.get(block_id).unwrap()));
}
}
}

View File

@@ -86,7 +86,7 @@ where
bound_as_byte_slice(&self.upper),
);
self.term_dict
.sstable_delta_reader_for_key_range(key_range, self.limit)
.sstable_delta_reader_for_key_range(key_range, self.limit, &self.automaton)
}
async fn delta_reader_async(&self) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
@@ -95,7 +95,7 @@ where
bound_as_byte_slice(&self.upper),
);
self.term_dict
.sstable_delta_reader_for_key_range_async(key_range, self.limit)
.sstable_delta_reader_for_key_range_async(key_range, self.limit, &self.automaton)
.await
}
@@ -327,4 +327,7 @@ mod tests {
assert!(!term_streamer.advance());
Ok(())
}
// TODO add test for sparse search with a block of poison (starts with 0xffffffff) => such a
// block instantly causes an unexpected EOF error
}